package gudusoft.gsqlparser.dlineage;

import gudusoft.gsqlparser.EDbVendor;
import gudusoft.gsqlparser.TBaseType;
import gudusoft.gsqlparser.TGSqlParser;
import gudusoft.gsqlparser.dlineage.dataflow.listener.DataFlowHandleListener;
import gudusoft.gsqlparser.dlineage.dataflow.metadata.MetadataReader;
import gudusoft.gsqlparser.dlineage.dataflow.model.*;
import gudusoft.gsqlparser.dlineage.dataflow.model.json.Coordinate;
import gudusoft.gsqlparser.dlineage.dataflow.model.xml.*;
import gudusoft.gsqlparser.dlineage.dataflow.sqlenv.SQLEnvParser;
import gudusoft.gsqlparser.dlineage.util.*;
import gudusoft.gsqlparser.sqlenv.TSQLEnv;
import gudusoft.gsqlparser.sqlenv.parser.TJSONSQLEnvParser;
import gudusoft.gsqlparser.util.IndexedLinkedHashMap;
import gudusoft.gsqlparser.util.Logger;
import gudusoft.gsqlparser.util.LoggerFactory;
import gudusoft.gsqlparser.util.SQLUtil;
import gudusoft.gsqlparser.util.json.JSON;

import java.io.File;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**
 * Analyzes multiple SQL inputs in parallel, one {@link DataFlowAnalyzer} per input, and merges
 * the per-input results into a single {@link dataflow} model.
 */
public class ParallelDataFlowAnalyzer implements IDataFlowAnalyzer {
    private static final Logger logger = LoggerFactory.getLogger(ParallelDataFlowAnalyzer.class);
    private SqlInfo[] sqlInfos;
    private Option option = new Option();
    private TSQLEnv sqlenv = null;
    private dataflow dataflow;
    private String dataflowString;
    private List<ErrorInfo> errorInfos = new CopyOnWriteArrayList<ErrorInfo>();
    private IndexedLinkedHashMap<String, List<SqlInfo>> sqlInfoMap = new IndexedLinkedHashMap<String, List<SqlInfo>>();
    private final Map<String, String> hashSQLMap = new HashMap<String, String>();
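
    /*
     * Typical usage, as a rough sketch (it mirrors the commented-out example in main() below;
     * the file path and vendor are placeholders, not part of this class):
     *
     *   Option option = new Option();
     *   option.setVendor(EDbVendor.dbvoracle);
     *   ParallelDataFlowAnalyzer analyzer =
     *           new ParallelDataFlowAnalyzer(new File[]{new File("script.sql")}, option);
     *   analyzer.generateDataFlow();
     *   dataflow dataflow = analyzer.getDataFlow();
     */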

    public ParallelDataFlowAnalyzer(SqlInfo[] sqlInfos, EDbVendor dbVendor, boolean simpleOutput) {
        this.sqlInfos = sqlInfos;
        option.setVendor(dbVendor);
        option.setSimpleOutput(simpleOutput);
        ModelBindingManager.setGlobalOption(option);
    }

    public ParallelDataFlowAnalyzer(String[] sqlContents, EDbVendor dbVendor, boolean simpleOutput, String defaultServer, String defaultDatabase, String defaultSchema) {
        SqlInfo[] sqlInfos = new SqlInfo[sqlContents.length];
        for (int i = 0; i < sqlContents.length; i++) {
            SqlInfo info = new SqlInfo();
            info.setSql(sqlContents[i]);
            info.setOriginIndex(0);
            sqlInfos[i] = info;
        }
        option.setVendor(dbVendor);
        option.setSimpleOutput(simpleOutput);
        option.setDefaultServer(defaultServer);
        option.setDefaultDatabase(defaultDatabase);
        option.setDefaultSchema(defaultSchema);
        this.sqlInfos = convertSQL(dbVendor, JSON.toJSONString(sqlInfos)).toArray(new SqlInfo[0]);
    }

    public ParallelDataFlowAnalyzer(String[] sqlContents, EDbVendor dbVendor, boolean simpleOutput) {
        SqlInfo[] sqlInfos = new SqlInfo[sqlContents.length];
        for (int i = 0; i < sqlContents.length; i++) {
            SqlInfo info = new SqlInfo();
            info.setSql(sqlContents[i]);
            info.setOriginIndex(0);
            sqlInfos[i] = info;
        }
        option.setVendor(dbVendor);
        option.setSimpleOutput(simpleOutput);
        this.sqlInfos = convertSQL(dbVendor, JSON.toJSONString(sqlInfos)).toArray(new SqlInfo[0]);
    }

    public ParallelDataFlowAnalyzer(SqlInfo[] sqlInfos, Option option) {
        this.sqlInfos = sqlInfos;
        this.option = option;
        ModelBindingManager.setGlobalOption(option);
    }

    public ParallelDataFlowAnalyzer(File[] sqlFiles, Option option) {
        SqlInfo[] sqlInfos = new SqlInfo[sqlFiles.length];
        for (int i = 0; i < sqlFiles.length; i++) {
            SqlInfo info = new SqlInfo();
            info.setSql(SQLUtil.getFileContent(sqlFiles[i]));
            info.setFileName(sqlFiles[i].getName());
            info.setFilePath(sqlFiles[i].getAbsolutePath());
            info.setOriginIndex(0);
            sqlInfos[i] = info;
        }
        this.sqlInfos = sqlInfos;
        this.option = option;
        ModelBindingManager.setGlobalOption(option);
    }
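
    /**
     * Splits the incoming JSON payload (the serialized {@link SqlInfo} array) into individual
     * entries. Plain SQL text is passed through as-is; metadata exports (detected by a leading
     * "{" together with a "createdBy" field) are expanded query by query. Illustrative shape of
     * a "sqlflow" export, inferred only from the keys this class reads -- the values are
     * placeholders:
     *
     * <pre>
     * {
     *   "createdBy": "...",
     *   "servers": [ {
     *     "name": "server1",
     *     "dbVendor": "dbvoracle",
     *     "queries": [ { "sourceCode": "select ...", "database": "db1", "schema": "sc1" } ]
     *   } ]
     * }
     * </pre>
     */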
    protected List<SqlInfo> convertSQL(EDbVendor vendor, String json) {
        List<SqlInfo> sqlInfos = new ArrayList<SqlInfo>();
        List sqlContents = (List) JSON.parseObject(json);
        for (int j = 0; j < sqlContents.size(); j++) {
            Map sqlContent = (Map) sqlContents.get(j);
            String sql = (String) sqlContent.get("sql");
            String fileName = (String) sqlContent.get("fileName");
            String filePath = (String) sqlContent.get("filePath");
            if (sql != null && sql.trim().startsWith("{")) {
                if (sql.indexOf("createdBy") != -1) {
                    if (this.sqlenv == null) {
                        TSQLEnv[] sqlenvs = new TJSONSQLEnvParser(option.getDefaultServer(), option.getDefaultDatabase(), option.getDefaultSchema()).parseSQLEnv(vendor, sql);
                        if (sqlenvs != null) {
                            this.sqlenv = sqlenvs[0];
                        }
                    }
                    if (sql.toLowerCase().indexOf("sqldep") != -1 || sql.toLowerCase().indexOf("grabit") != -1) {
                        Map queryObject = (Map) JSON.parseObject(sql);
                        List querys = (List) queryObject.get("queries");
                        if (querys != null) {
                            for (int i = 0; i < querys.size(); i++) {
                                Map object = (Map) querys.get(i);
                                SqlInfo info = new SqlInfo();
                                info.setSql(JSON.toJSONString(object));
                                info.setFileName(fileName);
                                info.setFilePath(filePath);
                                info.setOriginIndex(i);
                                sqlInfos.add(info);
                            }
                            queryObject.remove("queries");
                            SqlInfo info = new SqlInfo();
                            info.setSql(JSON.toJSONString(queryObject));
                            info.setFileName(fileName);
                            info.setFilePath(filePath);
                            info.setOriginIndex(querys.size());
                            sqlInfos.add(info);
                        } else {
                            SqlInfo info = new SqlInfo();
                            info.setSql(JSON.toJSONString(queryObject));
                            info.setFileName(fileName);
                            info.setFilePath(filePath);
                            info.setOriginIndex(0);
                            sqlInfos.add(info);
                        }
                    } else if (sql.toLowerCase().indexOf("sqlflow") != -1) {
                        Map sqlflow = (Map) JSON.parseObject(sql);
                        List<Map> servers = (List<Map>) sqlflow.get("servers");
                        if (servers != null) {
                            for (Map queryObject : servers) {
                                String name = (String) queryObject.get("name");
                                String dbVendor = (String) queryObject.get("dbVendor");
                                List querys = (List) queryObject.get("queries");
                                if (querys != null) {
                                    for (int i = 0; i < querys.size(); i++) {
                                        Map object = (Map) querys.get(i);
                                        SqlInfo info = new SqlInfo();
                                        info.setSql(JSON.toJSONString(object));
                                        info.setFileName(fileName);
                                        info.setFilePath(filePath);
                                        info.setOriginIndex(i);
                                        info.setDbVendor(dbVendor);
                                        info.setServer(name);
                                        sqlInfos.add(info);
                                    }
                                    queryObject.remove("queries");
                                    SqlInfo info = new SqlInfo();
                                    info.setSql(JSON.toJSONString(queryObject));
                                    info.setFileName(fileName);
                                    info.setFilePath(filePath);
                                    info.setOriginIndex(querys.size());
                                    info.setDbVendor(dbVendor);
                                    info.setServer(filePath);
                                    sqlInfos.add(info);
                                } else {
                                    SqlInfo info = new SqlInfo();
                                    info.setSql(JSON.toJSONString(queryObject));
                                    info.setFileName(fileName);
                                    info.setFilePath(filePath);
                                    info.setOriginIndex(0);
                                    sqlInfos.add(info);
                                }
                            }
                        }
                    }
                }
            } else if (sql != null) {
                SqlInfo info = new SqlInfo();
                info.setSql(sql);
                info.setFileName(fileName);
                info.setFilePath(filePath);
                info.setOriginIndex(0);
                sqlInfos.add(info);
            }
        }
        return sqlInfos;
    }

    @Override
    public boolean isIgnoreRecordSet() {
        return option.isIgnoreRecordSet();
    }

    @Override
    public void setIgnoreRecordSet(boolean ignoreRecordSet) {
        option.setIgnoreRecordSet(ignoreRecordSet);
    }

    @Override
    public boolean isSimpleShowTopSelectResultSet() {
        return option.isSimpleShowTopSelectResultSet();
    }

    @Override
    public void setSimpleShowTopSelectResultSet(boolean simpleShowTopSelectResultSet) {
        option.setSimpleShowTopSelectResultSet(simpleShowTopSelectResultSet);
    }

    @Override
    public boolean isSimpleShowFunction() {
        return option.isSimpleShowFunction();
    }

    @Override
    public void setSimpleShowFunction(boolean simpleShowFunction) {
        option.setSimpleShowFunction(simpleShowFunction);
    }

    @Override
    public boolean isShowJoin() {
        return option.isShowJoin();
    }

    @Override
    public void setShowJoin(boolean showJoin) {
        option.setShowJoin(showJoin);
    }

    @Override
    public boolean isShowImplicitSchema() {
        return option.isShowImplicitSchema();
    }

    @Override
    public void setShowImplicitSchema(boolean showImplicitSchema) {
        option.setShowImplicitSchema(showImplicitSchema);
    }

    @Override
    public boolean isShowConstantTable() {
        return option.isShowConstantTable();
    }

    @Override
    public void setShowConstantTable(boolean showConstantTable) {
        option.setShowConstantTable(showConstantTable);
    }

    @Override
    public boolean isShowCountTableColumn() {
        return option.isShowCountTableColumn();
    }

    @Override
    public void setShowCountTableColumn(boolean showCountTableColumn) {
        option.setShowCountTableColumn(showCountTableColumn);
    }

    @Override
    public boolean isTransform() {
        return option.isTransform();
    }

    @Override
    public void setTransform(boolean transform) {
        option.setTransform(transform);
        if (option.isTransformCoordinate()) {
            option.setTransform(true);
        }
    }

    @Override
    public boolean isTransformCoordinate() {
        return option.isTransformCoordinate();
    }

    @Override
    public void setTransformCoordinate(boolean transformCoordinate) {
        option.setTransformCoordinate(transformCoordinate);
        if (transformCoordinate) {
            option.setTransform(true);
        }
    }

    @Override
    public boolean isLinkOrphanColumnToFirstTable() {
        return option.isLinkOrphanColumnToFirstTable();
    }

    @Override
    public void setLinkOrphanColumnToFirstTable(boolean linkOrphanColumnToFirstTable) {
        option.setLinkOrphanColumnToFirstTable(linkOrphanColumnToFirstTable);
    }

    @Override
    public boolean isIgnoreCoordinate() {
        return option.isIgnoreCoordinate();
    }

    @Override
    public void setIgnoreCoordinate(boolean ignoreCoordinate) {
        option.setIgnoreCoordinate(ignoreCoordinate);
    }

    @Override
    public void setHandleListener(DataFlowHandleListener listener) {
        option.setHandleListener(listener);
    }

    @Override
    public void setSqlEnv(TSQLEnv sqlenv) {
        this.sqlenv = sqlenv;
    }

    @Override
    public void setOption(Option option) {
        this.option = option;
    }

    @Override
    public Option getOption() {
        return option;
    }

    @Override
    public List<ErrorInfo> getErrorMessages() {
        return errorInfos;
    }

    @Override
    public synchronized String generateSqlInfos() {
        return JSON.toJSONString(sqlInfoMap);
    }

    @Override
    public synchronized String generateDataFlow() {
        return generateDataFlow(false);
    }

    @Override
    public Map<String, List<SqlInfo>> getSqlInfos() {
        return sqlInfoMap;
    }

    /**
     * @deprecated please use SqlInfoHelper.getSelectedDbObjectInfo
     */
    @Override
    public DbObjectPosition getSelectedDbObjectInfo(Coordinate start, Coordinate end) {
        if (start == null || end == null) {
            throw new IllegalArgumentException("Coordinate can't be null.");
        }

        String hashCode = start.getHashCode();

        if (hashCode == null) {
            throw new IllegalArgumentException("Coordinate hashcode can't be null.");
        }

        int dbObjectStartLine = (int) start.getX() - 1;
        int dbObjectStartColumn = (int) start.getY() - 1;
        int dbObjectEndLine = (int) end.getX() - 1;
        int dbObjectEndColumn = (int) end.getY() - 1;
        List<SqlInfo> sqlInfoList;
        if (hashCode.matches("\\d+")) {
            sqlInfoList = sqlInfoMap.getValueAtIndex(Integer.valueOf(hashCode));
        } else {
            sqlInfoList = sqlInfoMap.get(hashCode);
        }
        for (int j = 0; j < sqlInfoList.size(); j++) {
            SqlInfo sqlInfo = sqlInfoList.get(j);
            int startLine = sqlInfo.getLineStart();
            int endLine = sqlInfo.getLineEnd();
            if (dbObjectStartLine >= startLine && dbObjectStartLine <= endLine) {
                DbObjectPosition position = new DbObjectPosition();
                position.setFile(sqlInfo.getFileName());
                position.setFilePath(sqlInfo.getFilePath());
                position.setSql(sqlInfo.getSql());
                position.setIndex(sqlInfo.getOriginIndex());
                List<Pair<Integer, Integer>> positions = position.getPositions();
                positions.add(new Pair<Integer, Integer>(
                        dbObjectStartLine - startLine + sqlInfo.getOriginLineStart() + 1, dbObjectStartColumn + 1));
                positions.add(new Pair<Integer, Integer>(dbObjectEndLine - startLine + sqlInfo.getOriginLineStart() + 1,
                        dbObjectEndColumn + 1));
                return position;
            }
        }
        return null;
    }

    @Override
    public synchronized String generateDataFlow(final boolean withExtraInfo) {
        sqlInfoMap.clear();
        errorInfos.clear();
        Map<String, Pair3<StringBuilder, AtomicInteger, String>> databaseMap = new LinkedHashMap<String, Pair3<StringBuilder, AtomicInteger, String>>();
        for (int i = 0; i < sqlInfos.length; i++) {
            SqlInfo sqlInfo = sqlInfos[i];
            if (sqlInfo == null || sqlInfo.getSql() == null) {
                sqlInfoMap.put(String.valueOf(i), new ArrayList<SqlInfo>());
                continue;
            }
            String sql = sqlInfo.getSql();
            if (sql != null && sql.trim().startsWith("{")) {
                if (MetadataReader.isGrabit(sql) || MetadataReader.isSqlflow(sql)) {
                    String hash = SHA256.getMd5(sql);
                    String fileHash = SHA256.getMd5(hash);
                    if (!sqlInfoMap.containsKey(fileHash)) {
                        sqlInfoMap.put(fileHash, new ArrayList<SqlInfo>());
                        sqlInfoMap.get(fileHash).add(sqlInfo);
                    }
                } else {
                    Map queryObject = (Map) JSON.parseObject(sql);
                    appendSqlInfo(databaseMap, i, sqlInfo, queryObject);
                }
            } else {
                String content = sql;
                String hash = SHA256.getMd5(content);
                String fileHash = SHA256.getMd5(hash);
                if (!sqlInfoMap.containsKey(fileHash)) {
                    sqlInfoMap.put(fileHash, new ArrayList<SqlInfo>());

                    String database = TSQLEnv.DEFAULT_DB_NAME;
                    String schema = TSQLEnv.DEFAULT_SCHEMA_NAME;
                    if (sqlenv != null) {
                        database = sqlenv.getDefaultCatalogName();
                        if (database == null) {
                            database = TSQLEnv.DEFAULT_DB_NAME;
                        }
                        schema = sqlenv.getDefaultSchemaName();
                        if (schema == null) {
                            schema = TSQLEnv.DEFAULT_SCHEMA_NAME;
                        }
                    }
                    boolean supportCatalog = TSQLEnv.supportCatalog(option.getVendor());
                    boolean supportSchema = TSQLEnv.supportSchema(option.getVendor());
                    StringBuilder builder = new StringBuilder();
                    if (supportCatalog) {
                        builder.append(database);
                    }
                    if (supportSchema) {
                        if (builder.length() > 0) {
                            builder.append(".");
                        }
                        builder.append(schema);
                    }
                    String group = builder.toString();
                    SqlInfo sqlInfoItem = new SqlInfo();
                    sqlInfoItem.setFileName(sqlInfo.getFileName());
                    sqlInfoItem.setFilePath(sqlInfo.getFilePath());
                    sqlInfoItem.setSql(sqlInfo.getSql());
                    sqlInfoItem.setOriginIndex(0);
                    sqlInfoItem.setOriginLineStart(0);
                    sqlInfoItem.setOriginLineEnd(sqlInfo.getSql().split("\n").length - 1);
                    sqlInfoItem.setIndex(0);
                    sqlInfoItem.setLineStart(0);
                    sqlInfoItem.setLineEnd(sqlInfo.getSql().split("\n").length - 1);
                    sqlInfoItem.setHash(fileHash);
                    sqlInfoItem.setGroup(group);

                    sqlInfoMap.get(fileHash).add(sqlInfoItem);
                }
            }
        }

        final TSQLEnv[] env = new TSQLEnv[]{sqlenv};
        if (sqlenv == null) {
            TSQLEnv[] envs = new SQLEnvParser(option.getDefaultServer(), option.getDefaultDatabase(), option.getDefaultSchema()).parseSQLEnv(option.getVendor(), sqlInfos);
            if (envs != null && envs.length > 0) {
                env[0] = envs[0];
            }
        }
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
                .newFixedThreadPool(Math.min(option.getParallel(), sqlInfos.length));
        final Map<SqlInfo, dataflow> dataflowMap = new ConcurrentHashMap<>();
        try {
            final CountDownLatch latch = new CountDownLatch(sqlInfos.length);
            for (int i = 0; i < sqlInfos.length; i++) {
                final SqlInfo[] sqlInfoCopy = new SqlInfo[sqlInfos.length];
                final SqlInfo item = sqlInfos[i];
                sqlInfoCopy[i] = item;
                final Option optionCopy = (Option) option.clone();
                // give every task its own id range so node ids cannot collide when the results are merged
                optionCopy.setStartId(5000000L * i);
                optionCopy.setOutput(false);
                Runnable task = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            DataFlowAnalyzer analyzer = new DataFlowAnalyzer(sqlInfoCopy, optionCopy);
                            analyzer.setSqlEnv(env[0]);
                            analyzer.generateDataFlow(withExtraInfo);
                            dataflow dataflow = analyzer.getDataFlow();
                            errorInfos.addAll(analyzer.getErrorMessages());
                            dataflowMap.put(item, dataflow);
                            hashSQLMap.putAll(analyzer.getHashSQLMap());
                            analyzer.dispose();
                        } finally {
                            latch.countDown();
                        }
                    }
                };
                executor.submit(task);
            }
            latch.await();
        } catch (Exception e) {
            logger.error("execute task failed.", e);
        }
        executor.shutdown();

        ModelBindingManager modelManager = new ModelBindingManager();
        modelManager.setGlobalVendor(option.getVendor());
        modelManager.setGlobalOption(option);
        ModelBindingManager.set(modelManager);
        this.dataflow = mergeDataFlows(dataflowMap, 5000000L * sqlInfos.length);
        dataflowMap.clear();
        if (dataflow != null && option.isOutput()) {
            if (option.isTextFormat()) {
                dataflowString = DataFlowAnalyzer.getTextOutput(dataflow);
            } else {
                try {
                    dataflowString = XML2Model.saveXML(dataflow);
                } catch (Exception e) {
                    logger.error("save dataflow as xml failed.", e);
                    dataflowString = null;
                }
            }
        }
        ModelBindingManager.remove();
        return dataflowString;
    }

    private void appendSqlInfo(Map<String, Pair3<StringBuilder, AtomicInteger, String>> databaseMap, int index,
            SqlInfo sqlInfo, Map queryObject) {
        EDbVendor vendor = option.getVendor();
        if (!SQLUtil.isEmpty(sqlInfo.getDbVendor())) {
            vendor = EDbVendor.valueOf(sqlInfo.getDbVendor());
        }

        boolean supportCatalog = TSQLEnv.supportCatalog(vendor);
        boolean supportSchema = TSQLEnv.supportSchema(vendor);

        String content = (String) queryObject.get("sourceCode");
        if (SQLUtil.isEmpty(content)) {
            return;
        }

        StringBuilder builder = new StringBuilder();
        if (supportCatalog) {
            String database = (String) queryObject.get("database");
            if (database.indexOf(".") != -1) {
                String delimitedChar = TSQLEnv.delimitedChar(vendor);
                database = delimitedChar + SQLUtil.trimColumnStringQuote(database) + delimitedChar;
            }
            builder.append(database);
        }
        if (supportSchema) {
            String schema = (String) queryObject.get("schema");
            if (schema.indexOf(".") != -1) {
                String delimitedChar = TSQLEnv.delimitedChar(vendor);
                schema = delimitedChar + SQLUtil.trimColumnStringQuote(schema) + delimitedChar;
            }
            if (builder.length() > 0) {
                builder.append(".");
            }
            builder.append(schema);
        }
        String group = builder.toString();
        String sqlHash = SHA256.getMd5(content);
        String hash = SHA256.getMd5(sqlHash);
        if (!databaseMap.containsKey(sqlHash)) {
            databaseMap.put(sqlHash,
                    new Pair3<StringBuilder, AtomicInteger, String>(new StringBuilder(), new AtomicInteger(), group));
        }
        TGSqlParser parser = new TGSqlParser(option.getVendor());
        String delimiterChar = String.valueOf(parser.getFlexer().delimiterchar);
        StringBuilder buffer = new StringBuilder(content);
        if (content.trim().endsWith(delimiterChar) || content.trim().endsWith(";")) {
            buffer.append("\n");
        } else if (vendor == EDbVendor.dbvredshift
                || vendor == EDbVendor.dbvgaussdb
                || vendor == EDbVendor.dbvpostgresql
                || vendor == EDbVendor.dbvmysql
                || vendor == EDbVendor.dbvteradata) {
            buffer.append("\n\n-- " + TBaseType.sqlflow_stmt_delimiter_str + "\n\n");
        } else {
            SQLUtil.endTrim(buffer);
            buffer.append(";").append("\n");
        }

        int lineStart = databaseMap.get(sqlHash).first.toString().split("\n", -1).length - 1;
        if (databaseMap.get(sqlHash).first.toString().length() == 0) {
            lineStart = 0;
        }
        databaseMap.get(sqlHash).first.append(buffer.toString());
        SqlInfo sqlInfoItem = new SqlInfo();
        sqlInfoItem.setFileName(sqlInfo.getFileName());
        sqlInfoItem.setFilePath(sqlInfo.getFilePath());
        sqlInfoItem.setSql(buffer.toString());
        sqlInfoItem.setOriginIndex(index);
        sqlInfoItem.setOriginLineStart(0);
        sqlInfoItem.setOriginLineEnd(buffer.toString().split("\n", -1).length - 1);
        sqlInfoItem.setIndex(databaseMap.get(sqlHash).second.getAndIncrement());
        sqlInfoItem.setLineStart(lineStart);
        sqlInfoItem.setLineEnd(databaseMap.get(sqlHash).first.toString().split("\n", -1).length - 1);
        sqlInfoItem.setGroup(group);
        sqlInfoItem.setHash(hash);

        if (!sqlInfoMap.containsKey(hash)) {
            sqlInfoMap.put(hash, new ArrayList<SqlInfo>());
        }
        sqlInfoMap.get(hash).add(sqlInfoItem);
    }

    @Override
    public void dispose() {
        ModelBindingManager.remove();
    }
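
    /*
     * The merge step below can also be driven directly: dataflow models produced by separate
     * DataFlowAnalyzer runs can be combined through the public static overload. A rough sketch
     * (the "flows" collection and the vendor are placeholders):
     *
     *   List<dataflow> flows = ...; // results of individual analyses
     *   dataflow merged = ParallelDataFlowAnalyzer.mergeDataFlows(flows, EDbVendor.dbvoracle);
     */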

    private dataflow mergeDataFlows(Map<SqlInfo, dataflow> dataflowMap, long startId) {
        return mergeDataFlows(dataflowMap.values(), startId);
    }

    public static dataflow mergeDataFlows(Collection<dataflow> dataflows, EDbVendor vendor) {
        ModelBindingManager.setGlobalVendor(vendor);
        try {
            return mergeDataFlows(dataflows, 5000000L * dataflows.size());
        } finally {
            ModelBindingManager.removeGlobalVendor();
        }
    }

    private static dataflow mergeDataFlows(Collection<dataflow> dataflows, long startId) {
        dataflow mergeDataflow = new dataflow();

        List<table> tableCopy = new ArrayList<table>();
        List<table> viewCopy = new ArrayList<table>();
        List<table> databaseCopy = new ArrayList<table>();
        List<table> schemaCopy = new ArrayList<table>();
        List<table> stageCopy = new ArrayList<table>();
        List<table> dataSourceCopy = new ArrayList<table>();
        List<table> streamCopy = new ArrayList<table>();
        List<table> fileCopy = new ArrayList<table>();
        List<table> variableCopy = new ArrayList<table>();
        List<table> resultSetCopy = new ArrayList<table>();

        List<process> processCopy = new ArrayList<process>();
        List<relationship> relationshipCopy = new ArrayList<relationship>();
        List<procedure> procedureCopy = new ArrayList<procedure>();
        List<oraclePackage> packageCopy = new ArrayList<oraclePackage>();
        List<error> errorCopy = new ArrayList<error>();

        List<table> tables = new ArrayList<table>();
        for (dataflow dataflow : dataflows) {
            if (dataflow.getTables() != null) {
                tableCopy.addAll(dataflow.getTables());
            }
            mergeDataflow.setTables(tableCopy);
            if (dataflow.getViews() != null) {
                viewCopy.addAll(dataflow.getViews());
            }
            mergeDataflow.setViews(viewCopy);
            if (dataflow.getDatabases() != null) {
                databaseCopy.addAll(dataflow.getDatabases());
            }
            mergeDataflow.setDatabases(databaseCopy);
            if (dataflow.getSchemas() != null) {
                schemaCopy.addAll(dataflow.getSchemas());
            }
            mergeDataflow.setSchemas(schemaCopy);
            if (dataflow.getStages() != null) {
                stageCopy.addAll(dataflow.getStages());
            }
            mergeDataflow.setStages(stageCopy);
            if (dataflow.getDatasources() != null) {
                dataSourceCopy.addAll(dataflow.getDatasources());
            }
            mergeDataflow.setDatasources(dataSourceCopy);
            if (dataflow.getStreams() != null) {
                streamCopy.addAll(dataflow.getStreams());
            }
            mergeDataflow.setStreams(streamCopy);
            if (dataflow.getPaths() != null) {
                fileCopy.addAll(dataflow.getPaths());
            }
            mergeDataflow.setPaths(fileCopy);
            if (dataflow.getVariables() != null) {
                variableCopy.addAll(dataflow.getVariables());
            }
            mergeDataflow.setVariables(variableCopy);
            if (dataflow.getResultsets() != null) {
                resultSetCopy.addAll(dataflow.getResultsets());
            }
            mergeDataflow.setResultsets(resultSetCopy);
            if (dataflow.getProcesses() != null) {
                processCopy.addAll(dataflow.getProcesses());
            }
            mergeDataflow.setProcesses(processCopy);
            if (dataflow.getRelationships() != null) {
                relationshipCopy.addAll(dataflow.getRelationships());
            }
            mergeDataflow.setRelationships(relationshipCopy);
            if (dataflow.getProcedures() != null) {
                procedureCopy.addAll(dataflow.getProcedures());
            }
            mergeDataflow.setProcedures(procedureCopy);
            if (dataflow.getPackages() != null) {
                packageCopy.addAll(dataflow.getPackages());
            }
            mergeDataflow.setPackages(packageCopy);
            if (dataflow.getErrors() != null) {
                errorCopy.addAll(dataflow.getErrors());
            }
            mergeDataflow.setErrors(errorCopy);

            tables.addAll(dataflow.getTables());
            tables.addAll(dataflow.getViews());
            tables.addAll(dataflow.getDatabases());
            tables.addAll(dataflow.getSchemas());
            tables.addAll(dataflow.getStages());
            tables.addAll(dataflow.getDatasources());
            tables.addAll(dataflow.getStreams());
            tables.addAll(dataflow.getPaths());
            tables.addAll(dataflow.getResultsets());
            tables.addAll(dataflow.getVariables());
        }

        Map<String, List<table>> tableMap = new HashMap<String, List<table>>();
        Map<String, String> tableTypeMap = new HashMap<String, String>();
        Map<String, String> tableIdMap = new HashMap<String, String>();

        Map<String, List<column>> columnMap = new HashMap<String, List<column>>();
        Map<String, Set<String>> tableColumnMap = new HashMap<String, Set<String>>();
        Map<String, String> columnIdMap = new HashMap<String, String>();
        Map<String, column> columnMergeIdMap = new HashMap<String, column>();

        List<procedure> procedures = new ArrayList<>(mergeDataflow.getProcedures());
        if (mergeDataflow.getPackages() != null) {
            for (oraclePackage pkg : mergeDataflow.getPackages()) {
                procedures.addAll(pkg.getProcedures());
            }
        }

        Set<String> procedureIdSet = procedures.stream().map(t -> t.getId()).collect(Collectors.toSet());

        for (table table : tables) {
            String qualifiedTableName = DlineageUtil.getQualifiedTableName(table);
            String tableFullName = DlineageUtil.getIdentifierNormalTableName(qualifiedTableName);

            if (!tableMap.containsKey(tableFullName)) {
                tableMap.put(tableFullName, new ArrayList<table>());
            }

            tableMap.get(tableFullName).add(table);

            if (!tableTypeMap.containsKey(tableFullName)) {
                tableTypeMap.put(tableFullName, table.getType());
            } else if ("view".equals(table.getSubType())) {
                tableTypeMap.put(tableFullName, table.getType());
            } else if ("database".equals(table.getSubType())) {
                tableTypeMap.put(tableFullName, table.getType());
            } else if ("schema".equals(table.getSubType())) {
                tableTypeMap.put(tableFullName, table.getType());
            } else if ("stage".equals(table.getSubType())) {
                tableTypeMap.put(tableFullName, table.getType());
            } else if ("datasource".equals(table.getSubType())) {
                tableTypeMap.put(tableFullName, table.getType());
            } else if ("stream".equals(table.getSubType())) {
                tableTypeMap.put(tableFullName, table.getType());
            } else if ("file".equals(table.getSubType())) {
                tableTypeMap.put(tableFullName, table.getType());
            } else if ("table".equals(tableTypeMap.get(tableFullName))) {
                tableTypeMap.put(tableFullName, table.getType());
            } else if ("variable".equals(tableTypeMap.get(tableFullName))) {
                tableTypeMap.put(tableFullName, table.getType());
            }

            if (table.getColumns() != null) {
                if (!tableColumnMap.containsKey(tableFullName)) {
                    tableColumnMap.put(tableFullName, new LinkedHashSet<String>());
                }
                for (column column : table.getColumns()) {
                    String columnFullName = tableFullName + "."
                            + DlineageUtil.getIdentifierNormalColumnName(column.getName());

                    if (!columnMap.containsKey(columnFullName)) {
                        columnMap.put(columnFullName, new ArrayList<column>());
                        tableColumnMap.get(tableFullName).add(columnFullName);
                    }

                    columnMap.get(columnFullName).add(column);
                }
            }
        }

        Iterator<String> tableNameIter = tableMap.keySet().iterator();
        while (tableNameIter.hasNext()) {
            String tableName = tableNameIter.next();
            List<table> tableList = tableMap.get(tableName);
            table table;
            if (tableList.size() > 1) {
                table standardTable = tableList.get(0);
                // functions may legitimately share a name, so they are not merged
                if (standardTable.isFunction()) {
                    continue;
                }

                String type = tableTypeMap.get(tableName);
                table = new table();
                table.setId(String.valueOf(++startId));
                table.setServer(standardTable.getServer());
                table.setDatabase(standardTable.getDatabase());
                table.setSchema(standardTable.getSchema());
                table.setName(standardTable.getName());
                table.setDisplayName(standardTable.getDisplayName());
                table.setParent(standardTable.getParent());
                table.setColumns(new ArrayList<column>());
                Set<String> processIds = new LinkedHashSet<String>();
                for (int k = 0; k < tableList.size(); k++) {
                    if (tableList.get(k).getProcessIds() != null) {
                        processIds.addAll(tableList.get(k).getProcessIds());
                    }
                }
                if (!processIds.isEmpty()) {
                    table.setProcessIds(new ArrayList<String>(processIds));
                }
                table.setType(type);
                for (table item : tableList) {
                    if (!SQLUtil.isEmpty(table.getCoordinate()) && !SQLUtil.isEmpty(item.getCoordinate())) {
                        if (table.getCoordinate().indexOf(item.getCoordinate()) == -1) {
                            table.appendCoordinate(item.getCoordinate());
                        }
                    } else if (!SQLUtil.isEmpty(item.getCoordinate())) {
                        table.setCoordinate(item.getCoordinate());
                    }

                    if (!SQLUtil.isEmpty(table.getAlias()) && !SQLUtil.isEmpty(item.getAlias())) {
                        table.setAlias(table.getAlias() + "," + item.getAlias());
                    } else if (!SQLUtil.isEmpty(item.getAlias())) {
                        table.setAlias(item.getAlias());
                    }

                    tableIdMap.put(item.getId(), table.getId());

                    if (item.isView()) {
                        mergeDataflow.getViews().remove(item);
                    } else if (item.isDatabaseType()) {
                        mergeDataflow.getDatabases().remove(item);
                    } else if (item.isSchemaType()) {
                        mergeDataflow.getSchemas().remove(item);
                    } else if (item.isStage()) {
                        mergeDataflow.getStages().remove(item);
                    } else if (item.isDataSource()) {
                        mergeDataflow.getDatasources().remove(item);
                    } else if (item.isStream()) {
                        mergeDataflow.getStreams().remove(item);
                    } else if (item.isFile()) {
                        mergeDataflow.getPaths().remove(item);
                    } else if (item.isVariable()) {
                        mergeDataflow.getVariables().remove(item);
                    } else if (item.isTable()) {
                        mergeDataflow.getTables().remove(item);
                    } else if (item.isResultSet()) {
                        mergeDataflow.getResultsets().remove(item);
                    }
                }

                if (table.isView()) {
                    mergeDataflow.getViews().add(table);
                } else if (table.isDatabaseType()) {
                    mergeDataflow.getDatabases().add(table);
                } else if (table.isSchemaType()) {
                    mergeDataflow.getSchemas().add(table);
                } else if (table.isStage()) {
                    mergeDataflow.getStages().add(table);
                } else if (table.isDataSource()) {
                    mergeDataflow.getDatasources().add(table);
                } else if (table.isStream()) {
                    mergeDataflow.getStreams().add(table);
                } else if (table.isFile()) {
                    mergeDataflow.getPaths().add(table);
                } else if (table.isVariable()) {
                    mergeDataflow.getVariables().add(table);
                } else if (table.isResultSet()) {
                    mergeDataflow.getResultsets().add(table);
                } else {
                    mergeDataflow.getTables().add(table);
                }
            } else {
                table = tableList.get(0);
            }

            Set<String> columns = tableColumnMap.get(tableName);
            Iterator<String> columnIter = columns.iterator();
            List<column> mergeColumns = new ArrayList<column>();
            while (columnIter.hasNext()) {
                String columnName = columnIter.next();
                List<column> columnList = columnMap.get(columnName);
                List<column> functions = new ArrayList<column>();
                for (column t : columnList) {
                    if (Boolean.TRUE.toString().equals(t.getIsFunction())) {
                        functions.add(t);
                    }
                }
                if (functions != null && !functions.isEmpty()) {
                    for (column function : functions) {
                        mergeColumns.add(function);
                        columnIdMap.put(function.getId(), function.getId());
                        columnMergeIdMap.put(function.getId(), function);
                    }

                    columnList.removeAll(functions);
                }
                if (!columnList.isEmpty()) {
                    column firstColumn = columnList.iterator().next();
                    if (columnList.size() > 1) {
                        column mergeColumn = new column();
                        mergeColumn.setId(String.valueOf(++startId));
                        mergeColumn.setName(firstColumn.getName());
                        mergeColumn.setDisplayName(firstColumn.getDisplayName());
                        mergeColumn.setSource(firstColumn.getSource());
                        mergeColumn.setQualifiedTable(firstColumn.getQualifiedTable());
                        mergeColumns.add(mergeColumn);
                        for (column item : columnList) {
                            mergeColumn.appendCoordinate(item.getCoordinate());
                            columnIdMap.put(item.getId(), mergeColumn.getId());
                            // add by grq 2023.02.06 issue=I6DB5S
                            if (item.getDataType() != null) {
                                mergeColumn.setDataType(item.getDataType());
                            }
                            if (item.isForeignKey() != null) {
                                mergeColumn.setForeignKey(item.isForeignKey());
                            }
                            if (item.isUnqiueKey() != null) {
                                mergeColumn.setUnqiueKey(item.isUnqiueKey());
                            }
                            if (item.isIndexKey() != null) {
                                mergeColumn.setIndexKey(item.isIndexKey());
                            }
                            if (item.isPrimaryKey() != null) {
                                mergeColumn.setPrimaryKey(item.isPrimaryKey());
                            }
                            // end by grq
                        }
                        columnMergeIdMap.put(mergeColumn.getId(), mergeColumn);
                    } else {
                        mergeColumns.add(firstColumn);
                        columnIdMap.put(firstColumn.getId(), firstColumn.getId());
                        columnMergeIdMap.put(firstColumn.getId(), firstColumn);
                    }
                }
            }
            table.setColumns(mergeColumns);
        }

        if (mergeDataflow.getRelationships() != null) {
            Map<String, relationship> mergeRelations = new LinkedHashMap<String, relationship>();
            for (int i = 0; i < mergeDataflow.getRelationships().size(); i++) {
                relationship relation = mergeDataflow.getRelationships().get(i);
                if (RelationshipType.call.name().equals(relation.getType())) {
                    targetColumn target = relation.getCaller();
                    if (target == null) {
                        continue;
                    }
                    if (target != null && tableIdMap.containsKey(target.getId())) {
                        target.setId(tableIdMap.get(target.getId()));
                    }

                    List<sourceColumn> sources = relation.getCallees();
                    Set<sourceColumn> sourceSet = new LinkedHashSet<sourceColumn>();
                    if (sources != null) {
                        for (sourceColumn source : sources) {
                            if (tableIdMap.containsKey(source.getId())) {
                                source.setId(tableIdMap.get(source.getId()));
                            }
                        }
                        sourceSet.addAll(sources);
                        relation.setCallees(new ArrayList<sourceColumn>(sourceSet));
                    }

                    String jsonString = JSON.toJSONString(relation).replaceAll("\"id\":\".+?\"", "");
                    String key = SHA256.getMd5(jsonString);
                    if (!mergeRelations.containsKey(key)) {
                        mergeRelations.put(key, relation);
                    }
                } else {
                    targetColumn target = relation.getTarget();
                    if (target == null) {
                        continue;
                    }
                    if (target != null && tableIdMap.containsKey(target.getParent_id())) {
                        target.setParent_id(tableIdMap.get(target.getParent_id()));
                    }

                    if (columnIdMap.containsKey(target.getId())) {
                        target.setId(columnIdMap.get(target.getId()));
                        target.setCoordinate(columnMergeIdMap.get(target.getId()).getCoordinate());
                    }

                    List<sourceColumn> sources = relation.getSources();
                    Set<sourceColumn> sourceSet = new LinkedHashSet<sourceColumn>();
                    if (sources != null) {
                        for (sourceColumn source : sources) {
                            if (tableIdMap.containsKey(source.getParent_id())) {
                                source.setParent_id(tableIdMap.get(source.getParent_id()));
                            }
                            if (tableIdMap.containsKey(source.getSource_id())) {
                                source.setSource_id(tableIdMap.get(source.getSource_id()));
                            }
                            if (columnIdMap.containsKey(source.getId())) {
                                source.setId(columnIdMap.get(source.getId()));
                                source.setCoordinate(columnMergeIdMap.get(source.getId()).getCoordinate());
                            }
                        }

                        sourceSet.addAll(sources);
                        relation.setSources(new ArrayList<sourceColumn>(sourceSet));
                    }

                    String jsonString = JSON.toJSONString(relation).replaceAll("\"id\":\".+?\"", "");
                    String key = SHA256.getMd5(jsonString);
                    if (!mergeRelations.containsKey(key)) {
                        mergeRelations.put(key, relation);
                    }
                }
            }

            mergeDataflow.setRelationships(new ArrayList<relationship>(mergeRelations.values()));
        }

        tableMap.clear();
        tableTypeMap.clear();
        tableIdMap.clear();
        columnMap.clear();
        tableColumnMap.clear();
        columnIdMap.clear();
        columnMergeIdMap.clear();
        tables.clear();

        return mergeDataflow;
    }

    @Override
    public synchronized dataflow getDataFlow() {
        if (dataflow != null) {
            return dataflow;
        } else if (dataflowString != null) {
            return XML2Model.loadXML(dataflow.class, dataflowString);
        }
        return null;
    }

    @Override
    public Map<String, String> getHashSQLMap() {
        return hashSQLMap;
    }

    public static void main(String[] args) throws Exception {
//        Option option = new Option();
//        option.setVendor(EDbVendor.dbvoracle);
//        option.setOutput(false);
//        ParallelDataFlowAnalyzer analyzer = new ParallelDataFlowAnalyzer(new File[]{new File("C:\\Users\\KK\\Desktop\\MRADM_PKG_ALL.sql")}, option);
//        analyzer.generateDataFlow();
//        dataflow dataflow = analyzer.getDataFlow();
        dataflow dataflow = XML2Model.loadXML(gudusoft.gsqlparser.dlineage.dataflow.model.xml.dataflow.class, new File("D:\\dataflow.xml.zip"));
        //XML2Model.saveXML(dataflow, new File("D:\\dataflow.xml.zip"));
        dataflow = DataflowUtility.convertToTableLevelDataflow(dataflow);
        dataflow = DataflowUtility.convertTableLevelToFunctionCallDataflow(dataflow, true, EDbVendor.dbvoracle);
        System.out.println(XML2Model.saveXML(dataflow));
    }
}