001package gudusoft.gsqlparser.dlineage; 002 003import gudusoft.gsqlparser.EDbVendor; 004import gudusoft.gsqlparser.TBaseType; 005import gudusoft.gsqlparser.TGSqlParser; 006import gudusoft.gsqlparser.dlineage.dataflow.listener.DataFlowHandleListener; 007import gudusoft.gsqlparser.dlineage.dataflow.metadata.MetadataReader; 008import gudusoft.gsqlparser.dlineage.dataflow.model.*; 009import gudusoft.gsqlparser.dlineage.dataflow.model.json.Coordinate; 010import gudusoft.gsqlparser.dlineage.dataflow.model.xml.*; 011import gudusoft.gsqlparser.dlineage.dataflow.sqlenv.SQLEnvParser; 012import gudusoft.gsqlparser.dlineage.util.*; 013import gudusoft.gsqlparser.sqlenv.TSQLEnv; 014import gudusoft.gsqlparser.sqlenv.parser.TJSONSQLEnvParser; 015import gudusoft.gsqlparser.util.IndexedLinkedHashMap; 016import gudusoft.gsqlparser.util.Logger; 017import gudusoft.gsqlparser.util.LoggerFactory; 018import gudusoft.gsqlparser.util.SQLUtil; 019import gudusoft.gsqlparser.util.json.JSON; 020 021import java.io.File; 022import java.io.IOException; 023import java.util.*; 024import java.util.concurrent.*; 025import java.util.concurrent.atomic.AtomicInteger; 026import java.util.stream.Collectors; 027 028public class ParallelDataFlowAnalyzer implements IDataFlowAnalyzer { 029 private static final Logger logger = LoggerFactory.getLogger(ParallelDataFlowAnalyzer.class); 030 private SqlInfo[] sqlInfos; 031 private Option option = new Option(); 032 private TSQLEnv sqlenv = null; 033 private dataflow dataflow; 034 private String dataflowString; 035 private List<ErrorInfo> errorInfos = new CopyOnWriteArrayList<ErrorInfo>(); 036 private IndexedLinkedHashMap<String, List<SqlInfo>> sqlInfoMap = new IndexedLinkedHashMap<String, List<SqlInfo>>(); 037 private final Map<String, String> hashSQLMap = new HashMap(); 038 039 public ParallelDataFlowAnalyzer(SqlInfo[] sqlInfos, EDbVendor dbVendor, boolean simpleOutput) { 040 this.sqlInfos = sqlInfos; 041 option.setVendor(dbVendor); 042 option.setSimpleOutput(simpleOutput); 043 ModelBindingManager.setGlobalOption(option); 044 } 045 046 public ParallelDataFlowAnalyzer(String[] sqlContents, EDbVendor dbVendor, boolean simpleOutput, String defaultServer, String defaultDatabase, String defaltSchema) { 047 SqlInfo[] sqlInfos = new SqlInfo[sqlContents.length]; 048 for (int i = 0; i < sqlContents.length; i++) { 049 SqlInfo info = new SqlInfo(); 050 info.setSql(sqlContents[i]); 051 info.setOriginIndex(0); 052 sqlInfos[i] = info; 053 } 054 option.setVendor(dbVendor); 055 option.setSimpleOutput(simpleOutput); 056 option.setDefaultServer(defaultServer); 057 option.setDefaultDatabase(defaultDatabase); 058 option.setDefaultSchema(defaltSchema); 059 this.sqlInfos = convertSQL(dbVendor, JSON.toJSONString(sqlInfos)).toArray(new SqlInfo[0]); 060 } 061 062 public ParallelDataFlowAnalyzer(String[] sqlContents, EDbVendor dbVendor, boolean simpleOutput) { 063 SqlInfo[] sqlInfos = new SqlInfo[sqlContents.length]; 064 for (int i = 0; i < sqlContents.length; i++) { 065 SqlInfo info = new SqlInfo(); 066 info.setSql(sqlContents[i]); 067 info.setOriginIndex(0); 068 sqlInfos[i] = info; 069 } 070 option.setVendor(dbVendor); 071 option.setSimpleOutput(simpleOutput); 072 this.sqlInfos = convertSQL(dbVendor, JSON.toJSONString(sqlInfos)).toArray(new SqlInfo[0]); 073 } 074 075 public ParallelDataFlowAnalyzer(SqlInfo[] sqlInfos, Option option) { 076 this.sqlInfos = sqlInfos; 077 this.option = option; 078 ModelBindingManager.setGlobalOption(option); 079 } 080 081 public ParallelDataFlowAnalyzer(File[] sqlFiles, Option option) { 082 SqlInfo[] sqlInfos = new SqlInfo[sqlFiles.length]; 083 for (int i = 0; i < sqlFiles.length; i++) { 084 SqlInfo info = new SqlInfo(); 085 info.setSql(SQLUtil.getFileContent(sqlFiles[i])); 086 info.setFileName(sqlFiles[i].getName()); 087 info.setFilePath(sqlFiles[i].getAbsolutePath()); 088 info.setOriginIndex(0); 089 sqlInfos[i] = info; 090 } 091 this.sqlInfos = sqlInfos; 092 this.option = option; 093 ModelBindingManager.setGlobalOption(option); 094 } 095 096 public ParallelDataFlowAnalyzer(File[] sqlFiles, Option option, int splitSizeMB, File splitDir) { 097 List<SqlInfo> sqlInfoList = new ArrayList<SqlInfo>(); 098 for (int i = 0; i < sqlFiles.length; i++) { 099 try { 100 // Ensure split directory exists 101 if (!splitDir.exists() && !splitDir.mkdirs()) { 102 throw new IOException("Failed to create split directory: " + splitDir.getAbsolutePath()); 103 } 104 105 // Split the file if it's large 106 List<File> splitFiles = gudusoft.gsqlparser.util.FileSplitter.splitFile(sqlFiles[i], splitDir, splitSizeMB, option.getVendor()); 107 108 if (splitFiles.isEmpty()) { 109 // If no split files were created, process the original file 110 SqlInfo info = new SqlInfo(); 111 info.setSql(SQLUtil.getFileContent(sqlFiles[i])); 112 info.setFileName(sqlFiles[i].getName()); 113 info.setFilePath(sqlFiles[i].getAbsolutePath()); 114 info.setOriginIndex(0); 115 sqlInfoList.add(info); 116 } else { 117 // Process each split file 118 for (File splitFile : splitFiles) { 119 SqlInfo info = new SqlInfo(); 120 info.setSql(SQLUtil.getFileContent(splitFile)); 121 info.setFileName(splitFile.getName()); 122 info.setFilePath(splitFile.getAbsolutePath()); 123 info.setOriginIndex(0); 124 sqlInfoList.add(info); 125 } 126 } 127 } catch (Exception e) { 128 sqlInfoList.clear(); 129 logger.error("Error splitting file: " + sqlFiles[i].getAbsolutePath(), e); 130 // Fallback to original file if splitting fails 131 SqlInfo info = new SqlInfo(); 132 info.setSql(SQLUtil.getFileContent(sqlFiles[i])); 133 info.setFileName(sqlFiles[i].getName()); 134 info.setFilePath(sqlFiles[i].getAbsolutePath()); 135 info.setOriginIndex(0); 136 sqlInfoList.add(info); 137 } 138 } 139 this.sqlInfos = sqlInfoList.toArray(new SqlInfo[0]); 140 this.option = option; 141 ModelBindingManager.setGlobalOption(option); 142 } 143 144 public ParallelDataFlowAnalyzer(File[] sqlFiles, Option option, int splitSizeMB, String splitDirPath) { 145 this(sqlFiles, option, splitSizeMB, new File(splitDirPath)); 146 } 147 148 protected List<SqlInfo> convertSQL(EDbVendor vendor, String json) { 149 List<SqlInfo> sqlInfos = new ArrayList<SqlInfo>(); 150 List sqlContents = (List) JSON.parseObject(json); 151 for (int j = 0; j < sqlContents.size(); j++) { 152 Map sqlContent = (Map) sqlContents.get(j); 153 String sql = (String) sqlContent.get("sql"); 154 String fileName = (String) sqlContent.get("fileName"); 155 String filePath = (String) sqlContent.get("filePath"); 156 if (sql != null && sql.trim().startsWith("{")) { 157 if (sql.indexOf("createdBy") != -1) { 158 if (this.sqlenv == null) { 159 TSQLEnv[] sqlenvs = new TJSONSQLEnvParser(option.getDefaultServer(), option.getDefaultDatabase(), option.getDefaultSchema()).parseSQLEnv(vendor, sql); 160 if (sqlenvs != null) { 161 this.sqlenv = sqlenvs[0]; 162 } 163 } 164 if (sql.toLowerCase().indexOf("sqldep") != -1 || sql.toLowerCase().indexOf("grabit") != -1 ) { 165 Map queryObject = (Map) JSON.parseObject(sql); 166 List querys = (List) queryObject.get("queries"); 167 if (querys != null) { 168 for (int i = 0; i < querys.size(); i++) { 169 Map object = (Map) querys.get(i); 170 SqlInfo info = new SqlInfo(); 171 info.setSql(JSON.toJSONString(object)); 172 info.setFileName(fileName); 173 info.setFilePath(filePath); 174 info.setOriginIndex(i); 175 sqlInfos.add(info); 176 } 177 queryObject.remove("queries"); 178 SqlInfo info = new SqlInfo(); 179 info.setSql(JSON.toJSONString(queryObject)); 180 info.setFileName(fileName); 181 info.setFilePath(filePath); 182 info.setOriginIndex(querys.size()); 183 sqlInfos.add(info); 184 } else { 185 SqlInfo info = new SqlInfo(); 186 info.setSql(JSON.toJSONString(queryObject)); 187 info.setFileName(fileName); 188 info.setFilePath(filePath); 189 info.setOriginIndex(0); 190 sqlInfos.add(info); 191 } 192 } 193 else if (sql.toLowerCase().indexOf("sqlflow") != -1) { 194 Map sqlflow = (Map) JSON.parseObject(sql); 195 List<Map> servers = (List<Map>) sqlflow.get("servers"); 196 if (servers != null) { 197 for (Map queryObject : servers) { 198 String name = (String) queryObject.get("name"); 199 String dbVendor = (String) queryObject.get("dbVendor"); 200 List querys = (List) queryObject.get("queries"); 201 if (querys != null) { 202 for (int i = 0; i < querys.size(); i++) { 203 Map object = (Map) querys.get(i); 204 SqlInfo info = new SqlInfo(); 205 info.setSql(JSON.toJSONString(object)); 206 info.setFileName(fileName); 207 info.setFilePath(filePath); 208 info.setOriginIndex(i); 209 info.setDbVendor(dbVendor); 210 info.setServer(name); 211 sqlInfos.add(info); 212 } 213 queryObject.remove("queries"); 214 SqlInfo info = new SqlInfo(); 215 info.setSql(JSON.toJSONString(queryObject)); 216 info.setFileName(fileName); 217 info.setFilePath(filePath); 218 info.setOriginIndex(querys.size()); 219 info.setDbVendor(dbVendor); 220 info.setServer(filePath); 221 sqlInfos.add(info); 222 } else { 223 SqlInfo info = new SqlInfo(); 224 info.setSql(JSON.toJSONString(queryObject)); 225 info.setFileName(fileName); 226 info.setFilePath(filePath); 227 info.setOriginIndex(0); 228 sqlInfos.add(info); 229 } 230 } 231 } 232 } 233 } 234 } else if (sql != null) { 235 SqlInfo info = new SqlInfo(); 236 info.setSql(sql); 237 info.setFileName(fileName); 238 info.setFilePath(filePath); 239 info.setOriginIndex(0); 240 sqlInfos.add(info); 241 } 242 } 243 return sqlInfos; 244 } 245 246 @Override 247 public boolean isIgnoreRecordSet() { 248 return option.isIgnoreRecordSet(); 249 } 250 251 @Override 252 public void setIgnoreRecordSet(boolean ignoreRecordSet) { 253 option.setIgnoreRecordSet(ignoreRecordSet); 254 } 255 256 @Override 257 public boolean isSimpleShowTopSelectResultSet() { 258 return option.isSimpleShowTopSelectResultSet(); 259 } 260 261 @Override 262 public void setSimpleShowTopSelectResultSet(boolean simpleShowTopSelectResultSet) { 263 option.setSimpleShowTopSelectResultSet(simpleShowTopSelectResultSet); 264 } 265 266 @Override 267 public boolean isSimpleShowFunction() { 268 return option.isSimpleShowFunction(); 269 } 270 271 @Override 272 public void setSimpleShowFunction(boolean simpleShowFunction) { 273 option.setSimpleShowFunction(simpleShowFunction); 274 } 275 276 @Override 277 public boolean isShowJoin() { 278 return option.isShowJoin(); 279 } 280 281 @Override 282 public void setShowJoin(boolean showJoin) { 283 option.setShowJoin(showJoin); 284 } 285 286 @Override 287 public boolean isShowImplicitSchema() { 288 return option.isShowImplicitSchema(); 289 } 290 291 @Override 292 public void setShowImplicitSchema(boolean showImplicitSchema) { 293 option.setShowImplicitSchema(showImplicitSchema); 294 } 295 296 @Override 297 public boolean isShowConstantTable() { 298 return option.isShowConstantTable(); 299 } 300 301 @Override 302 public void setShowConstantTable(boolean showConstantTable) { 303 option.setShowConstantTable(showConstantTable); 304 } 305 306 @Override 307 public boolean isShowCountTableColumn() { 308 return option.isShowCountTableColumn(); 309 } 310 311 @Override 312 public void setShowCountTableColumn(boolean showCountTableColumn) { 313 option.setShowCountTableColumn(showCountTableColumn); 314 } 315 316 @Override 317 public boolean isTransform() { 318 return option.isTransform(); 319 } 320 321 @Override 322 public void setTransform(boolean transform) { 323 option.setTransform(transform); 324 if (option.isTransformCoordinate()) { 325 option.setTransform(true); 326 } 327 } 328 329 @Override 330 public boolean isTransformCoordinate() { 331 return option.isTransformCoordinate(); 332 } 333 334 @Override 335 public void setTransformCoordinate(boolean transformCoordinate) { 336 option.setTransformCoordinate(transformCoordinate); 337 if (transformCoordinate) { 338 option.setTransform(true); 339 } 340 } 341 342 @Override 343 public boolean isLinkOrphanColumnToFirstTable() { 344 return option.isLinkOrphanColumnToFirstTable(); 345 } 346 347 @Override 348 public void setLinkOrphanColumnToFirstTable(boolean linkOrphanColumnToFirstTable) { 349 option.setLinkOrphanColumnToFirstTable(linkOrphanColumnToFirstTable); 350 } 351 352 @Override 353 public boolean isIgnoreCoordinate() { 354 return option.isIgnoreCoordinate(); 355 } 356 357 @Override 358 public void setIgnoreCoordinate(boolean ignoreCoordinate) { 359 option.setIgnoreCoordinate(ignoreCoordinate); 360 } 361 362 @Override 363 public void setHandleListener(DataFlowHandleListener listener) { 364 option.setHandleListener(listener); 365 } 366 367 @Override 368 public void setSqlEnv(TSQLEnv sqlenv) { 369 this.sqlenv = sqlenv; 370 } 371 372 @Override 373 public void setOption(Option option) { 374 this.option = option; 375 } 376 377 @Override 378 public Option getOption() { 379 return option; 380 } 381 382 @Override 383 public List<ErrorInfo> getErrorMessages() { 384 return errorInfos; 385 } 386 387 @Override 388 public synchronized String generateSqlInfos() { 389 return JSON.toJSONString(sqlInfoMap); 390 } 391 392 @Override 393 public synchronized String generateDataFlow() { 394 return generateDataFlow(false); 395 } 396 397 @Override 398 public Map<String, List<SqlInfo>> getSqlInfos() { 399 return sqlInfoMap; 400 } 401 402 @Override 403 /** 404 * @deprecated please use SqlInfoHelper.getSelectedDbObjectInfo 405 */ 406 public DbObjectPosition getSelectedDbObjectInfo(Coordinate start, Coordinate end) { 407 if (start == null || end == null) { 408 throw new IllegalArgumentException("Coordinate can't be null."); 409 } 410 411 String hashCode = start.getHashCode(); 412 413 if (hashCode == null) { 414 throw new IllegalArgumentException("Coordinate hashcode can't be null."); 415 } 416 417 int dbObjectStartLine = (int) start.getX() - 1; 418 int dbObjectStarColumn = (int) start.getY() - 1; 419 int dbObjectEndLine = (int) end.getX() - 1; 420 int dbObjectEndColumn = (int) end.getY() - 1; 421 List<SqlInfo> sqlInfoList; 422 if (hashCode.matches("\\d+")) { 423 sqlInfoList = sqlInfoMap.getValueAtIndex(Integer.valueOf(hashCode)); 424 } else { 425 sqlInfoList = sqlInfoMap.get(hashCode); 426 } 427 for (int j = 0; j < sqlInfoList.size(); j++) { 428 SqlInfo sqlInfo = sqlInfoList.get(j); 429 int startLine = sqlInfo.getLineStart(); 430 int endLine = sqlInfo.getLineEnd(); 431 if (dbObjectStartLine >= startLine && dbObjectStartLine <= endLine) { 432 DbObjectPosition position = new DbObjectPosition(); 433 position.setFile(sqlInfo.getFileName()); 434 position.setFilePath(sqlInfo.getFilePath()); 435 position.setSql(sqlInfo.getSql()); 436 position.setIndex(sqlInfo.getOriginIndex()); 437 List<Pair<Integer, Integer>> positions = position.getPositions(); 438 positions.add(new Pair<Integer, Integer>( 439 dbObjectStartLine - startLine + sqlInfo.getOriginLineStart() + 1, dbObjectStarColumn + 1)); 440 positions.add(new Pair<Integer, Integer>(dbObjectEndLine - startLine + sqlInfo.getOriginLineStart() + 1, 441 dbObjectEndColumn + 1)); 442 return position; 443 } 444 } 445 return null; 446 } 447 448 @Override 449 public synchronized String generateDataFlow(final boolean withExtraInfo) { 450 return generateDataFlow(withExtraInfo, true); 451 } 452 453 public synchronized String generateDataFlow(final boolean withExtraInfo, boolean useSaveMemoryMode) { 454 sqlInfoMap.clear(); 455 errorInfos.clear(); 456 Map<String, Pair3<StringBuilder, AtomicInteger, String>> databaseMap = new LinkedHashMap<String, Pair3<StringBuilder, AtomicInteger, String>>(); 457 for (int i = 0; i < sqlInfos.length; i++) { 458 SqlInfo sqlInfo = sqlInfos[i]; 459 if (sqlInfo != null && sqlInfo.getSql() == null && sqlInfo.getFilePath() != null) { 460 sqlInfo.setSql(SQLUtil.getFileContent(sqlInfo.getFilePath())); 461 } 462 if (sqlInfo != null && sqlInfo.getSql() == null && sqlInfo.getFileName() != null) { 463 sqlInfo.setSql(SQLUtil.getFileContent(sqlInfo.getFileName())); 464 } 465 if (sqlInfo == null || sqlInfo.getSql() == null) { 466 sqlInfoMap.put(String.valueOf(i), new ArrayList<SqlInfo>()); 467 continue; 468 } 469 String sql = sqlInfo.getSql(); 470 if (sql != null && sql.trim().startsWith("{")) { 471 if (MetadataReader.isGrabit(sql) || MetadataReader.isSqlflow(sql)) { 472 String hash = SHA256.getMd5(sql); 473 String fileHash = SHA256.getMd5(hash); 474 if (!sqlInfoMap.containsKey(fileHash)) { 475 sqlInfoMap.put(fileHash, new ArrayList<SqlInfo>()); 476 sqlInfoMap.get(fileHash).add(sqlInfo); 477 } 478 } else { 479 Map queryObject = (Map) JSON.parseObject(sql); 480 appendSqlInfo(databaseMap, i, sqlInfo, queryObject); 481 } 482 } else { 483 String content = sql; 484 String hash = SHA256.getMd5(content); 485 String fileHash = SHA256.getMd5(hash); 486 if (!sqlInfoMap.containsKey(fileHash)) { 487 sqlInfoMap.put(fileHash, new ArrayList<SqlInfo>()); 488 489 String database = TSQLEnv.DEFAULT_DB_NAME; 490 String schema = TSQLEnv.DEFAULT_SCHEMA_NAME; 491 if(sqlenv!=null) { 492 database = sqlenv.getDefaultCatalogName(); 493 if(database == null) { 494 database = TSQLEnv.DEFAULT_DB_NAME; 495 } 496 schema = sqlenv.getDefaultSchemaName(); 497 if(schema == null) { 498 schema = TSQLEnv.DEFAULT_SCHEMA_NAME; 499 } 500 } 501 boolean supportCatalog = TSQLEnv.supportCatalog(option.getVendor()); 502 boolean supportSchema = TSQLEnv.supportSchema(option.getVendor()); 503 StringBuilder builder = new StringBuilder(); 504 if (supportCatalog) { 505 builder.append(database); 506 } 507 if (supportSchema) { 508 if (builder.length() > 0) { 509 builder.append("."); 510 } 511 builder.append(schema); 512 } 513 String group = builder.toString(); 514 SqlInfo sqlInfoItem = new SqlInfo(); 515 sqlInfoItem.setFileName(sqlInfo.getFileName()); 516 sqlInfoItem.setFilePath(sqlInfo.getFilePath()); 517 sqlInfoItem.setSql(sqlInfo.getSql()); 518 sqlInfoItem.setOriginIndex(0); 519 sqlInfoItem.setOriginLineStart(0); 520 sqlInfoItem.setOriginLineEnd(sqlInfo.getSql().split("\n").length - 1); 521 sqlInfoItem.setIndex(0); 522 sqlInfoItem.setLineStart(0); 523 sqlInfoItem.setLineEnd(sqlInfo.getSql().split("\n").length - 1); 524 sqlInfoItem.setHash(fileHash); 525 sqlInfoItem.setGroup(group); 526 527 sqlInfoMap.get(fileHash).add(sqlInfoItem); 528 } 529 } 530 } 531 532 final TSQLEnv[] env = new TSQLEnv[]{sqlenv}; 533 if (sqlenv == null) { 534 TSQLEnv[] envs = new SQLEnvParser(option.getDefaultServer(), option.getDefaultDatabase(), option.getDefaultSchema()).parseSQLEnv(option.getVendor(), sqlInfos); 535 if (envs != null && envs.length > 0) { 536 env[0] = envs[0]; 537 } 538 } 539 ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors 540 .newFixedThreadPool(option.getParallel() < sqlInfos.length 541 ? option.getParallel() 542 : sqlInfos.length); 543 544 List<File> tempFiles = Collections.synchronizedList(new ArrayList<File>()); 545 final Map<SqlInfo, dataflow> dataflowMap = useSaveMemoryMode ? null : new ConcurrentHashMap<>(); 546 547 try { 548 final CountDownLatch latch = new CountDownLatch(sqlInfos.length); 549 550 logger.info("start parallel analyze " + sqlInfos.length + " sqlinfos"); 551 552 for (int i = 0; i < sqlInfos.length; i++) { 553 final SqlInfo[] sqlInfoCopy = new SqlInfo[sqlInfos.length]; 554 final SqlInfo item = sqlInfos[i]; 555 sqlInfoCopy[i] = item; 556 final Option optionCopy = (Option) option.clone(); 557 optionCopy.setStartId(5000000L * i); 558 optionCopy.setOutput(false); 559 final int index = i; 560 Runnable task = new Runnable() { 561 @Override 562 public void run() { 563 try { 564 logger.info("start analyze sqlinfo[" + index + "]" + (sqlInfos[index].getFilePath() != null ? ", file name = " + new File(sqlInfos[index].getFilePath()).getName() : "")); 565 DataFlowAnalyzer analyzer = new DataFlowAnalyzer(sqlInfoCopy, optionCopy); 566 analyzer.setSqlEnv(env[0]); 567 analyzer.generateDataFlow(withExtraInfo); 568 dataflow dataflow = analyzer.getDataFlow(); 569 if (dataflow == null) { 570 logger.warn("analyze sqlinfo[" + index + "] done, but dataflow is null" + (sqlInfos[index].getFilePath() != null ? ", file name = " + new File(sqlInfos[index].getFilePath()).getName() : "")); 571 errorInfos.addAll(analyzer.getErrorMessages()); 572 analyzer.dispose(); 573 return; 574 } 575 logger.info("analyze sqlinfo[" + index + "] done, relation count: " + dataflow.getRelationships().size()+", error count: "+ analyzer.getErrorMessages().size()); 576 if (analyzer.getErrorMessages().size() > 10000) { 577 dataflow.setErrors(new ArrayList<>(dataflow.getErrors().subList(0, 10000))); 578 errorInfos.addAll(analyzer.getErrorMessages().subList(0, 10000)); 579 logger.warn("Too many errors in dataflow ("+dataflow.getErrors().size()+"), truncating to first 10000 errors" + (sqlInfos[index].getFileName() != null ? ", file name = " + sqlInfos[index].getFileName() : "")); 580 } 581 else{ 582 errorInfos.addAll(analyzer.getErrorMessages()); 583 } 584 585 if (useSaveMemoryMode) { 586 File tempFile = File.createTempFile("dataflow_" + index + "_" + System.currentTimeMillis() + "_", ".xml.zip"); 587 XML2Model.saveXML(dataflow, tempFile); 588 tempFiles.add(tempFile); 589 } else { 590 dataflowMap.put(item, dataflow); 591 } 592 593 hashSQLMap.putAll(analyzer.getHashSQLMap()); 594 analyzer.dispose(); 595 } 596 catch (Exception e) { 597 logger.error("analyze sqlinfo[" + index + "] failed.", e); 598 } 599 finally { 600 latch.countDown(); 601 } 602 } 603 }; 604 executor.submit(task); 605 } 606 latch.await(); 607 } catch (Exception e) { 608 logger.error("execute task failed.", e); 609 } 610 executor.shutdown(); 611 612 ModelBindingManager modelManager = new ModelBindingManager(); 613 modelManager.setGlobalVendor(option.getVendor()); 614 modelManager.setGlobalOption(option); 615 ModelBindingManager.set(modelManager); 616 617 if (useSaveMemoryMode) { 618 // 使用节约内存模式,迭代合并 619 logger.info("start merge dataflow, dataflow count: " + tempFiles.size()+", useSaveMemoryMode: "+ useSaveMemoryMode+", gsp version: "+ TBaseType.versionid); 620 this.dataflow = iterativeMergeDataFlows(tempFiles, 5000000L * sqlInfos.length); 621 if (this.dataflow != null && this.dataflow.getRelationships() != null) { 622 logger.info("merge dataflow done, dataflow count: " + tempFiles.size() + ", relation count: " + this.dataflow.getRelationships().size()); 623 } 624 for (File tempFile : tempFiles) { 625 tempFile.delete(); 626 } 627 } else { 628 // 保持原有逻辑 629 logger.info("start merge dataflow, dataflow count: " + dataflowMap.size()+", useSaveMemoryMode: "+ useSaveMemoryMode+", gsp version: "+ TBaseType.versionid); 630 this.dataflow = mergeDataFlows(dataflowMap, 5000000L * sqlInfos.length); 631 if (this.dataflow != null && this.dataflow.getRelationships() != null) { 632 logger.info("merge dataflow done, dataflow count: " + tempFiles.size() + ", relation count: " + this.dataflow.getRelationships().size()); 633 } 634 dataflowMap.clear(); 635 } 636 637 if (this.dataflow != null) { 638 logger.info("merge done, relation count: " + this.dataflow.getRelationships().size()); 639 } 640 641 if (dataflow != null && option.isOutput()) { 642 if (option.isTextFormat()) { 643 dataflowString = DataFlowAnalyzer.getTextOutput(dataflow); 644 } else { 645 try { 646 dataflowString = XML2Model.saveXML(dataflow); 647 }catch (Exception e){ 648 logger.error("save dataflow as xml failed.", e); 649 dataflowString = null; 650 } 651 } 652 } 653 ModelBindingManager.remove(); 654 return dataflowString; 655 } 656 657 private dataflow iterativeMergeDataFlows(List<File> tempFiles, long startId) { 658 return DataflowUtility.iterativeMergeDataflowsFromFilesByStartId(tempFiles, startId); 659 } 660 661 private void appendSqlInfo(Map<String, Pair3<StringBuilder, AtomicInteger, String>> databaseMap, int index, 662 SqlInfo sqlInfo, Map queryObject) { 663 EDbVendor vendor = option.getVendor(); 664 if(!SQLUtil.isEmpty(sqlInfo.getDbVendor())){ 665 vendor = EDbVendor.valueOf(sqlInfo.getDbVendor()); 666 } 667 668 boolean supportCatalog = TSQLEnv.supportCatalog(vendor); 669 boolean supportSchema = TSQLEnv.supportSchema(vendor); 670 671 String groupName = (String) queryObject.get("groupName"); 672 if (DlineageUtil.isProcedureExcluded(groupName)) { 673 return; 674 } 675 676 String content = (String) queryObject.get("sourceCode"); 677 if (SQLUtil.isEmpty(content)) { 678 return; 679 } 680 681 StringBuilder builder = new StringBuilder(); 682 if (supportCatalog) { 683 String database = (String) queryObject.get("database"); 684 if (database.indexOf(".") != -1) { 685 String delimitedChar = TSQLEnv.delimitedChar(vendor); 686 database = delimitedChar + SQLUtil.trimColumnStringQuote(database) + delimitedChar; 687 } 688 builder.append(database); 689 } 690 if (supportSchema) { 691 String schema = (String) queryObject.get("schema"); 692 if (schema.indexOf(".") != -1) { 693 String delimitedChar = TSQLEnv.delimitedChar(vendor); 694 schema = delimitedChar + SQLUtil.trimColumnStringQuote(schema) + delimitedChar; 695 } 696 if (builder.length() > 0) { 697 builder.append("."); 698 } 699 builder.append(schema); 700 } 701 String group = builder.toString(); 702 String sqlHash = SHA256.getMd5(content); 703 String hash = SHA256.getMd5(sqlHash); 704 if (!databaseMap.containsKey(sqlHash)) { 705 databaseMap.put(sqlHash, 706 new Pair3<StringBuilder, AtomicInteger, String>(new StringBuilder(), new AtomicInteger(), group)); 707 } 708 TGSqlParser parser = new TGSqlParser(option.getVendor()); 709 String delimiterChar = String.valueOf(parser.getDelimiterChar()); 710 StringBuilder buffer = new StringBuilder(content); 711 if (content.trim().endsWith(delimiterChar) || content.trim().endsWith(";")) { 712 buffer.append("\n"); 713 } else if(vendor == EDbVendor.dbvredshift 714 || vendor == EDbVendor.dbvgaussdb 715 || vendor == EDbVendor.dbvedb 716 || vendor == EDbVendor.dbvpostgresql 717 || vendor == EDbVendor.dbvmysql 718 || vendor == EDbVendor.dbvteradata){ 719 buffer.append("\n\n-- " + TBaseType.sqlflow_stmt_delimiter_str + "\n\n"); 720 } else{ 721 SQLUtil.endTrim(buffer); 722 buffer.append(";").append("\n"); 723 } 724 725 int lineStart = databaseMap.get(sqlHash).first.toString().split("\n", -1).length - 1; 726 if (databaseMap.get(sqlHash).first.toString().length() == 0) { 727 lineStart = 0; 728 } 729 databaseMap.get(sqlHash).first.append(buffer.toString()); 730 SqlInfo sqlInfoItem = new SqlInfo(); 731 sqlInfoItem.setFileName(sqlInfo.getFileName()); 732 sqlInfoItem.setFilePath(sqlInfo.getFilePath()); 733 sqlInfoItem.setSql(buffer.toString()); 734 sqlInfoItem.setOriginIndex(index); 735 sqlInfoItem.setOriginLineStart(0); 736 sqlInfoItem.setOriginLineEnd(buffer.toString().split("\n", -1).length - 1); 737 sqlInfoItem.setIndex(databaseMap.get(sqlHash).second.getAndIncrement()); 738 sqlInfoItem.setLineStart(lineStart); 739 sqlInfoItem.setLineEnd(databaseMap.get(sqlHash).first.toString().split("\n", -1).length - 1); 740 sqlInfoItem.setGroup(group); 741 sqlInfoItem.setHash(hash); 742 743 if (!sqlInfoMap.containsKey(hash)) { 744 sqlInfoMap.put(hash, new ArrayList<SqlInfo>()); 745 } 746 sqlInfoMap.get(hash).add(sqlInfoItem); 747 } 748 749 @Override 750 public void dispose() { 751 ModelBindingManager.remove(); 752 } 753 754 private dataflow mergeDataFlows(Map<SqlInfo, dataflow> dataflowMap, long startId) { 755 return DataflowUtility.mergeDataflowsByStartId(dataflowMap.values(), startId); 756 } 757 758 @Override 759 public synchronized dataflow getDataFlow() { 760 if (dataflow != null) { 761 return dataflow; 762 } else if (dataflowString != null) { 763 return XML2Model.loadXML(dataflow.class, dataflowString); 764 } 765 return null; 766 } 767 768 @Override 769 public Map<String, String> getHashSQLMap() { 770 return hashSQLMap; 771 } 772 773 public static void main(String[] args) throws Exception { 774 Option option = new Option(); 775 option.setVendor(EDbVendor.dbvmssql); 776 option.setOutput(false); 777// option.setSimpleOutput(true); 778 File parentDir = new File("C:\\Users\\KK\\Desktop\\sql"); 779 ParallelDataFlowAnalyzer analyzer = new ParallelDataFlowAnalyzer(new File[]{new File("C:\\Users\\KK\\Desktop\\metadata_with_query\\客户原始sql.json")}, option, 5, new File("C:\\Users\\KK\\Desktop\\metadata_with_query")); 780 analyzer.generateDataFlow(false, true); 781 dataflow dataflow = analyzer.getDataFlow(); 782// System.out.println(XML2Model.saveXML(dataflow)); 783// dataflow dataflow = XML2Model.loadXML(gudusoft.gsqlparser.dlineage.dataflow.model.xml.dataflow.class,new File("D:\\dataflow.xml.zip")); 784 XML2Model.saveXML(dataflow, new File("D:\\dataflow.xml.zip")); 785// dataflow = DataflowUtility.convertToTableLevelDataflow(dataflow); 786// dataflow = DataflowUtility.convertTableLevelToFunctionCallDataflow(dataflow, true, EDbVendor.dbvoracle); 787// System.out.println(XML2Model.saveXML(dataflow)); 788 } 789 790}