001package gudusoft.gsqlparser.dlineage; 002 003import gudusoft.gsqlparser.EDbVendor; 004import gudusoft.gsqlparser.TBaseType; 005import gudusoft.gsqlparser.TGSqlParser; 006import gudusoft.gsqlparser.dlineage.dataflow.listener.DataFlowHandleListener; 007import gudusoft.gsqlparser.dlineage.dataflow.metadata.MetadataReader; 008import gudusoft.gsqlparser.dlineage.dataflow.model.*; 009import gudusoft.gsqlparser.dlineage.dataflow.model.json.Coordinate; 010import gudusoft.gsqlparser.dlineage.dataflow.model.xml.*; 011import gudusoft.gsqlparser.dlineage.dataflow.sqlenv.SQLEnvParser; 012import gudusoft.gsqlparser.dlineage.util.*; 013import gudusoft.gsqlparser.sqlenv.TSQLEnv; 014import gudusoft.gsqlparser.sqlenv.parser.TJSONSQLEnvParser; 015import gudusoft.gsqlparser.util.IndexedLinkedHashMap; 016import gudusoft.gsqlparser.util.Logger; 017import gudusoft.gsqlparser.util.LoggerFactory; 018import gudusoft.gsqlparser.util.SQLUtil; 019import gudusoft.gsqlparser.util.json.JSON; 020 021import java.io.File; 022import java.util.*; 023import java.util.concurrent.*; 024import java.util.concurrent.atomic.AtomicInteger; 025import java.util.stream.Collectors; 026 027public class ParallelDataFlowAnalyzer implements IDataFlowAnalyzer { 028 private static final Logger logger = LoggerFactory.getLogger(ParallelDataFlowAnalyzer.class); 029 private SqlInfo[] sqlInfos; 030 private Option option = new Option(); 031 private TSQLEnv sqlenv = null; 032 private dataflow dataflow; 033 private String dataflowString; 034 private List<ErrorInfo> errorInfos = new CopyOnWriteArrayList<ErrorInfo>(); 035 private IndexedLinkedHashMap<String, List<SqlInfo>> sqlInfoMap = new IndexedLinkedHashMap<String, List<SqlInfo>>(); 036 private final Map<String, String> hashSQLMap = new HashMap(); 037 038 public ParallelDataFlowAnalyzer(SqlInfo[] sqlInfos, EDbVendor dbVendor, boolean simpleOutput) { 039 this.sqlInfos = sqlInfos; 040 option.setVendor(dbVendor); 041 option.setSimpleOutput(simpleOutput); 042 ModelBindingManager.setGlobalOption(option); 043 } 044 045 public ParallelDataFlowAnalyzer(String[] sqlContents, EDbVendor dbVendor, boolean simpleOutput, String defaultServer, String defaultDatabase, String defaltSchema) { 046 SqlInfo[] sqlInfos = new SqlInfo[sqlContents.length]; 047 for (int i = 0; i < sqlContents.length; i++) { 048 SqlInfo info = new SqlInfo(); 049 info.setSql(sqlContents[i]); 050 info.setOriginIndex(0); 051 sqlInfos[i] = info; 052 } 053 option.setVendor(dbVendor); 054 option.setSimpleOutput(simpleOutput); 055 option.setDefaultServer(defaultServer); 056 option.setDefaultDatabase(defaultDatabase); 057 option.setDefaultSchema(defaltSchema); 058 this.sqlInfos = convertSQL(dbVendor, JSON.toJSONString(sqlInfos)).toArray(new SqlInfo[0]); 059 } 060 061 public ParallelDataFlowAnalyzer(String[] sqlContents, EDbVendor dbVendor, boolean simpleOutput) { 062 SqlInfo[] sqlInfos = new SqlInfo[sqlContents.length]; 063 for (int i = 0; i < sqlContents.length; i++) { 064 SqlInfo info = new SqlInfo(); 065 info.setSql(sqlContents[i]); 066 info.setOriginIndex(0); 067 sqlInfos[i] = info; 068 } 069 option.setVendor(dbVendor); 070 option.setSimpleOutput(simpleOutput); 071 this.sqlInfos = convertSQL(dbVendor, JSON.toJSONString(sqlInfos)).toArray(new SqlInfo[0]); 072 } 073 074 public ParallelDataFlowAnalyzer(SqlInfo[] sqlInfos, Option option) { 075 this.sqlInfos = sqlInfos; 076 this.option = option; 077 ModelBindingManager.setGlobalOption(option); 078 } 079 080 public ParallelDataFlowAnalyzer(File[] sqlFiles, Option option) { 081 SqlInfo[] sqlInfos = new SqlInfo[sqlFiles.length]; 082 for (int i = 0; i < sqlFiles.length; i++) { 083 SqlInfo info = new SqlInfo(); 084 info.setSql(SQLUtil.getFileContent(sqlFiles[i])); 085 info.setFileName(sqlFiles[i].getName()); 086 info.setFilePath(sqlFiles[i].getAbsolutePath()); 087 info.setOriginIndex(0); 088 sqlInfos[i] = info; 089 } 090 this.sqlInfos = sqlInfos; 091 this.option = option; 092 ModelBindingManager.setGlobalOption(option); 093 } 094 095 protected List<SqlInfo> convertSQL(EDbVendor vendor, String json) { 096 List<SqlInfo> sqlInfos = new ArrayList<SqlInfo>(); 097 List sqlContents = (List) JSON.parseObject(json); 098 for (int j = 0; j < sqlContents.size(); j++) { 099 Map sqlContent = (Map) sqlContents.get(j); 100 String sql = (String) sqlContent.get("sql"); 101 String fileName = (String) sqlContent.get("fileName"); 102 String filePath = (String) sqlContent.get("filePath"); 103 if (sql != null && sql.trim().startsWith("{")) { 104 if (sql.indexOf("createdBy") != -1) { 105 if (this.sqlenv == null) { 106 TSQLEnv[] sqlenvs = new TJSONSQLEnvParser(option.getDefaultServer(), option.getDefaultDatabase(), option.getDefaultSchema()).parseSQLEnv(vendor, sql); 107 if (sqlenvs != null) { 108 this.sqlenv = sqlenvs[0]; 109 } 110 } 111 if (sql.toLowerCase().indexOf("sqldep") != -1 || sql.toLowerCase().indexOf("grabit") != -1 ) { 112 Map queryObject = (Map) JSON.parseObject(sql); 113 List querys = (List) queryObject.get("queries"); 114 if (querys != null) { 115 for (int i = 0; i < querys.size(); i++) { 116 Map object = (Map) querys.get(i); 117 SqlInfo info = new SqlInfo(); 118 info.setSql(JSON.toJSONString(object)); 119 info.setFileName(fileName); 120 info.setFilePath(filePath); 121 info.setOriginIndex(i); 122 sqlInfos.add(info); 123 } 124 queryObject.remove("queries"); 125 SqlInfo info = new SqlInfo(); 126 info.setSql(JSON.toJSONString(queryObject)); 127 info.setFileName(fileName); 128 info.setFilePath(filePath); 129 info.setOriginIndex(querys.size()); 130 sqlInfos.add(info); 131 } else { 132 SqlInfo info = new SqlInfo(); 133 info.setSql(JSON.toJSONString(queryObject)); 134 info.setFileName(fileName); 135 info.setFilePath(filePath); 136 info.setOriginIndex(0); 137 sqlInfos.add(info); 138 } 139 } 140 else if (sql.toLowerCase().indexOf("sqlflow") != -1) { 141 Map sqlflow = (Map) JSON.parseObject(sql); 142 List<Map> servers = (List<Map>) sqlflow.get("servers"); 143 if (servers != null) { 144 for (Map queryObject : servers) { 145 String name = (String) queryObject.get("name"); 146 String dbVendor = (String) queryObject.get("dbVendor"); 147 List querys = (List) queryObject.get("queries"); 148 if (querys != null) { 149 for (int i = 0; i < querys.size(); i++) { 150 Map object = (Map) querys.get(i); 151 SqlInfo info = new SqlInfo(); 152 info.setSql(JSON.toJSONString(object)); 153 info.setFileName(fileName); 154 info.setFilePath(filePath); 155 info.setOriginIndex(i); 156 info.setDbVendor(dbVendor); 157 info.setServer(name); 158 sqlInfos.add(info); 159 } 160 queryObject.remove("queries"); 161 SqlInfo info = new SqlInfo(); 162 info.setSql(JSON.toJSONString(queryObject)); 163 info.setFileName(fileName); 164 info.setFilePath(filePath); 165 info.setOriginIndex(querys.size()); 166 info.setDbVendor(dbVendor); 167 info.setServer(filePath); 168 sqlInfos.add(info); 169 } else { 170 SqlInfo info = new SqlInfo(); 171 info.setSql(JSON.toJSONString(queryObject)); 172 info.setFileName(fileName); 173 info.setFilePath(filePath); 174 info.setOriginIndex(0); 175 sqlInfos.add(info); 176 } 177 } 178 } 179 } 180 } 181 } else if (sql != null) { 182 SqlInfo info = new SqlInfo(); 183 info.setSql(sql); 184 info.setFileName(fileName); 185 info.setFilePath(filePath); 186 info.setOriginIndex(0); 187 sqlInfos.add(info); 188 } 189 } 190 return sqlInfos; 191 } 192 193 @Override 194 public boolean isIgnoreRecordSet() { 195 return option.isIgnoreRecordSet(); 196 } 197 198 @Override 199 public void setIgnoreRecordSet(boolean ignoreRecordSet) { 200 option.setIgnoreRecordSet(ignoreRecordSet); 201 } 202 203 @Override 204 public boolean isSimpleShowTopSelectResultSet() { 205 return option.isSimpleShowTopSelectResultSet(); 206 } 207 208 @Override 209 public void setSimpleShowTopSelectResultSet(boolean simpleShowTopSelectResultSet) { 210 option.setSimpleShowTopSelectResultSet(simpleShowTopSelectResultSet); 211 } 212 213 @Override 214 public boolean isSimpleShowFunction() { 215 return option.isSimpleShowFunction(); 216 } 217 218 @Override 219 public void setSimpleShowFunction(boolean simpleShowFunction) { 220 option.setSimpleShowFunction(simpleShowFunction); 221 } 222 223 @Override 224 public boolean isShowJoin() { 225 return option.isShowJoin(); 226 } 227 228 @Override 229 public void setShowJoin(boolean showJoin) { 230 option.setShowJoin(showJoin); 231 } 232 233 @Override 234 public boolean isShowImplicitSchema() { 235 return option.isShowImplicitSchema(); 236 } 237 238 @Override 239 public void setShowImplicitSchema(boolean showImplicitSchema) { 240 option.setShowImplicitSchema(showImplicitSchema); 241 } 242 243 @Override 244 public boolean isShowConstantTable() { 245 return option.isShowConstantTable(); 246 } 247 248 @Override 249 public void setShowConstantTable(boolean showConstantTable) { 250 option.setShowConstantTable(showConstantTable); 251 } 252 253 @Override 254 public boolean isShowCountTableColumn() { 255 return option.isShowCountTableColumn(); 256 } 257 258 @Override 259 public void setShowCountTableColumn(boolean showCountTableColumn) { 260 option.setShowCountTableColumn(showCountTableColumn); 261 } 262 263 @Override 264 public boolean isTransform() { 265 return option.isTransform(); 266 } 267 268 @Override 269 public void setTransform(boolean transform) { 270 option.setTransform(transform); 271 if (option.isTransformCoordinate()) { 272 option.setTransform(true); 273 } 274 } 275 276 @Override 277 public boolean isTransformCoordinate() { 278 return option.isTransformCoordinate(); 279 } 280 281 @Override 282 public void setTransformCoordinate(boolean transformCoordinate) { 283 option.setTransformCoordinate(transformCoordinate); 284 if (transformCoordinate) { 285 option.setTransform(true); 286 } 287 } 288 289 @Override 290 public boolean isLinkOrphanColumnToFirstTable() { 291 return option.isLinkOrphanColumnToFirstTable(); 292 } 293 294 @Override 295 public void setLinkOrphanColumnToFirstTable(boolean linkOrphanColumnToFirstTable) { 296 option.setLinkOrphanColumnToFirstTable(linkOrphanColumnToFirstTable); 297 } 298 299 @Override 300 public boolean isIgnoreCoordinate() { 301 return option.isIgnoreCoordinate(); 302 } 303 304 @Override 305 public void setIgnoreCoordinate(boolean ignoreCoordinate) { 306 option.setIgnoreCoordinate(ignoreCoordinate); 307 } 308 309 @Override 310 public void setHandleListener(DataFlowHandleListener listener) { 311 option.setHandleListener(listener); 312 } 313 314 @Override 315 public void setSqlEnv(TSQLEnv sqlenv) { 316 this.sqlenv = sqlenv; 317 } 318 319 @Override 320 public void setOption(Option option) { 321 this.option = option; 322 } 323 324 @Override 325 public Option getOption() { 326 return option; 327 } 328 329 @Override 330 public List<ErrorInfo> getErrorMessages() { 331 return errorInfos; 332 } 333 334 @Override 335 public synchronized String generateSqlInfos() { 336 return JSON.toJSONString(sqlInfoMap); 337 } 338 339 @Override 340 public synchronized String generateDataFlow() { 341 return generateDataFlow(false); 342 } 343 344 @Override 345 public Map<String, List<SqlInfo>> getSqlInfos() { 346 return sqlInfoMap; 347 } 348 349 @Override 350 /** 351 * @deprecated please use SqlInfoHelper.getSelectedDbObjectInfo 352 */ 353 public DbObjectPosition getSelectedDbObjectInfo(Coordinate start, Coordinate end) { 354 if (start == null || end == null) { 355 throw new IllegalArgumentException("Coordinate can't be null."); 356 } 357 358 String hashCode = start.getHashCode(); 359 360 if (hashCode == null) { 361 throw new IllegalArgumentException("Coordinate hashcode can't be null."); 362 } 363 364 int dbObjectStartLine = (int) start.getX() - 1; 365 int dbObjectStarColumn = (int) start.getY() - 1; 366 int dbObjectEndLine = (int) end.getX() - 1; 367 int dbObjectEndColumn = (int) end.getY() - 1; 368 List<SqlInfo> sqlInfoList; 369 if (hashCode.matches("\\d+")) { 370 sqlInfoList = sqlInfoMap.getValueAtIndex(Integer.valueOf(hashCode)); 371 } else { 372 sqlInfoList = sqlInfoMap.get(hashCode); 373 } 374 for (int j = 0; j < sqlInfoList.size(); j++) { 375 SqlInfo sqlInfo = sqlInfoList.get(j); 376 int startLine = sqlInfo.getLineStart(); 377 int endLine = sqlInfo.getLineEnd(); 378 if (dbObjectStartLine >= startLine && dbObjectStartLine <= endLine) { 379 DbObjectPosition position = new DbObjectPosition(); 380 position.setFile(sqlInfo.getFileName()); 381 position.setFilePath(sqlInfo.getFilePath()); 382 position.setSql(sqlInfo.getSql()); 383 position.setIndex(sqlInfo.getOriginIndex()); 384 List<Pair<Integer, Integer>> positions = position.getPositions(); 385 positions.add(new Pair<Integer, Integer>( 386 dbObjectStartLine - startLine + sqlInfo.getOriginLineStart() + 1, dbObjectStarColumn + 1)); 387 positions.add(new Pair<Integer, Integer>(dbObjectEndLine - startLine + sqlInfo.getOriginLineStart() + 1, 388 dbObjectEndColumn + 1)); 389 return position; 390 } 391 } 392 return null; 393 } 394 395 @Override 396 public synchronized String generateDataFlow(final boolean withExtraInfo) { 397 sqlInfoMap.clear(); 398 errorInfos.clear(); 399 Map<String, Pair3<StringBuilder, AtomicInteger, String>> databaseMap = new LinkedHashMap<String, Pair3<StringBuilder, AtomicInteger, String>>(); 400 for (int i = 0; i < sqlInfos.length; i++) { 401 SqlInfo sqlInfo = sqlInfos[i]; 402 if (sqlInfo != null && sqlInfo.getSql() == null && sqlInfo.getFilePath() != null) { 403 sqlInfo.setSql(SQLUtil.getFileContent(sqlInfo.getFilePath())); 404 } 405 if (sqlInfo != null && sqlInfo.getSql() == null && sqlInfo.getFileName() != null) { 406 sqlInfo.setSql(SQLUtil.getFileContent(sqlInfo.getFileName())); 407 } 408 if (sqlInfo == null || sqlInfo.getSql() == null) { 409 sqlInfoMap.put(String.valueOf(i), new ArrayList<SqlInfo>()); 410 continue; 411 } 412 String sql = sqlInfo.getSql(); 413 if (sql != null && sql.trim().startsWith("{")) { 414 if (MetadataReader.isGrabit(sql) || MetadataReader.isSqlflow(sql)) { 415 String hash = SHA256.getMd5(sql); 416 String fileHash = SHA256.getMd5(hash); 417 if (!sqlInfoMap.containsKey(fileHash)) { 418 sqlInfoMap.put(fileHash, new ArrayList<SqlInfo>()); 419 sqlInfoMap.get(fileHash).add(sqlInfo); 420 } 421 } else { 422 Map queryObject = (Map) JSON.parseObject(sql); 423 appendSqlInfo(databaseMap, i, sqlInfo, queryObject); 424 } 425 } else { 426 String content = sql; 427 String hash = SHA256.getMd5(content); 428 String fileHash = SHA256.getMd5(hash); 429 if (!sqlInfoMap.containsKey(fileHash)) { 430 sqlInfoMap.put(fileHash, new ArrayList<SqlInfo>()); 431 432 String database = TSQLEnv.DEFAULT_DB_NAME; 433 String schema = TSQLEnv.DEFAULT_SCHEMA_NAME; 434 if(sqlenv!=null) { 435 database = sqlenv.getDefaultCatalogName(); 436 if(database == null) { 437 database = TSQLEnv.DEFAULT_DB_NAME; 438 } 439 schema = sqlenv.getDefaultSchemaName(); 440 if(schema == null) { 441 schema = TSQLEnv.DEFAULT_SCHEMA_NAME; 442 } 443 } 444 boolean supportCatalog = TSQLEnv.supportCatalog(option.getVendor()); 445 boolean supportSchema = TSQLEnv.supportSchema(option.getVendor()); 446 StringBuilder builder = new StringBuilder(); 447 if (supportCatalog) { 448 builder.append(database); 449 } 450 if (supportSchema) { 451 if (builder.length() > 0) { 452 builder.append("."); 453 } 454 builder.append(schema); 455 } 456 String group = builder.toString(); 457 SqlInfo sqlInfoItem = new SqlInfo(); 458 sqlInfoItem.setFileName(sqlInfo.getFileName()); 459 sqlInfoItem.setFilePath(sqlInfo.getFilePath()); 460 sqlInfoItem.setSql(sqlInfo.getSql()); 461 sqlInfoItem.setOriginIndex(0); 462 sqlInfoItem.setOriginLineStart(0); 463 sqlInfoItem.setOriginLineEnd(sqlInfo.getSql().split("\n").length - 1); 464 sqlInfoItem.setIndex(0); 465 sqlInfoItem.setLineStart(0); 466 sqlInfoItem.setLineEnd(sqlInfo.getSql().split("\n").length - 1); 467 sqlInfoItem.setHash(fileHash); 468 sqlInfoItem.setGroup(group); 469 470 sqlInfoMap.get(fileHash).add(sqlInfoItem); 471 } 472 } 473 } 474 475 final TSQLEnv[] env = new TSQLEnv[]{sqlenv}; 476 if (sqlenv == null) { 477 TSQLEnv[] envs = new SQLEnvParser(option.getDefaultServer(), option.getDefaultDatabase(), option.getDefaultSchema()).parseSQLEnv(option.getVendor(), sqlInfos); 478 if (envs != null && envs.length > 0) { 479 env[0] = envs[0]; 480 } 481 } 482 ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors 483 .newFixedThreadPool(option.getParallel() < sqlInfos.length 484 ? option.getParallel() 485 : sqlInfos.length); 486 final Map<SqlInfo, dataflow> dataflowMap = new ConcurrentHashMap<>(); 487 try { 488 final CountDownLatch latch = new CountDownLatch(sqlInfos.length); 489 for (int i = 0; i < sqlInfos.length; i++) { 490 final SqlInfo[] sqlInfoCopy = new SqlInfo[sqlInfos.length]; 491 final SqlInfo item = sqlInfos[i]; 492 sqlInfoCopy[i] = item; 493 final Option optionCopy = (Option) option.clone(); 494 optionCopy.setStartId(5000000L * i); 495 optionCopy.setOutput(false); 496 final int index = i; 497 Runnable task = new Runnable() { 498 @Override 499 public void run() { 500 try { 501 DataFlowAnalyzer analyzer = new DataFlowAnalyzer(sqlInfoCopy, optionCopy); 502 analyzer.setSqlEnv(env[0]); 503 analyzer.generateDataFlow(withExtraInfo); 504 dataflow dataflow = analyzer.getDataFlow(); 505 logger.info("analyze sqlinfo[" + index + "] done, relation count: " + dataflow.getRelationships().size()); 506 errorInfos.addAll(analyzer.getErrorMessages()); 507 dataflowMap.put(item, dataflow); 508 hashSQLMap.putAll(analyzer.getHashSQLMap()); 509 analyzer.dispose(); 510 } 511 catch (Exception e) { 512 logger.error("analyze sqlinfo[" + index + "] failed.", e); 513 } 514 finally { 515 latch.countDown(); 516 } 517 } 518 }; 519 executor.submit(task); 520 } 521 latch.await(); 522 } catch (Exception e) { 523 logger.error("execute task failed.", e); 524 } 525 executor.shutdown(); 526 527 ModelBindingManager modelManager = new ModelBindingManager(); 528 modelManager.setGlobalVendor(option.getVendor()); 529 modelManager.setGlobalOption(option); 530 ModelBindingManager.set(modelManager); 531 this.dataflow = mergeDataFlows(dataflowMap, 5000000L * sqlInfos.length); 532 if (this.dataflow != null) { 533 logger.info("merge " + dataflowMap.size() + " dataflows done, relation count: " + dataflow.getRelationships().size()); 534 } 535 dataflowMap.clear(); 536 if (dataflow != null && option.isOutput()) { 537 if (option.isTextFormat()) { 538 dataflowString = DataFlowAnalyzer.getTextOutput(dataflow); 539 } else { 540 try { 541 dataflowString = XML2Model.saveXML(dataflow); 542 }catch (Exception e){ 543 logger.error("save dataflow as xml failed.", e); 544 dataflowString = null; 545 } 546 } 547 } 548 ModelBindingManager.remove(); 549 return dataflowString; 550 } 551 552 private void appendSqlInfo(Map<String, Pair3<StringBuilder, AtomicInteger, String>> databaseMap, int index, 553 SqlInfo sqlInfo, Map queryObject) { 554 EDbVendor vendor = option.getVendor(); 555 if(!SQLUtil.isEmpty(sqlInfo.getDbVendor())){ 556 vendor = EDbVendor.valueOf(sqlInfo.getDbVendor()); 557 } 558 559 boolean supportCatalog = TSQLEnv.supportCatalog(vendor); 560 boolean supportSchema = TSQLEnv.supportSchema(vendor); 561 562 String content = (String) queryObject.get("sourceCode"); 563 if (SQLUtil.isEmpty(content)) { 564 return; 565 } 566 567 StringBuilder builder = new StringBuilder(); 568 if (supportCatalog) { 569 String database = (String) queryObject.get("database"); 570 if (database.indexOf(".") != -1) { 571 String delimitedChar = TSQLEnv.delimitedChar(vendor); 572 database = delimitedChar + SQLUtil.trimColumnStringQuote(database) + delimitedChar; 573 } 574 builder.append(database); 575 } 576 if (supportSchema) { 577 String schema = (String) queryObject.get("schema"); 578 if (schema.indexOf(".") != -1) { 579 String delimitedChar = TSQLEnv.delimitedChar(vendor); 580 schema = delimitedChar + SQLUtil.trimColumnStringQuote(schema) + delimitedChar; 581 } 582 if (builder.length() > 0) { 583 builder.append("."); 584 } 585 builder.append(schema); 586 } 587 String group = builder.toString(); 588 String sqlHash = SHA256.getMd5(content); 589 String hash = SHA256.getMd5(sqlHash); 590 if (!databaseMap.containsKey(sqlHash)) { 591 databaseMap.put(sqlHash, 592 new Pair3<StringBuilder, AtomicInteger, String>(new StringBuilder(), new AtomicInteger(), group)); 593 } 594 TGSqlParser parser = new TGSqlParser(option.getVendor()); 595 String delimiterChar = String.valueOf(parser.getDelimiterChar()); 596 StringBuilder buffer = new StringBuilder(content); 597 if (content.trim().endsWith(delimiterChar) || content.trim().endsWith(";")) { 598 buffer.append("\n"); 599 } else if(vendor == EDbVendor.dbvredshift 600 || vendor == EDbVendor.dbvgaussdb 601 || vendor == EDbVendor.dbvpostgresql 602 || vendor == EDbVendor.dbvmysql 603 || vendor == EDbVendor.dbvteradata){ 604 buffer.append("\n\n-- " + TBaseType.sqlflow_stmt_delimiter_str + "\n\n"); 605 } else{ 606 SQLUtil.endTrim(buffer); 607 buffer.append(";").append("\n"); 608 } 609 610 int lineStart = databaseMap.get(sqlHash).first.toString().split("\n", -1).length - 1; 611 if (databaseMap.get(sqlHash).first.toString().length() == 0) { 612 lineStart = 0; 613 } 614 databaseMap.get(sqlHash).first.append(buffer.toString()); 615 SqlInfo sqlInfoItem = new SqlInfo(); 616 sqlInfoItem.setFileName(sqlInfo.getFileName()); 617 sqlInfoItem.setFilePath(sqlInfo.getFilePath()); 618 sqlInfoItem.setSql(buffer.toString()); 619 sqlInfoItem.setOriginIndex(index); 620 sqlInfoItem.setOriginLineStart(0); 621 sqlInfoItem.setOriginLineEnd(buffer.toString().split("\n", -1).length - 1); 622 sqlInfoItem.setIndex(databaseMap.get(sqlHash).second.getAndIncrement()); 623 sqlInfoItem.setLineStart(lineStart); 624 sqlInfoItem.setLineEnd(databaseMap.get(sqlHash).first.toString().split("\n", -1).length - 1); 625 sqlInfoItem.setGroup(group); 626 sqlInfoItem.setHash(hash); 627 628 if (!sqlInfoMap.containsKey(hash)) { 629 sqlInfoMap.put(hash, new ArrayList<SqlInfo>()); 630 } 631 sqlInfoMap.get(hash).add(sqlInfoItem); 632 } 633 634 @Override 635 public void dispose() { 636 ModelBindingManager.remove(); 637 } 638 639 private dataflow mergeDataFlows(Map<SqlInfo, dataflow> dataflowMap, long startId) { 640 return mergeDataFlows(dataflowMap.values(), startId); 641 } 642 643 public static dataflow mergeDataFlows(Collection<dataflow> dataflows, EDbVendor vendor) { 644 ModelBindingManager.setGlobalVendor(vendor); 645 try { 646 return mergeDataFlows(dataflows, 5000000L * dataflows.size()); 647 } finally { 648 ModelBindingManager.removeGlobalVendor(); 649 } 650 } 651 652 private static dataflow mergeDataFlows(Collection<dataflow> dataflows, long startId) { 653 dataflow mergeDataflow = new dataflow(); 654 655 List<table> tableCopy = new ArrayList<table>(); 656 List<table> viewCopy = new ArrayList<table>(); 657 List<table> databaseCopy = new ArrayList<table>(); 658 List<table> schemaCopy = new ArrayList<table>(); 659 List<table> stageCopy = new ArrayList<table>(); 660 List<table> dataSourceCopy = new ArrayList<table>(); 661 List<table> streamCopy = new ArrayList<table>(); 662 List<table> fileCopy = new ArrayList<table>(); 663 List<table> variableCopy = new ArrayList<table>(); 664 List<table> resultSetCopy = new ArrayList<table>(); 665 666 List<process> processCopy = new ArrayList<process>(); 667 List<relationship> relationshipCopy = new ArrayList<relationship>(); 668 List<procedure> procedureCopy = new ArrayList<procedure>(); 669 List<oraclePackage> packageCopy = new ArrayList<oraclePackage>(); 670 List<error> errorCopy = new ArrayList<error>(); 671 672 List<table> tables = new ArrayList<table>(); 673 for (dataflow dataflow : dataflows) { 674 if (dataflow.getTables() != null) { 675 tableCopy.addAll(dataflow.getTables()); 676 } 677 mergeDataflow.setTables(tableCopy); 678 if (dataflow.getViews() != null) { 679 viewCopy.addAll(dataflow.getViews()); 680 } 681 mergeDataflow.setViews(viewCopy); 682 if (dataflow.getDatabases() != null) { 683 databaseCopy.addAll(dataflow.getDatabases()); 684 } 685 mergeDataflow.setDatabases(databaseCopy); 686 if (dataflow.getSchemas() != null) { 687 schemaCopy.addAll(dataflow.getSchemas()); 688 } 689 mergeDataflow.setSchemas(schemaCopy); 690 if (dataflow.getStages() != null) { 691 stageCopy.addAll(dataflow.getStages()); 692 } 693 mergeDataflow.setStages(stageCopy); 694 if (dataflow.getDatasources() != null) { 695 dataSourceCopy.addAll(dataflow.getDatasources()); 696 } 697 mergeDataflow.setDatasources(dataSourceCopy); 698 if (dataflow.getStreams() != null) { 699 streamCopy.addAll(dataflow.getStreams()); 700 } 701 mergeDataflow.setStreams(streamCopy); 702 if (dataflow.getPaths() != null) { 703 fileCopy.addAll(dataflow.getPaths()); 704 } 705 mergeDataflow.setPaths(fileCopy); 706 if (dataflow.getVariables() != null) { 707 variableCopy.addAll(dataflow.getVariables()); 708 } 709 mergeDataflow.setVariables(variableCopy); 710 if (dataflow.getResultsets() != null) { 711 resultSetCopy.addAll(dataflow.getResultsets()); 712 } 713 mergeDataflow.setResultsets(resultSetCopy); 714 if (dataflow.getProcesses() != null) { 715 processCopy.addAll(dataflow.getProcesses()); 716 } 717 mergeDataflow.setProcesses(processCopy); 718 if (dataflow.getRelationships() != null) { 719 relationshipCopy.addAll(dataflow.getRelationships()); 720 } 721 mergeDataflow.setRelationships(relationshipCopy); 722 if (dataflow.getProcedures() != null) { 723 procedureCopy.addAll(dataflow.getProcedures()); 724 } 725 mergeDataflow.setProcedures(procedureCopy); 726 if (dataflow.getPackages() != null) { 727 packageCopy.addAll(dataflow.getPackages()); 728 } 729 mergeDataflow.setPackages(packageCopy); 730 if (dataflow.getErrors() != null) { 731 errorCopy.addAll(dataflow.getErrors()); 732 } 733 mergeDataflow.setErrors(errorCopy); 734 735 tables.addAll(dataflow.getTables()); 736 tables.addAll(dataflow.getViews()); 737 tables.addAll(dataflow.getDatabases()); 738 tables.addAll(dataflow.getSchemas()); 739 tables.addAll(dataflow.getStages()); 740 tables.addAll(dataflow.getDatasources()); 741 tables.addAll(dataflow.getStreams()); 742 tables.addAll(dataflow.getPaths()); 743 tables.addAll(dataflow.getResultsets()); 744 tables.addAll(dataflow.getVariables()); 745 } 746 747 Map<String, List<table>> tableMap = new HashMap<String, List<table>>(); 748 Map<String, String> tableTypeMap = new HashMap<String, String>(); 749 Map<String, String> tableIdMap = new HashMap<String, String>(); 750 751 Map<String, List<column>> columnMap = new HashMap<String, List<column>>(); 752 Map<String, Set<String>> tableColumnMap = new HashMap<String, Set<String>>(); 753 Map<String, String> columnIdMap = new HashMap<String, String>(); 754 Map<String, column> columnMergeIdMap = new HashMap<String, column>(); 755 756 List<procedure> procedures = new ArrayList<>(mergeDataflow.getProcedures()); 757 if(mergeDataflow.getPackages()!=null){ 758 for(oraclePackage pkg : mergeDataflow.getPackages()){ 759 procedures.addAll(pkg.getProcedures()); 760 } 761 } 762 763 Set<String> procedureIdSet = procedures.stream().map(t->t.getId()).collect(Collectors.toSet()); 764 765 for (table table : tables) { 766 String qualifiedTableName = DlineageUtil.getQualifiedTableName(table); 767 String tableFullName = DlineageUtil.getIdentifierNormalTableName(qualifiedTableName); 768 769 if (!tableMap.containsKey(tableFullName)) { 770 tableMap.put(tableFullName, new ArrayList<table>()); 771 } 772 773 tableMap.get(tableFullName).add(table); 774 775 if (!tableTypeMap.containsKey(tableFullName)) { 776 tableTypeMap.put(tableFullName, table.getType()); 777 } else if ("view".equals(table.getSubType())) { 778 tableTypeMap.put(tableFullName, table.getType()); 779 } else if ("database".equals(table.getSubType())) { 780 tableTypeMap.put(tableFullName, table.getType()); 781 } else if ("schema".equals(table.getSubType())) { 782 tableTypeMap.put(tableFullName, table.getType()); 783 } else if ("stage".equals(table.getSubType())) { 784 tableTypeMap.put(tableFullName, table.getType()); 785 } else if ("datasource".equals(table.getSubType())) { 786 tableTypeMap.put(tableFullName, table.getType()); 787 } else if ("stream".equals(table.getSubType())) { 788 tableTypeMap.put(tableFullName, table.getType()); 789 } else if ("file".equals(table.getSubType())) { 790 tableTypeMap.put(tableFullName, table.getType()); 791 } else if ("table".equals(tableTypeMap.get(tableFullName))) { 792 tableTypeMap.put(tableFullName, table.getType()); 793 } else if ("variable".equals(tableTypeMap.get(tableFullName))) { 794 tableTypeMap.put(tableFullName, table.getType()); 795 } 796 797 if (table.getColumns() != null) { 798 if (!tableColumnMap.containsKey(tableFullName)) { 799 tableColumnMap.put(tableFullName, new LinkedHashSet<String>()); 800 } 801 for (column column : table.getColumns()) { 802 String columnFullName = tableFullName + "." 803 + DlineageUtil.getIdentifierNormalColumnName(column.getName()); 804 805 if (!columnMap.containsKey(columnFullName)) { 806 columnMap.put(columnFullName, new ArrayList<column>()); 807 tableColumnMap.get(tableFullName).add(columnFullName); 808 } 809 810 columnMap.get(columnFullName).add(column); 811 } 812 } 813 } 814 815 Iterator<String> tableNameIter = tableMap.keySet().iterator(); 816 while (tableNameIter.hasNext()) { 817 String tableName = tableNameIter.next(); 818 List<table> tableList = tableMap.get(tableName); 819 table table; 820 if (tableList.size() > 1) { 821 table standardTable = tableList.get(0); 822 //Function允许重名,不做合并处理 823 if (standardTable.isFunction()) { 824 continue; 825 } 826 827 String type = tableTypeMap.get(tableName); 828 table = new table(); 829 table.setId(String.valueOf(++startId)); 830 table.setServer(standardTable.getServer()); 831 table.setDatabase(standardTable.getDatabase()); 832 table.setSchema(standardTable.getSchema()); 833 table.setName(standardTable.getName()); 834 table.setDisplayName(standardTable.getDisplayName()); 835 table.setParent(standardTable.getParent()); 836 table.setColumns(new ArrayList<column>()); 837 Set<String> processIds = new LinkedHashSet<String>(); 838 for (int k = 0; k < tableList.size(); k++) { 839 if (tableList.get(k).getProcessIds() != null) { 840 processIds.addAll(tableList.get(k).getProcessIds()); 841 } 842 } 843 if (!processIds.isEmpty()) { 844 table.setProcessIds(new ArrayList<String>(processIds)); 845 } 846 table.setType(type); 847 for (table item : tableList) { 848 if (!SQLUtil.isEmpty(table.getCoordinate()) && !SQLUtil.isEmpty(item.getCoordinate())) { 849 if (table.getCoordinate().indexOf(item.getCoordinate()) == -1) { 850 table.appendCoordinate(item.getCoordinate()); 851 } 852 } else if (!SQLUtil.isEmpty(item.getCoordinate())) { 853 table.setCoordinate(item.getCoordinate()); 854 } 855 856 if (!SQLUtil.isEmpty(table.getAlias()) && !SQLUtil.isEmpty(item.getAlias())) { 857 table.setAlias(table.getAlias() + "," + item.getAlias()); 858 } else if (!SQLUtil.isEmpty(item.getAlias())) { 859 table.setAlias(item.getAlias()); 860 } 861 862 tableIdMap.put(item.getId(), table.getId()); 863 864 if (item.isView()) { 865 mergeDataflow.getViews().remove(item); 866 } else if (item.isDatabaseType()) { 867 mergeDataflow.getDatabases().remove(item); 868 } else if (item.isSchemaType()) { 869 mergeDataflow.getSchemas().remove(item); 870 } else if (item.isStage()) { 871 mergeDataflow.getStages().remove(item); 872 } else if (item.isDataSource()) { 873 mergeDataflow.getDatasources().remove(item); 874 } else if (item.isStream()) { 875 mergeDataflow.getStreams().remove(item); 876 } else if (item.isFile()) { 877 mergeDataflow.getPaths().remove(item); 878 } else if (item.isVariable()) { 879 mergeDataflow.getVariables().remove(item); 880 } else if (item.isTable()) { 881 mergeDataflow.getTables().remove(item); 882 } else if (item.isResultSet()) { 883 mergeDataflow.getResultsets().remove(item); 884 } 885 } 886 887 if (table.isView()) { 888 mergeDataflow.getViews().add(table); 889 } else if (table.isDatabaseType()) { 890 mergeDataflow.getDatabases().add(table); 891 } else if (table.isSchemaType()) { 892 mergeDataflow.getSchemas().add(table); 893 } else if (table.isStage()) { 894 mergeDataflow.getStages().add(table); 895 } else if (table.isDataSource()) { 896 mergeDataflow.getDatasources().add(table); 897 } else if (table.isStream()) { 898 mergeDataflow.getStreams().add(table); 899 } else if (table.isFile()) { 900 mergeDataflow.getPaths().add(table); 901 } else if (table.isVariable()) { 902 mergeDataflow.getVariables().add(table); 903 } else if (table.isResultSet()) { 904 mergeDataflow.getResultsets().add(table); 905 } else { 906 mergeDataflow.getTables().add(table); 907 } 908 } else { 909 table = tableList.get(0); 910 } 911 912 Set<String> columns = tableColumnMap.get(tableName); 913 Iterator<String> columnIter = columns.iterator(); 914 List<column> mergeColumns = new ArrayList<column>(); 915 while (columnIter.hasNext()) { 916 String columnName = columnIter.next(); 917 List<column> columnList = columnMap.get(columnName); 918 List<column> functions = new ArrayList<column>(); 919 for (column t : columnList) { 920 if (Boolean.TRUE.toString().equals(t.getIsFunction())) { 921 functions.add(t); 922 } 923 } 924 if (functions != null && !functions.isEmpty()) { 925 for (column function : functions) { 926 mergeColumns.add(function); 927 columnIdMap.put(function.getId(), function.getId()); 928 columnMergeIdMap.put(function.getId(), function); 929 } 930 931 columnList.removeAll(functions); 932 } 933 if (!columnList.isEmpty()) { 934 column firstColumn = columnList.iterator().next(); 935 if (columnList.size() > 1) { 936 column mergeColumn = new column(); 937 mergeColumn.setId(String.valueOf(++startId)); 938 mergeColumn.setName(firstColumn.getName()); 939 mergeColumn.setDisplayName(firstColumn.getDisplayName()); 940 mergeColumn.setSource(firstColumn.getSource()); 941 mergeColumn.setQualifiedTable(firstColumn.getQualifiedTable()); 942 mergeColumns.add(mergeColumn); 943 for (column item : columnList) { 944 mergeColumn.appendCoordinate(item.getCoordinate()); 945 columnIdMap.put(item.getId(), mergeColumn.getId()); 946 //add by grq 2023.02.06 issue=I6DB5S 947 if(item.getDataType() != null){ 948 mergeColumn.setDataType(item.getDataType()); 949 } 950 if(item.isForeignKey() != null){ 951 mergeColumn.setForeignKey(item.isForeignKey()); 952 } 953 if(item.isUnqiueKey() != null){ 954 mergeColumn.setUnqiueKey(item.isUnqiueKey()); 955 } 956 if(item.isIndexKey() != null){ 957 mergeColumn.setIndexKey(item.isIndexKey()); 958 } 959 if(item.isPrimaryKey() != null){ 960 mergeColumn.setPrimaryKey(item.isPrimaryKey()); 961 } 962 //end by grq 963 } 964 columnMergeIdMap.put(mergeColumn.getId(), mergeColumn); 965 } else { 966 mergeColumns.add(firstColumn); 967 columnIdMap.put(firstColumn.getId(), firstColumn.getId()); 968 columnMergeIdMap.put(firstColumn.getId(), firstColumn); 969 } 970 } 971 } 972 table.setColumns(mergeColumns); 973 } 974 975 if (mergeDataflow.getRelationships() != null) { 976 Map<String, relationship> mergeRelations = new LinkedHashMap<String, relationship>(); 977 for (int i = 0; i < mergeDataflow.getRelationships().size(); i++) { 978 relationship relation = mergeDataflow.getRelationships().get(i); 979 if(RelationshipType.call.name().equals(relation.getType())){ 980 targetColumn target = relation.getCaller(); 981 if (target == null) { 982 continue; 983 } 984 if (target != null && tableIdMap.containsKey(target.getId())) { 985 target.setId(tableIdMap.get(target.getId())); 986 } 987 988 List<sourceColumn> sources = relation.getCallees(); 989 Set<sourceColumn> sourceSet = new LinkedHashSet<sourceColumn>(); 990 if (sources != null) { 991 for (sourceColumn source : sources) { 992 if (tableIdMap.containsKey(source.getId())) { 993 source.setId(tableIdMap.get(source.getId())); 994 } 995 } 996 sourceSet.addAll(sources); 997 relation.setCallees(new ArrayList<sourceColumn>(sourceSet)); 998 } 999 1000 String jsonString = JSON.toJSONString(relation, true); 1001 String key = SHA256.getMd5(jsonString); 1002 if (!mergeRelations.containsKey(key)) { 1003 mergeRelations.put(key, relation); 1004 } 1005 } 1006 else { 1007 targetColumn target = relation.getTarget(); 1008 if (target == null) { 1009 continue; 1010 } 1011 if (target != null && tableIdMap.containsKey(target.getParent_id())) { 1012 target.setParent_id(tableIdMap.get(target.getParent_id())); 1013 } 1014 1015 if (columnIdMap.containsKey(target.getId())) { 1016 target.setId(columnIdMap.get(target.getId())); 1017 target.setCoordinate(columnMergeIdMap.get(target.getId()).getCoordinate()); 1018 } 1019 1020 List<sourceColumn> sources = relation.getSources(); 1021 Set<sourceColumn> sourceSet = new LinkedHashSet<sourceColumn>(); 1022 if (sources != null) { 1023 for (sourceColumn source : sources) { 1024 if (tableIdMap.containsKey(source.getParent_id())) { 1025 source.setParent_id(tableIdMap.get(source.getParent_id())); 1026 } 1027 if (tableIdMap.containsKey(source.getSource_id())) { 1028 source.setSource_id(tableIdMap.get(source.getSource_id())); 1029 } 1030 if (columnIdMap.containsKey(source.getId())) { 1031 source.setId(columnIdMap.get(source.getId())); 1032 source.setCoordinate(columnMergeIdMap.get(source.getId()).getCoordinate()); 1033 } 1034 } 1035 1036 sourceSet.addAll(sources); 1037 relation.setSources(new ArrayList<sourceColumn>(sourceSet)); 1038 } 1039 1040 String jsonString = JSON.toJSONString(relation, true); 1041 String key = SHA256.getMd5(jsonString); 1042 if (!mergeRelations.containsKey(key)) { 1043 mergeRelations.put(key, relation); 1044 } 1045 } 1046 } 1047 1048 mergeDataflow.setRelationships(new ArrayList<relationship>(mergeRelations.values())); 1049 } 1050 1051 tableMap.clear(); 1052 tableTypeMap.clear(); 1053 tableIdMap.clear(); 1054 columnMap.clear(); 1055 tableColumnMap.clear(); 1056 columnIdMap.clear(); 1057 columnMergeIdMap.clear(); 1058 tables.clear(); 1059 1060 return mergeDataflow; 1061 } 1062 1063 @Override 1064 public synchronized dataflow getDataFlow() { 1065 if (dataflow != null) { 1066 return dataflow; 1067 } else if (dataflowString != null) { 1068 return XML2Model.loadXML(dataflow.class, dataflowString); 1069 } 1070 return null; 1071 } 1072 1073 @Override 1074 public Map<String, String> getHashSQLMap() { 1075 return hashSQLMap; 1076 } 1077 1078 public static void main(String[] args) throws Exception { 1079// Option option = new Option(); 1080// option.setVendor(EDbVendor.dbvoracle); 1081// option.setOutput(false); 1082// ParallelDataFlowAnalyzer analyzer = new ParallelDataFlowAnalyzer(new File[]{new File("C:\\Users\\KK\\Desktop\\MRADM_PKG_ALL.sql")}, option); 1083// analyzer.generateDataFlow(); 1084// dataflow dataflow = analyzer.getDataFlow(); 1085 dataflow dataflow = XML2Model.loadXML(gudusoft.gsqlparser.dlineage.dataflow.model.xml.dataflow.class,new File("D:\\dataflow.xml.zip")); 1086 //XML2Model.saveXML(dataflow, new File("D:\\dataflow.xml.zip")); 1087 dataflow = DataflowUtility.convertToTableLevelDataflow(dataflow); 1088 dataflow = DataflowUtility.convertTableLevelToFunctionCallDataflow(dataflow, true, EDbVendor.dbvoracle); 1089 System.out.println(XML2Model.saveXML(dataflow)); 1090 } 1091}