001package gudusoft.gsqlparser.dlineage; 002 003import gudusoft.gsqlparser.EDbVendor; 004import gudusoft.gsqlparser.TBaseType; 005import gudusoft.gsqlparser.TGSqlParser; 006import gudusoft.gsqlparser.dlineage.dataflow.listener.DataFlowHandleListener; 007import gudusoft.gsqlparser.dlineage.dataflow.metadata.MetadataReader; 008import gudusoft.gsqlparser.dlineage.dataflow.model.*; 009import gudusoft.gsqlparser.dlineage.dataflow.model.json.Coordinate; 010import gudusoft.gsqlparser.dlineage.dataflow.model.xml.*; 011import gudusoft.gsqlparser.dlineage.dataflow.sqlenv.SQLEnvParser; 012import gudusoft.gsqlparser.dlineage.util.*; 013import gudusoft.gsqlparser.sqlenv.TSQLEnv; 014import gudusoft.gsqlparser.sqlenv.parser.TJSONSQLEnvParser; 015import gudusoft.gsqlparser.util.IndexedLinkedHashMap; 016import gudusoft.gsqlparser.util.Logger; 017import gudusoft.gsqlparser.util.LoggerFactory; 018import gudusoft.gsqlparser.util.SQLUtil; 019import gudusoft.gsqlparser.util.json.JSON; 020 021import java.io.File; 022import java.io.IOException; 023import java.util.*; 024import java.util.concurrent.*; 025import java.util.concurrent.atomic.AtomicInteger; 026import java.util.stream.Collectors; 027 028public class ParallelDataFlowAnalyzer implements IDataFlowAnalyzer { 029 private static final Logger logger = LoggerFactory.getLogger(ParallelDataFlowAnalyzer.class); 030 private SqlInfo[] sqlInfos; 031 private Option option = new Option(); 032 private TSQLEnv sqlenv = null; 033 private dataflow dataflow; 034 private String dataflowString; 035 private List<ErrorInfo> errorInfos = new CopyOnWriteArrayList<ErrorInfo>(); 036 private IndexedLinkedHashMap<String, List<SqlInfo>> sqlInfoMap = new IndexedLinkedHashMap<String, List<SqlInfo>>(); 037 private final Map<String, String> hashSQLMap = new HashMap(); 038 039 public ParallelDataFlowAnalyzer(SqlInfo[] sqlInfos, EDbVendor dbVendor, boolean simpleOutput) { 040 this.sqlInfos = sqlInfos; 041 option.setVendor(dbVendor); 042 
option.setSimpleOutput(simpleOutput); 043 ModelBindingManager.setGlobalOption(option); 044 } 045 046 public ParallelDataFlowAnalyzer(String[] sqlContents, EDbVendor dbVendor, boolean simpleOutput, String defaultServer, String defaultDatabase, String defaltSchema) { 047 SqlInfo[] sqlInfos = new SqlInfo[sqlContents.length]; 048 for (int i = 0; i < sqlContents.length; i++) { 049 SqlInfo info = new SqlInfo(); 050 info.setSql(sqlContents[i]); 051 info.setOriginIndex(0); 052 sqlInfos[i] = info; 053 } 054 option.setVendor(dbVendor); 055 option.setSimpleOutput(simpleOutput); 056 option.setDefaultServer(defaultServer); 057 option.setDefaultDatabase(defaultDatabase); 058 option.setDefaultSchema(defaltSchema); 059 this.sqlInfos = convertSQL(dbVendor, JSON.toJSONString(sqlInfos)).toArray(new SqlInfo[0]); 060 } 061 062 public ParallelDataFlowAnalyzer(String[] sqlContents, EDbVendor dbVendor, boolean simpleOutput) { 063 SqlInfo[] sqlInfos = new SqlInfo[sqlContents.length]; 064 for (int i = 0; i < sqlContents.length; i++) { 065 SqlInfo info = new SqlInfo(); 066 info.setSql(sqlContents[i]); 067 info.setOriginIndex(0); 068 sqlInfos[i] = info; 069 } 070 option.setVendor(dbVendor); 071 option.setSimpleOutput(simpleOutput); 072 this.sqlInfos = convertSQL(dbVendor, JSON.toJSONString(sqlInfos)).toArray(new SqlInfo[0]); 073 } 074 075 public ParallelDataFlowAnalyzer(SqlInfo[] sqlInfos, Option option) { 076 this.sqlInfos = sqlInfos; 077 this.option = option; 078 ModelBindingManager.setGlobalOption(option); 079 } 080 081 public ParallelDataFlowAnalyzer(File[] sqlFiles, Option option) { 082 SqlInfo[] sqlInfos = new SqlInfo[sqlFiles.length]; 083 for (int i = 0; i < sqlFiles.length; i++) { 084 SqlInfo info = new SqlInfo(); 085 info.setSql(SQLUtil.getFileContent(sqlFiles[i])); 086 info.setFileName(sqlFiles[i].getName()); 087 info.setFilePath(sqlFiles[i].getAbsolutePath()); 088 info.setOriginIndex(0); 089 sqlInfos[i] = info; 090 } 091 this.sqlInfos = sqlInfos; 092 this.option = option; 093 
ModelBindingManager.setGlobalOption(option); 094 } 095 096 public ParallelDataFlowAnalyzer(File[] sqlFiles, Option option, int splitSizeMB, File splitDir) { 097 List<SqlInfo> sqlInfoList = new ArrayList<SqlInfo>(); 098 for (int i = 0; i < sqlFiles.length; i++) { 099 try { 100 // Ensure split directory exists 101 if (!splitDir.exists() && !splitDir.mkdirs()) { 102 throw new IOException("Failed to create split directory: " + splitDir.getAbsolutePath()); 103 } 104 105 // Split the file if it's large 106 List<File> splitFiles = gudusoft.gsqlparser.util.FileSplitter.splitFile(sqlFiles[i], splitDir, splitSizeMB, option.getVendor()); 107 108 if (splitFiles.isEmpty()) { 109 // If no split files were created, process the original file 110 SqlInfo info = new SqlInfo(); 111 info.setSql(SQLUtil.getFileContent(sqlFiles[i])); 112 info.setFileName(sqlFiles[i].getName()); 113 info.setFilePath(sqlFiles[i].getAbsolutePath()); 114 info.setOriginIndex(0); 115 sqlInfoList.add(info); 116 } else { 117 // Process each split file 118 for (File splitFile : splitFiles) { 119 SqlInfo info = new SqlInfo(); 120 info.setSql(SQLUtil.getFileContent(splitFile)); 121 info.setFileName(splitFile.getName()); 122 info.setFilePath(splitFile.getAbsolutePath()); 123 info.setOriginIndex(0); 124 sqlInfoList.add(info); 125 } 126 } 127 } catch (Exception e) { 128 sqlInfoList.clear(); 129 logger.error("Error splitting file: " + sqlFiles[i].getAbsolutePath(), e); 130 // Fallback to original file if splitting fails 131 SqlInfo info = new SqlInfo(); 132 info.setSql(SQLUtil.getFileContent(sqlFiles[i])); 133 info.setFileName(sqlFiles[i].getName()); 134 info.setFilePath(sqlFiles[i].getAbsolutePath()); 135 info.setOriginIndex(0); 136 sqlInfoList.add(info); 137 } 138 } 139 this.sqlInfos = sqlInfoList.toArray(new SqlInfo[0]); 140 this.option = option; 141 ModelBindingManager.setGlobalOption(option); 142 } 143 144 public ParallelDataFlowAnalyzer(File[] sqlFiles, Option option, int splitSizeMB, String splitDirPath) { 145 
this(sqlFiles, option, splitSizeMB, new File(splitDirPath)); 146 } 147 148 protected List<SqlInfo> convertSQL(EDbVendor vendor, String json) { 149 List<SqlInfo> sqlInfos = new ArrayList<SqlInfo>(); 150 List sqlContents = (List) JSON.parseObject(json); 151 for (int j = 0; j < sqlContents.size(); j++) { 152 Map sqlContent = (Map) sqlContents.get(j); 153 String sql = (String) sqlContent.get("sql"); 154 String fileName = (String) sqlContent.get("fileName"); 155 String filePath = (String) sqlContent.get("filePath"); 156 if (sql != null && sql.trim().startsWith("{")) { 157 if (sql.indexOf("createdBy") != -1) { 158 if (this.sqlenv == null) { 159 TSQLEnv[] sqlenvs = new TJSONSQLEnvParser(option.getDefaultServer(), option.getDefaultDatabase(), option.getDefaultSchema()).parseSQLEnv(vendor, sql); 160 if (sqlenvs != null) { 161 this.sqlenv = sqlenvs[0]; 162 } 163 } 164 if (sql.toLowerCase().indexOf("sqldep") != -1 || sql.toLowerCase().indexOf("grabit") != -1 ) { 165 Map queryObject = (Map) JSON.parseObject(sql); 166 List querys = (List) queryObject.get("queries"); 167 if (querys != null) { 168 for (int i = 0; i < querys.size(); i++) { 169 Map object = (Map) querys.get(i); 170 SqlInfo info = new SqlInfo(); 171 info.setSql(JSON.toJSONString(object)); 172 info.setFileName(fileName); 173 info.setFilePath(filePath); 174 info.setOriginIndex(i); 175 sqlInfos.add(info); 176 } 177 queryObject.remove("queries"); 178 SqlInfo info = new SqlInfo(); 179 info.setSql(JSON.toJSONString(queryObject)); 180 info.setFileName(fileName); 181 info.setFilePath(filePath); 182 info.setOriginIndex(querys.size()); 183 sqlInfos.add(info); 184 } else { 185 SqlInfo info = new SqlInfo(); 186 info.setSql(JSON.toJSONString(queryObject)); 187 info.setFileName(fileName); 188 info.setFilePath(filePath); 189 info.setOriginIndex(0); 190 sqlInfos.add(info); 191 } 192 } 193 else if (sql.toLowerCase().indexOf("sqlflow") != -1) { 194 Map sqlflow = (Map) JSON.parseObject(sql); 195 List<Map> servers = (List<Map>) 
sqlflow.get("servers"); 196 if (servers != null) { 197 for (Map queryObject : servers) { 198 String name = (String) queryObject.get("name"); 199 String dbVendor = (String) queryObject.get("dbVendor"); 200 List querys = (List) queryObject.get("queries"); 201 if (querys != null) { 202 for (int i = 0; i < querys.size(); i++) { 203 Map object = (Map) querys.get(i); 204 SqlInfo info = new SqlInfo(); 205 info.setSql(JSON.toJSONString(object)); 206 info.setFileName(fileName); 207 info.setFilePath(filePath); 208 info.setOriginIndex(i); 209 info.setDbVendor(dbVendor); 210 info.setServer(name); 211 sqlInfos.add(info); 212 } 213 queryObject.remove("queries"); 214 SqlInfo info = new SqlInfo(); 215 info.setSql(JSON.toJSONString(queryObject)); 216 info.setFileName(fileName); 217 info.setFilePath(filePath); 218 info.setOriginIndex(querys.size()); 219 info.setDbVendor(dbVendor); 220 info.setServer(filePath); 221 sqlInfos.add(info); 222 } else { 223 SqlInfo info = new SqlInfo(); 224 info.setSql(JSON.toJSONString(queryObject)); 225 info.setFileName(fileName); 226 info.setFilePath(filePath); 227 info.setOriginIndex(0); 228 sqlInfos.add(info); 229 } 230 } 231 } 232 } 233 } 234 } else if (sql != null) { 235 SqlInfo info = new SqlInfo(); 236 info.setSql(sql); 237 info.setFileName(fileName); 238 info.setFilePath(filePath); 239 info.setOriginIndex(0); 240 sqlInfos.add(info); 241 } 242 } 243 return sqlInfos; 244 } 245 246 @Override 247 public boolean isIgnoreRecordSet() { 248 return option.isIgnoreRecordSet(); 249 } 250 251 @Override 252 public void setIgnoreRecordSet(boolean ignoreRecordSet) { 253 option.setIgnoreRecordSet(ignoreRecordSet); 254 } 255 256 @Override 257 public boolean isSimpleShowTopSelectResultSet() { 258 return option.isSimpleShowTopSelectResultSet(); 259 } 260 261 @Override 262 public void setSimpleShowTopSelectResultSet(boolean simpleShowTopSelectResultSet) { 263 option.setSimpleShowTopSelectResultSet(simpleShowTopSelectResultSet); 264 } 265 266 @Override 267 public 
boolean isSimpleShowFunction() { 268 return option.isSimpleShowFunction(); 269 } 270 271 @Override 272 public void setSimpleShowFunction(boolean simpleShowFunction) { 273 option.setSimpleShowFunction(simpleShowFunction); 274 } 275 276 @Override 277 public boolean isShowJoin() { 278 return option.isShowJoin(); 279 } 280 281 @Override 282 public void setShowJoin(boolean showJoin) { 283 option.setShowJoin(showJoin); 284 } 285 286 @Override 287 public boolean isShowImplicitSchema() { 288 return option.isShowImplicitSchema(); 289 } 290 291 @Override 292 public void setShowImplicitSchema(boolean showImplicitSchema) { 293 option.setShowImplicitSchema(showImplicitSchema); 294 } 295 296 @Override 297 public boolean isShowConstantTable() { 298 return option.isShowConstantTable(); 299 } 300 301 @Override 302 public void setShowConstantTable(boolean showConstantTable) { 303 option.setShowConstantTable(showConstantTable); 304 } 305 306 @Override 307 public boolean isShowCountTableColumn() { 308 return option.isShowCountTableColumn(); 309 } 310 311 @Override 312 public void setShowCountTableColumn(boolean showCountTableColumn) { 313 option.setShowCountTableColumn(showCountTableColumn); 314 } 315 316 @Override 317 public boolean isTransform() { 318 return option.isTransform(); 319 } 320 321 @Override 322 public void setTransform(boolean transform) { 323 option.setTransform(transform); 324 if (option.isTransformCoordinate()) { 325 option.setTransform(true); 326 } 327 } 328 329 @Override 330 public boolean isTransformCoordinate() { 331 return option.isTransformCoordinate(); 332 } 333 334 @Override 335 public void setTransformCoordinate(boolean transformCoordinate) { 336 option.setTransformCoordinate(transformCoordinate); 337 if (transformCoordinate) { 338 option.setTransform(true); 339 } 340 } 341 342 @Override 343 public boolean isLinkOrphanColumnToFirstTable() { 344 return option.isLinkOrphanColumnToFirstTable(); 345 } 346 347 @Override 348 public void 
setLinkOrphanColumnToFirstTable(boolean linkOrphanColumnToFirstTable) { 349 option.setLinkOrphanColumnToFirstTable(linkOrphanColumnToFirstTable); 350 } 351 352 @Override 353 public boolean isIgnoreCoordinate() { 354 return option.isIgnoreCoordinate(); 355 } 356 357 @Override 358 public void setIgnoreCoordinate(boolean ignoreCoordinate) { 359 option.setIgnoreCoordinate(ignoreCoordinate); 360 } 361 362 @Override 363 public void setHandleListener(DataFlowHandleListener listener) { 364 option.setHandleListener(listener); 365 } 366 367 @Override 368 public void setSqlEnv(TSQLEnv sqlenv) { 369 this.sqlenv = sqlenv; 370 } 371 372 @Override 373 public void setOption(Option option) { 374 this.option = option; 375 } 376 377 @Override 378 public Option getOption() { 379 return option; 380 } 381 382 @Override 383 public List<ErrorInfo> getErrorMessages() { 384 return errorInfos; 385 } 386 387 @Override 388 public synchronized String generateSqlInfos() { 389 return JSON.toJSONString(sqlInfoMap); 390 } 391 392 @Override 393 public synchronized String generateDataFlow() { 394 return generateDataFlow(false); 395 } 396 397 @Override 398 public Map<String, List<SqlInfo>> getSqlInfos() { 399 return sqlInfoMap; 400 } 401 402 @Override 403 /** 404 * @deprecated please use SqlInfoHelper.getSelectedDbObjectInfo 405 */ 406 public DbObjectPosition getSelectedDbObjectInfo(Coordinate start, Coordinate end) { 407 if (start == null || end == null) { 408 throw new IllegalArgumentException("Coordinate can't be null."); 409 } 410 411 String hashCode = start.getHashCode(); 412 413 if (hashCode == null) { 414 throw new IllegalArgumentException("Coordinate hashcode can't be null."); 415 } 416 417 int dbObjectStartLine = (int) start.getX() - 1; 418 int dbObjectStarColumn = (int) start.getY() - 1; 419 int dbObjectEndLine = (int) end.getX() - 1; 420 int dbObjectEndColumn = (int) end.getY() - 1; 421 List<SqlInfo> sqlInfoList; 422 if (hashCode.matches("\\d+")) { 423 sqlInfoList = 
sqlInfoMap.getValueAtIndex(Integer.valueOf(hashCode)); 424 } else { 425 sqlInfoList = sqlInfoMap.get(hashCode); 426 } 427 for (int j = 0; j < sqlInfoList.size(); j++) { 428 SqlInfo sqlInfo = sqlInfoList.get(j); 429 int startLine = sqlInfo.getLineStart(); 430 int endLine = sqlInfo.getLineEnd(); 431 if (dbObjectStartLine >= startLine && dbObjectStartLine <= endLine) { 432 DbObjectPosition position = new DbObjectPosition(); 433 position.setFile(sqlInfo.getFileName()); 434 position.setFilePath(sqlInfo.getFilePath()); 435 position.setSql(sqlInfo.getSql()); 436 position.setIndex(sqlInfo.getOriginIndex()); 437 List<Pair<Integer, Integer>> positions = position.getPositions(); 438 positions.add(new Pair<Integer, Integer>( 439 dbObjectStartLine - startLine + sqlInfo.getOriginLineStart() + 1, dbObjectStarColumn + 1)); 440 positions.add(new Pair<Integer, Integer>(dbObjectEndLine - startLine + sqlInfo.getOriginLineStart() + 1, 441 dbObjectEndColumn + 1)); 442 return position; 443 } 444 } 445 return null; 446 } 447 448 @Override 449 public synchronized String generateDataFlow(final boolean withExtraInfo) { 450 return generateDataFlow(withExtraInfo, true); 451 } 452 453 public synchronized String generateDataFlow(final boolean withExtraInfo, boolean useSaveMemoryMode) { 454 sqlInfoMap.clear(); 455 errorInfos.clear(); 456 Map<String, Pair3<StringBuilder, AtomicInteger, String>> databaseMap = new LinkedHashMap<String, Pair3<StringBuilder, AtomicInteger, String>>(); 457 for (int i = 0; i < sqlInfos.length; i++) { 458 SqlInfo sqlInfo = sqlInfos[i]; 459 if (sqlInfo != null && sqlInfo.getSql() == null && sqlInfo.getFilePath() != null) { 460 sqlInfo.setSql(SQLUtil.getFileContent(sqlInfo.getFilePath())); 461 } 462 if (sqlInfo != null && sqlInfo.getSql() == null && sqlInfo.getFileName() != null) { 463 sqlInfo.setSql(SQLUtil.getFileContent(sqlInfo.getFileName())); 464 } 465 if (sqlInfo == null || sqlInfo.getSql() == null) { 466 sqlInfoMap.put(String.valueOf(i), new ArrayList<SqlInfo>()); 
467 continue; 468 } 469 String sql = sqlInfo.getSql(); 470 if (sql != null && sql.trim().startsWith("{")) { 471 if (MetadataReader.isGrabit(sql) || MetadataReader.isSqlflow(sql)) { 472 String hash = SHA256.getMd5(sql); 473 String fileHash = SHA256.getMd5(hash); 474 if (!sqlInfoMap.containsKey(fileHash)) { 475 sqlInfoMap.put(fileHash, new ArrayList<SqlInfo>()); 476 sqlInfoMap.get(fileHash).add(sqlInfo); 477 } 478 } else { 479 Map queryObject = (Map) JSON.parseObject(sql); 480 appendSqlInfo(databaseMap, i, sqlInfo, queryObject); 481 } 482 } else { 483 String content = sql; 484 String hash = SHA256.getMd5(content); 485 String fileHash = SHA256.getMd5(hash); 486 if (!sqlInfoMap.containsKey(fileHash)) { 487 sqlInfoMap.put(fileHash, new ArrayList<SqlInfo>()); 488 489 String database = TSQLEnv.DEFAULT_DB_NAME; 490 String schema = TSQLEnv.DEFAULT_SCHEMA_NAME; 491 if(sqlenv!=null) { 492 database = sqlenv.getDefaultCatalogName(); 493 if(database == null) { 494 database = TSQLEnv.DEFAULT_DB_NAME; 495 } 496 schema = sqlenv.getDefaultSchemaName(); 497 if(schema == null) { 498 schema = TSQLEnv.DEFAULT_SCHEMA_NAME; 499 } 500 } 501 boolean supportCatalog = TSQLEnv.supportCatalog(option.getVendor()); 502 boolean supportSchema = TSQLEnv.supportSchema(option.getVendor()); 503 StringBuilder builder = new StringBuilder(); 504 if (supportCatalog) { 505 builder.append(database); 506 } 507 if (supportSchema) { 508 if (builder.length() > 0) { 509 builder.append("."); 510 } 511 builder.append(schema); 512 } 513 String group = builder.toString(); 514 SqlInfo sqlInfoItem = new SqlInfo(); 515 sqlInfoItem.setFileName(sqlInfo.getFileName()); 516 sqlInfoItem.setFilePath(sqlInfo.getFilePath()); 517 sqlInfoItem.setSql(sqlInfo.getSql()); 518 sqlInfoItem.setOriginIndex(0); 519 sqlInfoItem.setOriginLineStart(0); 520 sqlInfoItem.setOriginLineEnd(sqlInfo.getSql().split("\n").length - 1); 521 sqlInfoItem.setIndex(0); 522 sqlInfoItem.setLineStart(0); 523 
sqlInfoItem.setLineEnd(sqlInfo.getSql().split("\n").length - 1); 524 sqlInfoItem.setHash(fileHash); 525 sqlInfoItem.setGroup(group); 526 527 sqlInfoMap.get(fileHash).add(sqlInfoItem); 528 } 529 } 530 } 531 532 final TSQLEnv[] env = new TSQLEnv[]{sqlenv}; 533 if (sqlenv == null) { 534 TSQLEnv[] envs = new SQLEnvParser(option.getDefaultServer(), option.getDefaultDatabase(), option.getDefaultSchema()).parseSQLEnv(option.getVendor(), sqlInfos); 535 if (envs != null && envs.length > 0) { 536 env[0] = envs[0]; 537 } 538 } 539 ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors 540 .newFixedThreadPool(option.getParallel() < sqlInfos.length 541 ? option.getParallel() 542 : sqlInfos.length); 543 544 List<File> tempFiles = Collections.synchronizedList(new ArrayList<File>()); 545 final Map<SqlInfo, dataflow> dataflowMap = useSaveMemoryMode ? null : new ConcurrentHashMap<>(); 546 547 try { 548 final CountDownLatch latch = new CountDownLatch(sqlInfos.length); 549 550 logger.info("start parallel analyze " + sqlInfos.length + " sqlinfos"); 551 552 for (int i = 0; i < sqlInfos.length; i++) { 553 final SqlInfo[] sqlInfoCopy = new SqlInfo[sqlInfos.length]; 554 final SqlInfo item = sqlInfos[i]; 555 sqlInfoCopy[i] = item; 556 final Option optionCopy = (Option) option.clone(); 557 optionCopy.setStartId(5000000L * i); 558 optionCopy.setOutput(false); 559 final int index = i; 560 Runnable task = new Runnable() { 561 @Override 562 public void run() { 563 try { 564 logger.info("start analyze sqlinfo[" + index + "]" + (sqlInfos[index].getFilePath() != null ? 
", file name = " + new File(sqlInfos[index].getFilePath()).getName() : "")); 565 DataFlowAnalyzer analyzer = new DataFlowAnalyzer(sqlInfoCopy, optionCopy); 566 analyzer.setSqlEnv(env[0]); 567 analyzer.generateDataFlow(withExtraInfo); 568 dataflow dataflow = analyzer.getDataFlow(); 569 logger.info("analyze sqlinfo[" + index + "] done, relation count: " + dataflow.getRelationships().size()+", error count: "+ analyzer.getErrorMessages().size()); 570 if (analyzer.getErrorMessages().size() > 10000) { 571 dataflow.setErrors(new ArrayList<>(dataflow.getErrors().subList(0, 10000))); 572 errorInfos.addAll(analyzer.getErrorMessages().subList(0, 10000)); 573 logger.warn("Too many errors in dataflow ("+dataflow.getErrors().size()+"), truncating to first 10000 errors" + (sqlInfos[index].getFileName() != null ? ", file name = " + sqlInfos[index].getFileName() : "")); 574 } 575 else{ 576 errorInfos.addAll(analyzer.getErrorMessages()); 577 } 578 579 if (useSaveMemoryMode) { 580 File tempFile = File.createTempFile("dataflow_" + index + "_" + System.currentTimeMillis() + "_", ".xml.zip"); 581 XML2Model.saveXML(dataflow, tempFile); 582 tempFiles.add(tempFile); 583 } else { 584 dataflowMap.put(item, dataflow); 585 } 586 587 hashSQLMap.putAll(analyzer.getHashSQLMap()); 588 analyzer.dispose(); 589 } 590 catch (Exception e) { 591 logger.error("analyze sqlinfo[" + index + "] failed.", e); 592 } 593 finally { 594 latch.countDown(); 595 } 596 } 597 }; 598 executor.submit(task); 599 } 600 latch.await(); 601 } catch (Exception e) { 602 logger.error("execute task failed.", e); 603 } 604 executor.shutdown(); 605 606 ModelBindingManager modelManager = new ModelBindingManager(); 607 modelManager.setGlobalVendor(option.getVendor()); 608 modelManager.setGlobalOption(option); 609 ModelBindingManager.set(modelManager); 610 611 if (useSaveMemoryMode) { 612 // 使çâ€Â¨Ã¨Å ‚约内å˜模å¼Â,è¿Â代åˆ并 613 logger.info("start merge dataflow, dataflow count: " + tempFiles.size()+", useSaveMemoryMode: "+ 
useSaveMemoryMode+", gsp version: "+ TBaseType.versionid); 614 this.dataflow = iterativeMergeDataFlows(tempFiles, 5000000L * sqlInfos.length); 615 if (this.dataflow != null && this.dataflow.getRelationships() != null) { 616 logger.info("merge dataflow done, dataflow count: " + tempFiles.size() + ", relation count: " + this.dataflow.getRelationships().size()); 617 } 618 // 清ç†临时文件 619 for (File tempFile : tempFiles) { 620 tempFile.delete(); 621 } 622 } else { 623 // ä¿ÂæŒÂ原有逻辑 624 logger.info("start merge dataflow, dataflow count: " + dataflowMap.size()+", useSaveMemoryMode: "+ useSaveMemoryMode+", gsp version: "+ TBaseType.versionid); 625 this.dataflow = mergeDataFlows(dataflowMap, 5000000L * sqlInfos.length); 626 if (this.dataflow != null && this.dataflow.getRelationships() != null) { 627 logger.info("merge dataflow done, dataflow count: " + tempFiles.size() + ", relation count: " + this.dataflow.getRelationships().size()); 628 } 629 dataflowMap.clear(); 630 } 631 632 if (this.dataflow != null) { 633 logger.info("merge done, relation count: " + this.dataflow.getRelationships().size()); 634 } 635 636 if (dataflow != null && option.isOutput()) { 637 if (option.isTextFormat()) { 638 dataflowString = DataFlowAnalyzer.getTextOutput(dataflow); 639 } else { 640 try { 641 dataflowString = XML2Model.saveXML(dataflow); 642 }catch (Exception e){ 643 logger.error("save dataflow as xml failed.", e); 644 dataflowString = null; 645 } 646 } 647 } 648 ModelBindingManager.remove(); 649 return dataflowString; 650 } 651 652 private dataflow iterativeMergeDataFlows(List<File> tempFiles, long startId) { 653 if (tempFiles == null || tempFiles.isEmpty()) { 654 logger.warn("iterativeMergeDataFlows: tempFiles is null or empty"); 655 return null; 656 } 657 658 try { 659 Collections.sort(tempFiles, (f1, f2) -> Long.compare(f1.length(), f2.length())); 660 logger.info("iterativeMergeDataFlows: sorted tempFiles, first: " + tempFiles.get(0).length() + ", last: " + tempFiles.get(tempFiles.size() 
- 1).length()); 661 } catch (Exception e) { 662 logger.error("iterativeMergeDataFlows failed.", e); 663 } 664 665 long startTime = System.currentTimeMillis(); 666 int totalIterations = tempFiles.size() - 1; 667 logger.info("iterativeMergeDataFlows: start merging, total dataflows: " + tempFiles.size() + ", total iterations: " + totalIterations); 668 669 long loadStartTime = System.currentTimeMillis(); 670 dataflow mergedDataflow = XML2Model.loadXML(dataflow.class, tempFiles.get(0)); 671 long loadEndTime = System.currentTimeMillis(); 672 int initialRelationCount = mergedDataflow.getRelationships() != null ? mergedDataflow.getRelationships().size() : 0; 673 logger.info("iterativeMergeDataFlows: loaded initial dataflow, relation count: " + initialRelationCount + ", time: " + (loadEndTime - loadStartTime) + "ms"); 674 675 for (int i = 1; i < tempFiles.size(); i++) { 676 long iterationStartTime = System.currentTimeMillis(); 677 int currentIteration = i; 678 int remainingIterations = totalIterations - currentIteration + 1; 679 680 logger.info("iterativeMergeDataFlows: iteration " + currentIteration + "/" + totalIterations + ", remaining: " + remainingIterations); 681 682 long loadCurrentStartTime = System.currentTimeMillis(); 683 dataflow currentDataflow = XML2Model.loadXML(dataflow.class, tempFiles.get(i)); 684 long loadCurrentEndTime = System.currentTimeMillis(); 685 int currentRelationCount = currentDataflow.getRelationships() != null ? 
currentDataflow.getRelationships().size() : 0; 686 logger.info("iterativeMergeDataFlows: loaded dataflow[" + i + "], relation count: " + currentRelationCount + ", time: " + (loadCurrentEndTime - loadCurrentStartTime) + "ms"); 687 688 List<dataflow> tempList = new ArrayList<>(); 689 tempList.add(mergedDataflow); 690 tempList.add(currentDataflow); 691 692 long mergeStartTime = System.currentTimeMillis(); 693 mergedDataflow = mergeDataFlows(tempList, startId + 5000000L * i); 694 long mergeEndTime = System.currentTimeMillis(); 695 int mergedRelationCount = mergedDataflow.getRelationships() != null ? mergedDataflow.getRelationships().size() : 0; 696 697 long iterationEndTime = System.currentTimeMillis(); 698 long iterationTime = iterationEndTime - iterationStartTime; 699 long mergeTime = mergeEndTime - mergeStartTime; 700 logger.info("iterativeMergeDataFlows: iteration " + currentIteration + " completed, merged relation count: " + mergedRelationCount + ", merge time: " + mergeTime + "ms, total iteration time: " + iterationTime + "ms"); 701 702 currentDataflow = null; 703 tempList.clear(); 704 } 705 706 long endTime = System.currentTimeMillis(); 707 long totalTime = endTime - startTime; 708 int finalRelationCount = mergedDataflow.getRelationships() != null ? 
mergedDataflow.getRelationships().size() : 0; 709 logger.info("iterativeMergeDataFlows: all iterations completed, final relation count: " + finalRelationCount + ", total time: " + totalTime + "ms (" + (totalTime / 1000.0) + "s)"); 710 711 return mergedDataflow; 712 } 713 714 private void appendSqlInfo(Map<String, Pair3<StringBuilder, AtomicInteger, String>> databaseMap, int index, 715 SqlInfo sqlInfo, Map queryObject) { 716 EDbVendor vendor = option.getVendor(); 717 if(!SQLUtil.isEmpty(sqlInfo.getDbVendor())){ 718 vendor = EDbVendor.valueOf(sqlInfo.getDbVendor()); 719 } 720 721 boolean supportCatalog = TSQLEnv.supportCatalog(vendor); 722 boolean supportSchema = TSQLEnv.supportSchema(vendor); 723 724 String content = (String) queryObject.get("sourceCode"); 725 if (SQLUtil.isEmpty(content)) { 726 return; 727 } 728 729 StringBuilder builder = new StringBuilder(); 730 if (supportCatalog) { 731 String database = (String) queryObject.get("database"); 732 if (database.indexOf(".") != -1) { 733 String delimitedChar = TSQLEnv.delimitedChar(vendor); 734 database = delimitedChar + SQLUtil.trimColumnStringQuote(database) + delimitedChar; 735 } 736 builder.append(database); 737 } 738 if (supportSchema) { 739 String schema = (String) queryObject.get("schema"); 740 if (schema.indexOf(".") != -1) { 741 String delimitedChar = TSQLEnv.delimitedChar(vendor); 742 schema = delimitedChar + SQLUtil.trimColumnStringQuote(schema) + delimitedChar; 743 } 744 if (builder.length() > 0) { 745 builder.append("."); 746 } 747 builder.append(schema); 748 } 749 String group = builder.toString(); 750 String sqlHash = SHA256.getMd5(content); 751 String hash = SHA256.getMd5(sqlHash); 752 if (!databaseMap.containsKey(sqlHash)) { 753 databaseMap.put(sqlHash, 754 new Pair3<StringBuilder, AtomicInteger, String>(new StringBuilder(), new AtomicInteger(), group)); 755 } 756 TGSqlParser parser = new TGSqlParser(option.getVendor()); 757 String delimiterChar = String.valueOf(parser.getDelimiterChar()); 758 
StringBuilder buffer = new StringBuilder(content);
        if (content.trim().endsWith(delimiterChar) || content.trim().endsWith(";")) {
            buffer.append("\n");
        } else if(vendor == EDbVendor.dbvredshift
                || vendor == EDbVendor.dbvgaussdb
                || vendor == EDbVendor.dbvpostgresql
                || vendor == EDbVendor.dbvmysql
                || vendor == EDbVendor.dbvteradata){
            buffer.append("\n\n-- " + TBaseType.sqlflow_stmt_delimiter_str + "\n\n");
        } else{
            SQLUtil.endTrim(buffer);
            buffer.append(";").append("\n");
        }

        int lineStart = databaseMap.get(sqlHash).first.toString().split("\n", -1).length - 1;
        if (databaseMap.get(sqlHash).first.toString().length() == 0) {
            lineStart = 0;
        }
        databaseMap.get(sqlHash).first.append(buffer.toString());
        SqlInfo sqlInfoItem = new SqlInfo();
        sqlInfoItem.setFileName(sqlInfo.getFileName());
        sqlInfoItem.setFilePath(sqlInfo.getFilePath());
        sqlInfoItem.setSql(buffer.toString());
        sqlInfoItem.setOriginIndex(index);
        sqlInfoItem.setOriginLineStart(0);
        sqlInfoItem.setOriginLineEnd(buffer.toString().split("\n", -1).length - 1);
        sqlInfoItem.setIndex(databaseMap.get(sqlHash).second.getAndIncrement());
        sqlInfoItem.setLineStart(lineStart);
        sqlInfoItem.setLineEnd(databaseMap.get(sqlHash).first.toString().split("\n", -1).length - 1);
        sqlInfoItem.setGroup(group);
        sqlInfoItem.setHash(hash);

        if (!sqlInfoMap.containsKey(hash)) {
            sqlInfoMap.put(hash, new ArrayList<SqlInfo>());
        }
        sqlInfoMap.get(hash).add(sqlInfoItem);
    }

    /**
     * Disposes this analyzer by calling {@link ModelBindingManager#remove()}.
     */
    @Override
    public void dispose() {
        ModelBindingManager.remove();
    }

    /**
     * Merges the dataflows held as values of {@code dataflowMap}.
     *
     * @param dataflowMap per-SQL dataflows; only the values are used
     * @param startId     base for newly generated ids; the first generated id is {@code startId + 1}
     * @return the merged dataflow
     */
    private dataflow mergeDataFlows(Map<SqlInfo, dataflow> dataflowMap, long startId) {
        return mergeDataFlows(dataflowMap.values(), startId);
    }

    /**
     * Merges several dataflows into one while {@code vendor} is installed as the global vendor.
     *
     * <p>The global vendor is set before merging and always removed afterwards, even when
     * merging throws. Generated ids start just above {@code 5000000 * dataflows.size()}.
     * NOTE(review): this offset presumably keeps generated ids clear of the ids already used
     * inside the individual dataflows - confirm against the id generator.
     *
     * @param dataflows dataflows to merge; {@code null} elements are skipped
     * @param vendor    database vendor to install globally for the duration of the merge
     * @return the merged dataflow
     */
    public static dataflow mergeDataFlows(Collection<dataflow> dataflows, EDbVendor vendor) {
        ModelBindingManager.setGlobalVendor(vendor);
        try {
            return mergeDataFlows(dataflows, 5000000L * dataflows.size());
        } finally {
            ModelBindingManager.removeGlobalVendor();
        }
    }

    /**
     * Merges several dataflows into a single dataflow.
     *
     * <p>Steps:
     * <ol>
     *   <li>Concatenate every element list of every non-null dataflow (errors are capped at
     *       10000 entries).</li>
     *   <li>Group table-like objects by their normalized qualified name. A group with more than
     *       one member is replaced by one new table with a fresh id, unless the first member is
     *       a function, in which case the group is left untouched.</li>
     *   <li>Within each table, group columns by normalized name. Function columns are kept
     *       individually; other duplicates are replaced by one new column with a fresh id.</li>
     *   <li>Rewrite relationship endpoints to the merged ids and drop relationships whose JSON
     *       form is identical after rewriting.</li>
     * </ol>
     *
     * <p>The input model objects are mutated (relationship endpoints and table column lists
     * are rewritten in place).
     *
     * @param dataflows dataflows to merge; {@code null} elements are skipped
     * @param startId   base for generated ids; each new table/column gets the next value
     * @return the merged dataflow; its lists are only populated when at least one non-null
     *         dataflow was supplied
     */
    private static dataflow mergeDataFlows(Collection<dataflow> dataflows, long startId) {
        final int maxErrors = 10000;
        dataflow mergeDataflow = new dataflow();

        List<table> tableCopy = new ArrayList<table>();
        List<table> viewCopy = new ArrayList<table>();
        List<table> databaseCopy = new ArrayList<table>();
        List<table> schemaCopy = new ArrayList<table>();
        List<table> stageCopy = new ArrayList<table>();
        List<table> dataSourceCopy = new ArrayList<table>();
        List<table> streamCopy = new ArrayList<table>();
        List<table> fileCopy = new ArrayList<table>();
        List<table> variableCopy = new ArrayList<table>();
        List<table> resultSetCopy = new ArrayList<table>();

        List<process> processCopy = new ArrayList<process>();
        List<relationship> relationshipCopy = new ArrayList<relationship>();
        List<procedure> procedureCopy = new ArrayList<procedure>();
        List<oraclePackage> packageCopy = new ArrayList<oraclePackage>();
        List<error> errorCopy = new ArrayList<error>();

        // Every table-like object of every dataflow, in the order used for grouping below.
        List<table> tables = new ArrayList<table>();
        boolean sawDataflow = false;
        for (dataflow flow : dataflows) {
            if (flow == null) {
                continue;
            }
            sawDataflow = true;

            addAllNonNull(tableCopy, flow.getTables());
            addAllNonNull(viewCopy, flow.getViews());
            addAllNonNull(databaseCopy, flow.getDatabases());
            addAllNonNull(schemaCopy, flow.getSchemas());
            addAllNonNull(stageCopy, flow.getStages());
            addAllNonNull(dataSourceCopy, flow.getDatasources());
            addAllNonNull(streamCopy, flow.getStreams());
            addAllNonNull(fileCopy, flow.getPaths());
            addAllNonNull(variableCopy, flow.getVariables());
            addAllNonNull(resultSetCopy, flow.getResultsets());
            addAllNonNull(processCopy, flow.getProcesses());
            addAllNonNull(relationshipCopy, flow.getRelationships());
            addAllNonNull(procedureCopy, flow.getProcedures());
            addAllNonNull(packageCopy, flow.getPackages());

            addAllNonNull(errorCopy, flow.getErrors());
            if (errorCopy.size() > maxErrors) {
                // Trim in place: keeping a subList view and appending to it on later
                // iterations would stack views on top of each other.
                errorCopy.subList(maxErrors, errorCopy.size()).clear();
            }

            // Same null tolerance as above; a missing list must not abort the merge.
            addAllNonNull(tables, flow.getTables());
            addAllNonNull(tables, flow.getViews());
            addAllNonNull(tables, flow.getDatabases());
            addAllNonNull(tables, flow.getSchemas());
            addAllNonNull(tables, flow.getStages());
            addAllNonNull(tables, flow.getDatasources());
            addAllNonNull(tables, flow.getStreams());
            addAllNonNull(tables, flow.getPaths());
            addAllNonNull(tables, flow.getResultsets());
            addAllNonNull(tables, flow.getVariables());
        }

        // The result lists are attached only when something was merged, so an empty input
        // still yields a dataflow whose lists were never set.
        if (sawDataflow) {
            mergeDataflow.setTables(tableCopy);
            mergeDataflow.setViews(viewCopy);
            mergeDataflow.setDatabases(databaseCopy);
            mergeDataflow.setSchemas(schemaCopy);
            mergeDataflow.setStages(stageCopy);
            mergeDataflow.setDatasources(dataSourceCopy);
            mergeDataflow.setStreams(streamCopy);
            mergeDataflow.setPaths(fileCopy);
            mergeDataflow.setVariables(variableCopy);
            mergeDataflow.setResultsets(resultSetCopy);
            mergeDataflow.setProcesses(processCopy);
            mergeDataflow.setRelationships(relationshipCopy);
            mergeDataflow.setProcedures(procedureCopy);
            mergeDataflow.setPackages(packageCopy);
            mergeDataflow.setErrors(errorCopy);
        }

        // normalized table name -> all tables carrying that name
        Map<String, List<table>> tableMap = new HashMap<String, List<table>>();
        // normalized table name -> type to give the merged table
        Map<String, String> tableTypeMap = new HashMap<String, String>();
        // original table id -> merged table id
        Map<String, String> tableIdMap = new HashMap<String, String>();

        // normalized "table.column" name -> all columns carrying that name
        Map<String, List<column>> columnMap = new HashMap<String, List<column>>();
        // normalized table name -> its column names in first-seen order
        Map<String, Set<String>> tableColumnMap = new HashMap<String, Set<String>>();
        // original column id -> id of the column that represents it after merging
        Map<String, String> columnIdMap = new HashMap<String, String>();
        // id of a column kept after merging -> that column
        Map<String, column> columnMergeIdMap = new HashMap<String, column>();

        for (table current : tables) {
            String qualifiedTableName = DlineageUtil.getQualifiedTableName(current);
            String tableFullName = DlineageUtil.getIdentifierNormalTableName(qualifiedTableName);

            tableMap.computeIfAbsent(tableFullName, key -> new ArrayList<table>()).add(current);

            if (!tableTypeMap.containsKey(tableFullName)
                    || replacesKnownTableType(current.getSubType(), tableTypeMap.get(tableFullName))) {
                tableTypeMap.put(tableFullName, current.getType());
            }

            if (current.getColumns() != null) {
                Set<String> knownColumns =
                        tableColumnMap.computeIfAbsent(tableFullName, key -> new LinkedHashSet<String>());
                for (column col : current.getColumns()) {
                    String columnFullName = tableFullName + "."
                            + DlineageUtil.getIdentifierNormalColumnName(col.getName());

                    if (!columnMap.containsKey(columnFullName)) {
                        columnMap.put(columnFullName, new ArrayList<column>());
                        knownColumns.add(columnFullName);
                    }

                    columnMap.get(columnFullName).add(col);
                }
            }
        }

        for (Map.Entry<String, List<table>> entry : tableMap.entrySet()) {
            String tableName = entry.getKey();
            List<table> tableList = entry.getValue();
            table mergedTable;
            if (tableList.size() > 1) {
                table standardTable = tableList.get(0);
                // Functions may share a name, so they are never merged.
                if (standardTable.isFunction()) {
                    continue;
                }

                String type = tableTypeMap.get(tableName);
                mergedTable = new table();
                mergedTable.setId(String.valueOf(++startId));
                mergedTable.setServer(standardTable.getServer());
                mergedTable.setDatabase(standardTable.getDatabase());
                mergedTable.setSchema(standardTable.getSchema());
                mergedTable.setName(standardTable.getName());
                mergedTable.setDisplayName(standardTable.getDisplayName());
                mergedTable.setParent(standardTable.getParent());
                mergedTable.setColumns(new ArrayList<column>());

                Set<String> processIds = new LinkedHashSet<String>();
                for (table item : tableList) {
                    if (item.getProcessIds() != null) {
                        processIds.addAll(item.getProcessIds());
                    }
                }
                if (!processIds.isEmpty()) {
                    mergedTable.setProcessIds(new ArrayList<String>(processIds));
                }
                mergedTable.setType(type);

                for (table item : tableList) {
                    if (!SQLUtil.isEmpty(mergedTable.getCoordinate()) && !SQLUtil.isEmpty(item.getCoordinate())) {
                        if (mergedTable.getCoordinate().indexOf(item.getCoordinate()) == -1) {
                            mergedTable.appendCoordinate(item.getCoordinate());
                        }
                    } else if (!SQLUtil.isEmpty(item.getCoordinate())) {
                        mergedTable.setCoordinate(item.getCoordinate());
                    }

                    if (!SQLUtil.isEmpty(mergedTable.getAlias()) && !SQLUtil.isEmpty(item.getAlias())) {
                        mergedTable.setAlias(mergedTable.getAlias() + "," + item.getAlias());
                    } else if (!SQLUtil.isEmpty(item.getAlias())) {
                        mergedTable.setAlias(item.getAlias());
                    }

                    tableIdMap.put(item.getId(), mergedTable.getId());
                    detachMergedItem(mergeDataflow, item);
                }

                attachMergedTable(mergeDataflow, mergedTable);
            } else {
                mergedTable = tableList.get(0);
            }

            Set<String> columns = tableColumnMap.get(tableName);
            if (columns == null) {
                // No table of this name carried a column list, so there is nothing to merge.
                continue;
            }

            List<column> mergeColumns = new ArrayList<column>();
            for (String columnName : columns) {
                List<column> columnList = columnMap.get(columnName);

                List<column> functions = new ArrayList<column>();
                for (column candidate : columnList) {
                    if (Boolean.TRUE.toString().equals(candidate.getIsFunction())) {
                        functions.add(candidate);
                    }
                }
                if (!functions.isEmpty()) {
                    // Function columns keep their own identity.
                    for (column function : functions) {
                        mergeColumns.add(function);
                        columnIdMap.put(function.getId(), function.getId());
                        columnMergeIdMap.put(function.getId(), function);
                    }
                    columnList.removeAll(functions);
                }

                if (columnList.isEmpty()) {
                    continue;
                }

                column firstColumn = columnList.get(0);
                if (columnList.size() > 1) {
                    column mergeColumn = new column();
                    mergeColumn.setId(String.valueOf(++startId));
                    mergeColumn.setName(firstColumn.getName());
                    mergeColumn.setDisplayName(firstColumn.getDisplayName());
                    mergeColumn.setSource(firstColumn.getSource());
                    mergeColumn.setQualifiedTable(firstColumn.getQualifiedTable());
                    mergeColumns.add(mergeColumn);
                    for (column item : columnList) {
                        mergeColumn.appendCoordinate(item.getCoordinate());
                        columnIdMap.put(item.getId(), mergeColumn.getId());
                        // Carry over data type and key flags (issue I6DB5S, grq 2023.02.06);
                        // the last non-null value wins.
                        if (item.getDataType() != null) {
                            mergeColumn.setDataType(item.getDataType());
                        }
                        if (item.isForeignKey() != null) {
                            mergeColumn.setForeignKey(item.isForeignKey());
                        }
                        if (item.isUnqiueKey() != null) {
                            mergeColumn.setUnqiueKey(item.isUnqiueKey());
                        }
                        if (item.isIndexKey() != null) {
                            mergeColumn.setIndexKey(item.isIndexKey());
                        }
                        if (item.isPrimaryKey() != null) {
                            mergeColumn.setPrimaryKey(item.isPrimaryKey());
                        }
                    }
                    columnMergeIdMap.put(mergeColumn.getId(), mergeColumn);
                } else {
                    mergeColumns.add(firstColumn);
                    columnIdMap.put(firstColumn.getId(), firstColumn.getId());
                    columnMergeIdMap.put(firstColumn.getId(), firstColumn);
                }
            }
            mergedTable.setColumns(mergeColumns);
        }

        if (mergeDataflow.getRelationships() != null) {
            // Keyed by a digest of the rewritten relationship's JSON; insertion order is kept.
            Map<String, relationship> mergeRelations = new LinkedHashMap<String, relationship>();
            for (relationship relation : mergeDataflow.getRelationships()) {
                if (RelationshipType.call.name().equals(relation.getType())) {
                    targetColumn caller = relation.getCaller();
                    if (caller == null) {
                        continue;
                    }
                    if (tableIdMap.containsKey(caller.getId())) {
                        caller.setId(tableIdMap.get(caller.getId()));
                    }

                    List<sourceColumn> callees = relation.getCallees();
                    if (callees != null) {
                        for (sourceColumn callee : callees) {
                            if (tableIdMap.containsKey(callee.getId())) {
                                callee.setId(tableIdMap.get(callee.getId()));
                            }
                        }
                        // De-duplicate callees after remapping, keeping their order.
                        relation.setCallees(new ArrayList<sourceColumn>(new LinkedHashSet<sourceColumn>(callees)));
                    }
                } else {
                    targetColumn target = relation.getTarget();
                    if (target == null) {
                        continue;
                    }
                    if (tableIdMap.containsKey(target.getParent_id())) {
                        target.setParent_id(tableIdMap.get(target.getParent_id()));
                    }
                    if (columnIdMap.containsKey(target.getId())) {
                        target.setId(columnIdMap.get(target.getId()));
                        target.setCoordinate(columnMergeIdMap.get(target.getId()).getCoordinate());
                    }

                    List<sourceColumn> sources = relation.getSources();
                    if (sources != null) {
                        for (sourceColumn source : sources) {
                            if (tableIdMap.containsKey(source.getParent_id())) {
                                source.setParent_id(tableIdMap.get(source.getParent_id()));
                            }
                            if (tableIdMap.containsKey(source.getSource_id())) {
                                source.setSource_id(tableIdMap.get(source.getSource_id()));
                            }
                            if (columnIdMap.containsKey(source.getId())) {
                                source.setId(columnIdMap.get(source.getId()));
                                source.setCoordinate(columnMergeIdMap.get(source.getId()).getCoordinate());
                            }
                        }
                        // De-duplicate sources after remapping, keeping their order.
                        relation.setSources(new ArrayList<sourceColumn>(new LinkedHashSet<sourceColumn>(sources)));
                    }
                }

                String key = SHA256.getMd5(JSON.toJSONString(relation, true));
                if (!mergeRelations.containsKey(key)) {
                    mergeRelations.put(key, relation);
                }
            }

            mergeDataflow.setRelationships(new ArrayList<relationship>(mergeRelations.values()));
        }

        return mergeDataflow;
    }

    /** Appends all elements of {@code source} to {@code target}; a {@code null} source is ignored. */
    private static <T> void addAllNonNull(Collection<T> target, Collection<? extends T> source) {
        if (source != null) {
            target.addAll(source);
        }
    }

    /**
     * Tells whether a table seen after the first one of its name overrides the recorded type:
     * true when its sub type is one of view/database/schema/stage/datasource/stream/file, or
     * when the recorded type is still "table" or "variable".
     */
    private static boolean replacesKnownTableType(String subType, String knownType) {
        return "view".equals(subType)
                || "database".equals(subType)
                || "schema".equals(subType)
                || "stage".equals(subType)
                || "datasource".equals(subType)
                || "stream".equals(subType)
                || "file".equals(subType)
                || "table".equals(knownType)
                || "variable".equals(knownType);
    }

    /** Removes {@code item} from the merged dataflow list that matches the item's kind. */
    private static void detachMergedItem(dataflow mergeDataflow, table item) {
        if (item.isView()) {
            mergeDataflow.getViews().remove(item);
        } else if (item.isDatabaseType()) {
            mergeDataflow.getDatabases().remove(item);
        } else if (item.isSchemaType()) {
            mergeDataflow.getSchemas().remove(item);
        } else if (item.isStage()) {
            mergeDataflow.getStages().remove(item);
        } else if (item.isDataSource()) {
            mergeDataflow.getDatasources().remove(item);
        } else if (item.isStream()) {
            mergeDataflow.getStreams().remove(item);
        } else if (item.isFile()) {
            mergeDataflow.getPaths().remove(item);
        } else if (item.isVariable()) {
            mergeDataflow.getVariables().remove(item);
        } else if (item.isTable()) {
            mergeDataflow.getTables().remove(item);
        } else if (item.isResultSet()) {
            mergeDataflow.getResultsets().remove(item);
        }
    }

    /**
     * Adds a newly merged table to the merged dataflow list that matches its kind; anything
     * that matches no specific kind goes into the plain table list.
     */
    private static void attachMergedTable(dataflow mergeDataflow, table mergedTable) {
        if (mergedTable.isView()) {
            mergeDataflow.getViews().add(mergedTable);
        } else if (mergedTable.isDatabaseType()) {
            mergeDataflow.getDatabases().add(mergedTable);
        } else if (mergedTable.isSchemaType()) {
            mergeDataflow.getSchemas().add(mergedTable);
        } else if (mergedTable.isStage()) {
            mergeDataflow.getStages().add(mergedTable);
        } else if (mergedTable.isDataSource()) {
            mergeDataflow.getDatasources().add(mergedTable);
        } else if (mergedTable.isStream()) {
            mergeDataflow.getStreams().add(mergedTable);
        } else if (mergedTable.isFile()) {
            mergeDataflow.getPaths().add(mergedTable);
        } else if (mergedTable.isVariable()) {
            mergeDataflow.getVariables().add(mergedTable);
        } else if (mergedTable.isResultSet()) {
            mergeDataflow.getResultsets().add(mergedTable);
        } else {
            mergeDataflow.getTables().add(mergedTable);
        }
    }

    /**
     * Returns the analysis result.
     *
     * @return the in-memory dataflow when present; otherwise the dataflow parsed from the
     *         stored XML string; {@code null} when neither is available
     */
    @Override
    public synchronized dataflow getDataFlow() {
        if (dataflow != null) {
            return dataflow;
        }
        if (dataflowString != null) {
            return XML2Model.loadXML(dataflow.class, dataflowString);
        }
        return null;
    }

    /**
     * Returns the map held in {@code hashSQLMap}. The internal map itself is returned, not a copy.
     */
    @Override
    public Map<String, String> getHashSQLMap() {
        return hashSQLMap;
    }

    /**
     * Developer scratch entry point: analyzes a hard-coded local file and writes the resulting
     * dataflow to a hard-coded zip path. The paths are machine-specific.
     */
    public static void main(String[] args) throws Exception {
        Option option = new Option();
        option.setVendor(EDbVendor.dbvmssql);
        option.setOutput(false);
//        option.setSimpleOutput(true);
        File parentDir = new File("C:\\Users\\KK\\Desktop\\sql");
        ParallelDataFlowAnalyzer analyzer = new ParallelDataFlowAnalyzer(new File[]{new File("C:\\Users\\KK\\Desktop\\metadata_with_query\\客户原始sql.json")}, option, 5, new File("C:\\Users\\KK\\Desktop\\metadata_with_query"));
        analyzer.generateDataFlow(false, true);
        dataflow dataflow = analyzer.getDataFlow();
//        System.out.println(XML2Model.saveXML(dataflow));
//        dataflow dataflow = XML2Model.loadXML(gudusoft.gsqlparser.dlineage.dataflow.model.xml.dataflow.class,new File("D:\\dataflow.xml.zip"));
        XML2Model.saveXML(dataflow, new File("D:\\dataflow.xml.zip"));
//        dataflow = DataflowUtility.convertToTableLevelDataflow(dataflow);
//        dataflow = DataflowUtility.convertTableLevelToFunctionCallDataflow(dataflow, true, EDbVendor.dbvoracle);
//        System.out.println(XML2Model.saveXML(dataflow));
    }
}