001package gudusoft.gsqlparser.dlineage.util; 002 003import gudusoft.gsqlparser.EDbVendor; 004import gudusoft.gsqlparser.dlineage.DataFlowAnalyzer; 005import gudusoft.gsqlparser.dlineage.dataflow.metadata.MetadataReader; 006import gudusoft.gsqlparser.dlineage.dataflow.metadata.sqldep.SQLDepMetadataAnalyzer; 007import gudusoft.gsqlparser.dlineage.dataflow.model.ModelBindingManager; 008import gudusoft.gsqlparser.dlineage.dataflow.model.Option; 009import gudusoft.gsqlparser.dlineage.dataflow.model.RelationshipType; 010import gudusoft.gsqlparser.dlineage.dataflow.model.json.Error; 011import gudusoft.gsqlparser.dlineage.dataflow.model.json.Process; 012import gudusoft.gsqlparser.dlineage.dataflow.model.json.*; 013import gudusoft.gsqlparser.dlineage.dataflow.model.xml.*; 014import gudusoft.gsqlparser.dlineage.metadata.Sqlflow; 015import gudusoft.gsqlparser.util.Logger; 016import gudusoft.gsqlparser.util.LoggerFactory; 017import gudusoft.gsqlparser.util.SQLUtil; 018import gudusoft.gsqlparser.util.json.JSON; 019 020import java.io.File; 021import java.util.*; 022import java.util.concurrent.atomic.AtomicLong; 023import java.util.stream.Collectors; 024 025public class DataflowUtility { 026 027 private static final Logger logger = LoggerFactory.getLogger(FunctionUtility.class); 028 029 public static dataflow mergeFunctionCallDataflow(dataflow dataflow, EDbVendor dbVendor) { 030 if (ModelBindingManager.getGlobalVendor() == null) { 031 ModelBindingManager.setGlobalVendor(dbVendor); 032 } 033 dataflow instance = cloneDataflow(dataflow); 034 List<procedure> procedures = new ArrayList<>(instance.getProcedures()); 035 if (instance.getPackages() != null) { 036 for (oraclePackage pkg : instance.getPackages()) { 037 if (pkg.getProcedures() != null) { 038 for (procedure procedure : pkg.getProcedures()) { 039 procedure.setOraclePackage(pkg); 040 procedures.add(procedure); 041 } 042 } 043 } 044 } 045 046 Map<String, procedure> procedureIdMap = procedures.stream().collect(Collectors.toMap( 
047 t -> t.getId(), 048 t -> t, 049 (existingValue, newValue) -> newValue 050 )); 051 Map<String, table> functionIdMap = dataflow.getResultsets().stream().collect(Collectors.toMap( 052 t -> t.getId(), 053 t -> t, 054 (existingValue, newValue) -> newValue 055 )); 056 057 Map<String, table> functionMap = new HashMap<>(); 058 Map<String, procedure> procedureMap = new HashMap<>(); 059 Map<String, oraclePackage> oraclePackageMap = new HashMap<>(); 060 Map<String, Set<String>> oraclePackageProcedureMap = new HashMap<>(); 061 if (instance.getPackages() != null) { 062 for (oraclePackage pkg : instance.getPackages()) { 063 String qualifiedPackageName = DlineageUtil.getIdentifierOraclePackageNameWithArgNum(pkg); 064 if (!oraclePackageMap.containsKey(qualifiedPackageName)) { 065 oraclePackageMap.put(qualifiedPackageName, pkg); 066 } 067 for (procedure procedure : pkg.getProcedures()) { 068 String qualifiedProcedureName = qualifiedPackageName + "." + DlineageUtil.getIdentifierProcedureNameWithArgNum(procedure); 069 if (!oraclePackageProcedureMap.containsKey(qualifiedPackageName)) { 070 oraclePackageProcedureMap.put(qualifiedPackageName, new HashSet<>()); 071 } 072 oraclePackageProcedureMap.get(qualifiedPackageName).add(qualifiedProcedureName); 073 } 074 } 075 } 076 077 078 079 for (procedure procedure : procedures) { 080 String qualifiedProcedureName = DlineageUtil.getIdentifierProcedureNameWithArgNum(procedure); 081 if (procedure.getOraclePackage() != null) { 082 String qualifiedPackageName = DlineageUtil.getIdentifierOraclePackageNameWithArgNum(procedure.getOraclePackage()); 083 qualifiedProcedureName = qualifiedPackageName + "." 
+ DlineageUtil.getIdentifierProcedureNameWithArgNum(procedure); 084 } 085 if (!procedureMap.containsKey(qualifiedProcedureName)) { 086 procedureMap.put(qualifiedProcedureName, procedure); 087 } 088 } 089 090 for (table function : instance.getResultsets()) { 091 String qualifiedFunctionName = DlineageUtil.getIdentifierFunctionName(function); 092 if (!functionMap.containsKey(qualifiedFunctionName)) { 093 functionMap.put(qualifiedFunctionName, function); 094 } 095 } 096 097 Set<oraclePackage> oraclePackages = new LinkedHashSet<>(); 098 for (String key : oraclePackageMap.keySet()) { 099 oraclePackage standardPackage = oraclePackageMap.get(key); 100 oraclePackages.add(standardPackage); 101 Set<String> procedureNames = oraclePackageProcedureMap.get(key); 102 if (procedureNames != null) { 103 for (String procedureName : procedureNames) { 104 procedureName = key + "." + procedureName; 105 if (procedureMap.containsKey(procedureName)) { 106 procedure standardProcedure = procedureMap.get(procedureName); 107 if (!standardPackage.getProcedures().contains(standardProcedure)) { 108 standardPackage.getProcedures().add(standardProcedure); 109 } 110 procedureMap.remove(procedureName); 111 } 112 } 113 } 114 } 115 116 instance.setPackages(new ArrayList<>(oraclePackages)); 117 instance.setProcedures(new ArrayList<>(procedureMap.values())); 118 instance.setResultsets(new ArrayList<>(functionMap.values())); 119 120 Map<String, relationship> mergeRelations = new LinkedHashMap<String, relationship>(); 121 122 for (relationship relationship : instance.getRelationships()) { 123 if (relationship.getCaller() == null || relationship.getCallees() == null || relationship.getCallees().size() == 0) { 124 continue; 125 } 126 127 String targetId = relationship.getCaller().getId(); 128 if (procedureIdMap.containsKey(targetId)) { 129 procedure procedure = procedureIdMap.get(targetId); 130 String procedureName = DlineageUtil.getIdentifierProcedureNameWithArgNum(procedure); 131 if 
(procedure.getOraclePackage() != null) { 132 String qualifiedPackageName = DlineageUtil.getIdentifierOraclePackageNameWithArgNum(procedure.getOraclePackage()); 133 procedureName = qualifiedPackageName + "." + procedureName; 134 } 135 procedure standardProcedure = procedureMap.get(procedureName); 136 relationship.getCaller().setId(standardProcedure.getId()); 137 } else if (functionIdMap.containsKey(targetId)) { 138 table function = functionIdMap.get(targetId); 139 String functionName = DlineageUtil.getIdentifierFunctionName(function); 140 table standardFunction = functionMap.get(functionName); 141 relationship.getCaller().setId(standardFunction.getId()); 142 } 143 144 for (sourceColumn sourceColumn : relationship.getCallees()) { 145 String sourceId = sourceColumn.getId(); 146 if (procedureIdMap.containsKey(sourceId)) { 147 procedure procedure = procedureIdMap.get(sourceId); 148 String procedureName = DlineageUtil.getIdentifierProcedureNameWithArgNum(procedure); 149 if (procedure.getOraclePackage() != null) { 150 String qualifiedPackageName = DlineageUtil.getIdentifierOraclePackageNameWithArgNum(procedure.getOraclePackage()); 151 procedureName = qualifiedPackageName + "." 
+ procedureName; 152 } 153 procedure standardProcedure = procedureMap.get(procedureName); 154 sourceColumn.setId(standardProcedure.getId()); 155 } else if (functionIdMap.containsKey(sourceId)) { 156 table function = functionIdMap.get(sourceId); 157 String functionName = DlineageUtil.getIdentifierFunctionName(function); 158 table standardFunction = functionMap.get(functionName); 159 sourceColumn.setId(standardFunction.getId()); 160 } 161 } 162 163 String jsonString = JSON.toJSONString(relationship).replaceAll("\"id\":\".+?\"", ""); 164 String key = SHA256.getMd5(jsonString); 165 if (!mergeRelations.containsKey(key)) { 166 mergeRelations.put(key, relationship); 167 } 168 } 169 170 instance.setRelationships(new ArrayList<relationship>(mergeRelations.values())); 171 172 if (instance.getPackages() != null) { 173 for (oraclePackage pkg : instance.getPackages()) { 174 if (pkg.getProcedures() != null) { 175 for (procedure procedure : pkg.getProcedures()) { 176 procedure.setOraclePackage(null); 177 } 178 } 179 } 180 } 181 182 return instance; 183 } 184 185 public static dataflow convertTableLevelToFunctionCallDataflow(dataflow dataflow, boolean showBuiltIn, EDbVendor dbVendor) { 186 if (ModelBindingManager.getGlobalVendor() == null) { 187 ModelBindingManager.setGlobalVendor(dbVendor); 188 } 189 190 dataflow instance = cloneDataflow(dataflow); 191 192 if (instance.getRelationships() == null) { 193 return instance; 194 } 195 196 List<relationship> callRelationships = instance.getRelationships().stream() 197 .filter(t -> RelationshipType.call.name().equals(t.getType())) 198 .filter(t -> !showBuiltIn ? 
!Boolean.TRUE.equals(t.getBuiltIn()): true).collect(Collectors.toList()); 199 instance.setRelationships(callRelationships); 200 201 Set<String> ids = new HashSet<>(); 202 203 204 callRelationships.stream().forEach(t -> { 205 ids.add(t.getCaller().getId()); 206 t.getCallees().stream().forEach(t1 -> ids.add(t1.getId())); 207 }); 208 209 Iterator<table> iterator = instance.getTables().iterator(); 210 while (iterator.hasNext()) { 211 table t = iterator.next(); 212 if (!ids.contains(t.getId())) { 213 iterator.remove(); 214 } 215 } 216 217 iterator = instance.getResultsets().iterator(); 218 while (iterator.hasNext()) { 219 table t = iterator.next(); 220 if (!ids.contains(t.getId())) { 221 iterator.remove(); 222 } 223 } 224 225 iterator = instance.getViews().iterator(); 226 while (iterator.hasNext()) { 227 table t = iterator.next(); 228 if (!ids.contains(t.getId())) { 229 iterator.remove(); 230 } 231 } 232 233 iterator = instance.getStages().iterator(); 234 while (iterator.hasNext()) { 235 table t = iterator.next(); 236 if (!ids.contains(t.getId())) { 237 iterator.remove(); 238 } 239 } 240 241 iterator = instance.getStreams().iterator(); 242 while (iterator.hasNext()) { 243 table t = iterator.next(); 244 if (!ids.contains(t.getId())) { 245 iterator.remove(); 246 } 247 } 248 249 iterator = instance.getVariables().iterator(); 250 while (iterator.hasNext()) { 251 table t = iterator.next(); 252 if (!ids.contains(t.getId())) { 253 iterator.remove(); 254 } 255 } 256 257 iterator = instance.getPaths().iterator(); 258 while (iterator.hasNext()) { 259 table t = iterator.next(); 260 if (!ids.contains(t.getId())) { 261 iterator.remove(); 262 } 263 } 264 265 iterator = instance.getDatasources().iterator(); 266 while (iterator.hasNext()) { 267 table t = iterator.next(); 268 if (!ids.contains(t.getId())) { 269 iterator.remove(); 270 } 271 } 272 273 iterator = instance.getDatabases().iterator(); 274 while (iterator.hasNext()) { 275 table t = iterator.next(); 276 if 
(!ids.contains(t.getId())) { 277 iterator.remove(); 278 } 279 } 280 281 iterator = instance.getSchemas().iterator(); 282 while (iterator.hasNext()) { 283 table t = iterator.next(); 284 if (!ids.contains(t.getId())) { 285 iterator.remove(); 286 } 287 } 288 289 iterator = instance.getSequences().iterator(); 290 while (iterator.hasNext()) { 291 table t = iterator.next(); 292 if (!ids.contains(t.getId())) { 293 iterator.remove(); 294 } 295 } 296 297 return mergeFunctionCallDataflow(instance, dbVendor); 298 } 299 300 public static dataflow convertToTableLevelDataflow(dataflow dataflow) { 301 dataflow instance = cloneDataflow(dataflow); 302 303 if (instance.getRelationships() == null) { 304 return instance; 305 } 306 307 Map<String, LinkedHashSet<Pair<String, String>>> relationMap = new HashMap<String, LinkedHashSet<Pair<String, String>>>(); 308 Map<String, LinkedHashSet<Pair3<String, String, relationship>>> callRelationMap = new HashMap<String, LinkedHashSet<Pair3<String, String, relationship>>>(); 309 for (RelationshipType type : RelationshipType.values()) { 310 relationMap.put(type.name(), new LinkedHashSet<Pair<String, String>>()); 311 callRelationMap.put(type.name(), new LinkedHashSet<Pair3<String, String, relationship>>()); 312 } 313 for (relationship relationship : instance.getRelationships()) { 314 if (RelationshipType.call.name().equals(relationship.getType())) { 315 String targetId = relationship.getCaller().getId(); 316 for (sourceColumn sourceColumn : relationship.getCallees()) { 317 String sourceId = sourceColumn.getId(); 318 callRelationMap.get(relationship.getType()).add(new Pair3<>(targetId, sourceId, relationship)); 319 } 320 } else { 321 String targetId = relationship.getTarget().getParent_id(); 322 for (sourceColumn sourceColumn : relationship.getSources()) { 323 String sourceId = sourceColumn.getParent_id(); 324 relationMap.get(relationship.getType()).add(new Pair<>(targetId, sourceId)); 325 } 326 } 327 } 328 329 long maxId = 0; 330 Map<String, table> 
dbObjMap = getDataflowDbObjMap(instance); 331 332 List<procedure> procedures = new ArrayList<>(instance.getProcedures()); 333 if (instance.getPackages() != null) { 334 for (oraclePackage pkg : instance.getPackages()) { 335 procedures.addAll(pkg.getProcedures()); 336 } 337 } 338 339 Map<String, procedure> procedureMap = procedures.stream().collect(Collectors.toMap( 340 t -> t.getId(), 341 t -> t, 342 (existingValue, newValue) -> newValue 343 )); 344 345 for (table table : dbObjMap.values()) { 346 if (Long.valueOf(table.getId()) > maxId) { 347 maxId = Long.valueOf(table.getId()); 348 } 349 } 350 351 AtomicLong id = new AtomicLong(maxId + 10000000); 352 353 for (table table : dbObjMap.values()) { 354 column column = new column(); 355 column.setId(String.valueOf(id.incrementAndGet())); 356 column.setName(table.getType()); 357 table.setColumns(Arrays.asList(column)); 358 } 359 360 List<relationship> relations = new ArrayList<relationship>(); 361 for (RelationshipType type : RelationshipType.values()) { 362 LinkedHashSet<Pair<String, String>> relationSet = relationMap.get(type.name()); 363 LinkedHashSet<Pair3<String, String, relationship>> callRelationSet = callRelationMap.get(type.name()); 364 if (RelationshipType.call.equals(type)) { 365 for (Pair3<String, String, relationship> pair : callRelationSet) { 366 if ((dbObjMap.get(pair.first) == null && procedureMap.get(pair.first) == null) 367 || (dbObjMap.get(pair.second) == null && procedureMap.get(pair.second) == null)) { 368 continue; 369 } 370 371 relationship relationship = new relationship(); 372 relationship.setType(type.name()); 373 relationship.setId(String.valueOf(id.incrementAndGet())); 374 relationship.setCallStmt(pair.third.getCallStmt()); 375 relationship.setCallCoordinate(pair.third.getCallCoordinate()); 376 377 targetColumn targetColumn = new targetColumn(); 378 if (dbObjMap.containsKey(pair.first)) { 379 targetColumn.setId(dbObjMap.get(pair.first).getId()); 380 
targetColumn.setName(dbObjMap.get(pair.first).getName()); 381 targetColumn.setCoordinate(dbObjMap.get(pair.first).getCoordinate()); 382 } else { 383 targetColumn.setId(procedureMap.get(pair.first).getId()); 384 targetColumn.setName(procedureMap.get(pair.first).getName()); 385 targetColumn.setCoordinate(procedureMap.get(pair.first).getCoordinate()); 386 } 387 388 sourceColumn sourceColumn = new sourceColumn(); 389 if (dbObjMap.containsKey(pair.second)) { 390 sourceColumn.setId(dbObjMap.get(pair.second).getId()); 391 sourceColumn.setName(dbObjMap.get(pair.second).getName()); 392 sourceColumn.setCoordinate(dbObjMap.get(pair.second).getCoordinate()); 393 } else { 394 sourceColumn.setId(procedureMap.get(pair.second).getId()); 395 sourceColumn.setName(procedureMap.get(pair.second).getName()); 396 sourceColumn.setCoordinate(procedureMap.get(pair.second).getCoordinate()); 397 } 398 399 relationship.setCaller(targetColumn); 400 relationship.setCallees(Arrays.asList(sourceColumn)); 401 relationship.setBuiltIn(pair.third.getBuiltIn()); 402 relations.add(relationship); 403 } 404 } else { 405 for (Pair<String, String> pair : relationSet) { 406 if (dbObjMap.get(pair.first) == null || dbObjMap.get(pair.second) == null) { 407 continue; 408 } 409 relationship relationship = new relationship(); 410 relationship.setType(type.name()); 411 relationship.setId(String.valueOf(id.incrementAndGet())); 412 targetColumn targetColumn = new targetColumn(); 413 targetColumn.setId(dbObjMap.get(pair.first).getColumns().get(0).getId()); 414 targetColumn.setColumn(dbObjMap.get(pair.first).getColumns().get(0).getName()); 415 targetColumn.setParent_id(pair.first); 416 targetColumn.setParent_name(dbObjMap.get(pair.first).getName()); 417 sourceColumn sourceColumn = new sourceColumn(); 418 sourceColumn.setId(dbObjMap.get(pair.second).getColumns().get(0).getId()); 419 sourceColumn.setColumn(dbObjMap.get(pair.second).getColumns().get(0).getName()); 420 sourceColumn.setParent_id(pair.second); 421 
sourceColumn.setParent_name(dbObjMap.get(pair.second).getName()); 422 relationship.setTarget(targetColumn); 423 relationship.setSources(Arrays.asList(sourceColumn)); 424 relations.add(relationship); 425 } 426 } 427 } 428 instance.setRelationships(relations); 429 return instance; 430 } 431 432 public static dataflow convertToSchemaLevelDataflow(dataflow dataflow, EDbVendor dbVendor) throws Exception { 433 return convertToSchemaLevelDataflow(dataflow, dbVendor, false); 434 } 435 436 public static dataflow convertToSchemaLevelDataflow(dataflow dataflow, EDbVendor dbVendor, boolean isSimple) throws Exception { 437 dataflow instance = cloneDataflow(dataflow); 438 439 if (!isSimple) { 440 DataFlowAnalyzer analyzer = new DataFlowAnalyzer("", dbVendor, true); 441 instance = analyzer.getSimpleDataflow(instance, true, Arrays.asList(new String[]{"fdd", "fdr"})); 442 } 443 444 List<table> allTables = new ArrayList<table>(); 445 allTables.addAll(instance.getTables()); 446 allTables.addAll(instance.getViews()); 447 448 ModelBindingManager.setGlobalVendor(dbVendor); 449 450 Map<String, String> tableIdSchameNameMap = allTables.stream().collect(Collectors.toMap(table -> table.getId(), table -> table.getFullSchemaName(), (existingValue, newValue) -> newValue)); 451 452 if (instance.getRelationships() == null) { 453 return instance; 454 } 455 456 Map<String, LinkedHashSet<Pair<String, String>>> relationMap = new HashMap<String, LinkedHashSet<Pair<String, String>>>(); 457 for (RelationshipType type : RelationshipType.values()) { 458 relationMap.put(type.name(), new LinkedHashSet<Pair<String, String>>()); 459 } 460 for (relationship relationship : instance.getRelationships()) { 461 String targetId = relationship.getTarget().getParent_id(); 462 for (sourceColumn sourceColumn : relationship.getSources()) { 463 String sourceId = sourceColumn.getParent_id(); 464 relationMap.get(relationship.getType()).add(new Pair<>(tableIdSchameNameMap.get(targetId), tableIdSchameNameMap.get(sourceId))); 
465 } 466 } 467 468 Map<String, table> dbObjMap = getDataflowDbObjMap(instance); 469 LinkedHashSet<String> schemaNameSet = new LinkedHashSet<String>(); 470 schemaNameSet.addAll(tableIdSchameNameMap.values()); 471 List<table> schemaTables = new ArrayList<table>(); 472 473 AtomicLong id = new AtomicLong(0); 474 475 for (String schemaName : schemaNameSet) { 476 table table = new table(); 477 table.setId(String.valueOf(id.incrementAndGet())); 478 table.setName(schemaName); 479 table.setType("schema"); 480 schemaTables.add(table); 481 dbObjMap.put(schemaName, table); 482 } 483 484 for (table table : schemaTables) { 485 column column = new column(); 486 column.setId(String.valueOf(id.incrementAndGet())); 487 column.setName(table.getType()); 488 table.setColumns(Arrays.asList(column)); 489 } 490 491 List<relationship> relations = new ArrayList<relationship>(); 492 for (RelationshipType type : RelationshipType.values()) { 493 LinkedHashSet<Pair<String, String>> relationSet = relationMap.get(type.name()); 494 495 for (Pair<String, String> pair : relationSet) { 496 if (dbObjMap.get(pair.first) == null || dbObjMap.get(pair.second) == null) { 497 continue; 498 } 499 relationship relationship = new relationship(); 500 relationship.setType(type.name()); 501 relationship.setId(String.valueOf(id.incrementAndGet())); 502 targetColumn targetColumn = new targetColumn(); 503 targetColumn.setId(dbObjMap.get(pair.first).getColumns().get(0).getId()); 504 targetColumn.setColumn(dbObjMap.get(pair.first).getColumns().get(0).getName()); 505 targetColumn.setParent_id(dbObjMap.get(pair.first).getId()); 506 targetColumn.setParent_name(dbObjMap.get(pair.first).getName()); 507 sourceColumn sourceColumn = new sourceColumn(); 508 sourceColumn.setId(dbObjMap.get(pair.second).getColumns().get(0).getId()); 509 sourceColumn.setColumn(dbObjMap.get(pair.second).getColumns().get(0).getName()); 510 sourceColumn.setParent_id(dbObjMap.get(pair.second).getId()); 511 
sourceColumn.setParent_name(dbObjMap.get(pair.second).getName()); 512 relationship.setTarget(targetColumn); 513 relationship.setSources(Arrays.asList(sourceColumn)); 514 relations.add(relationship); 515 } 516 } 517 518 dataflow schemaDataflow = new dataflow(); 519 schemaDataflow.setTables(schemaTables); 520 schemaDataflow.setRelationships(relations); 521 return schemaDataflow; 522 } 523 524 public static dataflow cloneDataflow(dataflow dataflow) { 525 dataflow dataflowCopy = new dataflow(); 526 if (dataflow.getTables() != null) { 527 dataflowCopy.setTables(cloneTables(dataflow.getTables())); 528 } 529 if (dataflow.getViews() != null) { 530 dataflowCopy.setViews(cloneTables(dataflow.getViews())); 531 } 532 if (dataflow.getRelationships() != null) { 533 dataflowCopy.setRelationships(new ArrayList<>(dataflow.getRelationships())); 534 } 535 if (dataflow.getErrors() != null) { 536 dataflowCopy.setErrors(new ArrayList<>(dataflow.getErrors())); 537 } 538 if (dataflow.getPaths() != null) { 539 dataflowCopy.setPaths(cloneTables(dataflow.getPaths())); 540 } 541 if (dataflow.getPackages() != null) { 542 dataflowCopy.setPackages(new ArrayList<>(dataflow.getPackages())); 543 } 544 if (dataflow.getProcedures() != null) { 545 dataflowCopy.setProcedures(new ArrayList<>(dataflow.getProcedures())); 546 } 547 if (dataflow.getProcesses() != null) { 548 dataflowCopy.setProcesses(new ArrayList<>(dataflow.getProcesses())); 549 } 550 if (dataflow.getResultsets() != null) { 551 dataflowCopy.setResultsets(cloneTables(dataflow.getResultsets())); 552 } 553 if (dataflow.getVariables() != null) { 554 dataflowCopy.setVariables(cloneTables(dataflow.getVariables())); 555 } 556 if (dataflow.getStages() != null) { 557 dataflowCopy.setStages(cloneTables(dataflow.getStages())); 558 } 559 if (dataflow.getSequences() != null) { 560 dataflowCopy.setSequences(cloneTables(dataflow.getSequences())); 561 } 562 if (dataflow.getDatasources() != null) { 563 
dataflowCopy.setDatasources(cloneTables(dataflow.getDatasources())); 564 } 565 if (dataflow.getDatabases() != null) { 566 dataflowCopy.setDatabases(cloneTables(dataflow.getDatabases())); 567 } 568 if (dataflow.getSchemas() != null) { 569 dataflowCopy.setSchemas(cloneTables(dataflow.getSchemas())); 570 } 571 if (dataflow.getStreams() != null) { 572 dataflowCopy.setStreams(cloneTables(dataflow.getStreams())); 573 } 574 dataflowCopy.setOrientation(dataflow.getOrientation()); 575 return dataflowCopy; 576 } 577 578 private static List<table> cloneTables(List<table> tableList) { 579 if (tableList == null) { 580 return null; 581 } 582 List<table> tables = new ArrayList<>(tableList.size()); 583 for (table item : tableList) { 584 try { 585 table table = (table) item.clone(); 586 if (item.getColumns() != null) { 587 table.setColumns(new ArrayList<column>(item.getColumns())); 588 } 589 tables.add(table); 590 } catch (CloneNotSupportedException e) { 591 logger.error("Clone table failed.", e); 592 } 593 } 594 return tables; 595 } 596 597 public static Map<String, table> getDataflowDbObjMap(dataflow dataflow) { 598 List<table> tables = new ArrayList<table>(); 599 if (dataflow.getTables() != null) { 600 tables.addAll(dataflow.getTables()); 601 } 602 if (dataflow.getViews() != null) { 603 tables.addAll(dataflow.getViews()); 604 } 605 if (dataflow.getPaths() != null) { 606 tables.addAll(dataflow.getPaths()); 607 } 608 if (dataflow.getResultsets() != null) { 609 tables.addAll(dataflow.getResultsets()); 610 } 611 if (dataflow.getVariables() != null) { 612 tables.addAll(dataflow.getVariables()); 613 } 614 if (dataflow.getStages() != null) { 615 tables.addAll(dataflow.getStages()); 616 } 617 if (dataflow.getSequences() != null) { 618 tables.addAll(dataflow.getSequences()); 619 } 620 if (dataflow.getDatasources() != null) { 621 tables.addAll(dataflow.getDatasources()); 622 } 623 if (dataflow.getDatabases() != null) { 624 tables.addAll(dataflow.getDatabases()); 625 } 626 if 
(dataflow.getSchemas() != null) { 627 tables.addAll(dataflow.getSchemas()); 628 } 629 if (dataflow.getStreams() != null) { 630 tables.addAll(dataflow.getStreams()); 631 } 632 633 Map<String, table> dbObjMap = new HashMap<>(); 634 for (table table : tables) { 635 dbObjMap.put(table.getId(), table); 636 } 637 return dbObjMap; 638 } 639 public static Map<String, table> getDataflowDbObjNameMap(dataflow dataflow) { 640 List<table> tables = new ArrayList<table>(); 641 if (dataflow.getTables() != null) { 642 tables.addAll(dataflow.getTables()); 643 } 644 if (dataflow.getViews() != null) { 645 tables.addAll(dataflow.getViews()); 646 } 647 if (dataflow.getPaths() != null) { 648 tables.addAll(dataflow.getPaths()); 649 } 650 if (dataflow.getResultsets() != null) { 651 tables.addAll(dataflow.getResultsets()); 652 } 653 if (dataflow.getVariables() != null) { 654 tables.addAll(dataflow.getVariables()); 655 } 656 if (dataflow.getStages() != null) { 657 tables.addAll(dataflow.getStages()); 658 } 659 if (dataflow.getSequences() != null) { 660 tables.addAll(dataflow.getSequences()); 661 } 662 if (dataflow.getDatasources() != null) { 663 tables.addAll(dataflow.getDatasources()); 664 } 665 if (dataflow.getDatabases() != null) { 666 tables.addAll(dataflow.getDatabases()); 667 } 668 if (dataflow.getSchemas() != null) { 669 tables.addAll(dataflow.getSchemas()); 670 } 671 if (dataflow.getStreams() != null) { 672 tables.addAll(dataflow.getStreams()); 673 } 674 675 Map<String, table> dbObjMap = new HashMap<>(); 676 for(table table: tables){ 677 dbObjMap.put(table.getFullName(), table); 678 } 679 return dbObjMap; 680 } 681 public static dataflow mergeDataflowsWithDifferentStartId(Collection<dataflow> dataflows, EDbVendor vendor) { 682 ModelBindingManager.setGlobalVendor(vendor); 683 try { 684 return mergeDataflowsByStartId(dataflows, DATAFLOW_ID_RANGE * dataflows.size()); 685 } finally { 686 ModelBindingManager.removeGlobalVendor(); 687 } 688 } 689 690 /** 691 * 内部合并实现:调用方需保证所有输入 dataflow 的 
ID 空间不冲突,并且已提前设置好 ModelBindingManager。 692 * <p> 693 * 该方法代替之前位于 {@code ParallelDataFlowAnalyzer} 的同名实现,外部调用者应使用 694 * {@link #mergeDataflowsWithDifferentStartId(Collection, EDbVendor)} 或 695 * {@link #mergeDataflows(Collection, EDbVendor)}。 696 */ 697 public static dataflow mergeDataflowsByStartId(Collection<dataflow> dataflows, long startId) { 698 dataflow mergeDataflow = new dataflow(); 699 700 List<table> tableCopy = new ArrayList<table>(); 701 List<table> viewCopy = new ArrayList<table>(); 702 List<table> databaseCopy = new ArrayList<table>(); 703 List<table> schemaCopy = new ArrayList<table>(); 704 List<table> stageCopy = new ArrayList<table>(); 705 List<table> dataSourceCopy = new ArrayList<table>(); 706 List<table> streamCopy = new ArrayList<table>(); 707 List<table> fileCopy = new ArrayList<table>(); 708 List<table> variableCopy = new ArrayList<table>(); 709 List<table> resultSetCopy = new ArrayList<table>(); 710 List<table> sequenceCopy = new ArrayList<table>(); 711 712 List<process> processCopy = new ArrayList<process>(); 713 List<relationship> relationshipCopy = new ArrayList<relationship>(); 714 List<procedure> procedureCopy = new ArrayList<procedure>(); 715 List<oraclePackage> packageCopy = new ArrayList<oraclePackage>(); 716 List<error> errorCopy = new ArrayList<error>(); 717 718 List<table> tables = new ArrayList<table>(); 719 for (dataflow dataflow : dataflows) { 720 if (dataflow == null) { 721 continue; 722 } 723 724 if (dataflow.getTables() != null) { 725 tableCopy.addAll(dataflow.getTables()); 726 } 727 mergeDataflow.setTables(tableCopy); 728 if (dataflow.getViews() != null) { 729 viewCopy.addAll(dataflow.getViews()); 730 } 731 mergeDataflow.setViews(viewCopy); 732 if (dataflow.getDatabases() != null) { 733 databaseCopy.addAll(dataflow.getDatabases()); 734 } 735 mergeDataflow.setDatabases(databaseCopy); 736 if (dataflow.getSchemas() != null) { 737 schemaCopy.addAll(dataflow.getSchemas()); 738 } 739 mergeDataflow.setSchemas(schemaCopy); 740 if 
(dataflow.getStages() != null) { 741 stageCopy.addAll(dataflow.getStages()); 742 } 743 mergeDataflow.setStages(stageCopy); 744 if (dataflow.getDatasources() != null) { 745 dataSourceCopy.addAll(dataflow.getDatasources()); 746 } 747 mergeDataflow.setDatasources(dataSourceCopy); 748 if (dataflow.getStreams() != null) { 749 streamCopy.addAll(dataflow.getStreams()); 750 } 751 mergeDataflow.setStreams(streamCopy); 752 if (dataflow.getPaths() != null) { 753 fileCopy.addAll(dataflow.getPaths()); 754 } 755 mergeDataflow.setPaths(fileCopy); 756 if (dataflow.getVariables() != null) { 757 variableCopy.addAll(dataflow.getVariables()); 758 } 759 mergeDataflow.setVariables(variableCopy); 760 if (dataflow.getResultsets() != null) { 761 resultSetCopy.addAll(dataflow.getResultsets()); 762 } 763 mergeDataflow.setResultsets(resultSetCopy); 764 if (dataflow.getSequences() != null) { 765 sequenceCopy.addAll(dataflow.getSequences()); 766 } 767 mergeDataflow.setSequences(sequenceCopy); 768 if (dataflow.getProcesses() != null) { 769 processCopy.addAll(dataflow.getProcesses()); 770 } 771 mergeDataflow.setProcesses(processCopy); 772 if (dataflow.getRelationships() != null) { 773 relationshipCopy.addAll(dataflow.getRelationships()); 774 } 775 mergeDataflow.setRelationships(relationshipCopy); 776 if (dataflow.getProcedures() != null) { 777 procedureCopy.addAll(dataflow.getProcedures()); 778 } 779 mergeDataflow.setProcedures(procedureCopy); 780 if (dataflow.getPackages() != null) { 781 packageCopy.addAll(dataflow.getPackages()); 782 } 783 mergeDataflow.setPackages(packageCopy); 784 if (dataflow.getErrors() != null) { 785 errorCopy.addAll(dataflow.getErrors()); 786 } 787 if (errorCopy.size() > 10000) { 788 errorCopy = errorCopy.subList(0, 10000); 789 } 790 mergeDataflow.setErrors(errorCopy); 791 792 tables.addAll(dataflow.getTables()); 793 tables.addAll(dataflow.getViews()); 794 tables.addAll(dataflow.getDatabases()); 795 tables.addAll(dataflow.getSchemas()); 796 
tables.addAll(dataflow.getStages()); 797 tables.addAll(dataflow.getDatasources()); 798 tables.addAll(dataflow.getStreams()); 799 tables.addAll(dataflow.getPaths()); 800 tables.addAll(dataflow.getResultsets()); 801 tables.addAll(dataflow.getVariables()); 802 if (dataflow.getSequences() != null) { 803 tables.addAll(dataflow.getSequences()); 804 } 805 } 806 807 Map<String, List<table>> tableMap = new HashMap<String, List<table>>(); 808 Map<String, String> tableTypeMap = new HashMap<String, String>(); 809 Map<String, String> tableIdMap = new HashMap<String, String>(); 810 811 Map<String, List<column>> columnMap = new HashMap<String, List<column>>(); 812 Map<String, Set<String>> tableColumnMap = new HashMap<String, Set<String>>(); 813 Map<String, String> columnIdMap = new HashMap<String, String>(); 814 Map<String, column> columnMergeIdMap = new HashMap<String, column>(); 815 816 List<procedure> procedures = new ArrayList<>(mergeDataflow.getProcedures()); 817 if (mergeDataflow.getPackages() != null) { 818 for (oraclePackage pkg : mergeDataflow.getPackages()) { 819 procedures.addAll(pkg.getProcedures()); 820 } 821 } 822 823 Set<String> procedureIdSet = procedures.stream().map(t -> t.getId()).collect(Collectors.toSet()); 824 825 for (table table : tables) { 826 String qualifiedTableName = DlineageUtil.getQualifiedTableName(table); 827 String tableFullName = DlineageUtil.getIdentifierNormalTableName(qualifiedTableName); 828 if ("variable".endsWith(table.getType()) && !SQLUtil.isEmpty(table.getParent())) { 829 tableFullName = table.getParent() + "." 
+ tableFullName; 830 } 831 832 if (!tableMap.containsKey(tableFullName)) { 833 tableMap.put(tableFullName, new ArrayList<table>()); 834 } 835 836 tableMap.get(tableFullName).add(table); 837 838 if (!tableTypeMap.containsKey(tableFullName)) { 839 tableTypeMap.put(tableFullName, table.getType()); 840 } else if ("view".equals(table.getSubType())) { 841 tableTypeMap.put(tableFullName, table.getType()); 842 } else if ("database".equals(table.getSubType())) { 843 tableTypeMap.put(tableFullName, table.getType()); 844 } else if ("schema".equals(table.getSubType())) { 845 tableTypeMap.put(tableFullName, table.getType()); 846 } else if ("stage".equals(table.getSubType())) { 847 tableTypeMap.put(tableFullName, table.getType()); 848 } else if ("datasource".equals(table.getSubType())) { 849 tableTypeMap.put(tableFullName, table.getType()); 850 } else if ("stream".equals(table.getSubType())) { 851 tableTypeMap.put(tableFullName, table.getType()); 852 } else if ("file".equals(table.getSubType())) { 853 tableTypeMap.put(tableFullName, table.getType()); 854 } else if ("sequence".equals(table.getSubType())) { 855 tableTypeMap.put(tableFullName, table.getType()); 856 } else if ("table".equals(tableTypeMap.get(tableFullName))) { 857 tableTypeMap.put(tableFullName, table.getType()); 858 } else if ("variable".equals(tableTypeMap.get(tableFullName))) { 859 tableTypeMap.put(tableFullName, table.getType()); 860 } 861 862 if (table.getColumns() != null) { 863 if (!tableColumnMap.containsKey(tableFullName)) { 864 tableColumnMap.put(tableFullName, new LinkedHashSet<String>()); 865 } 866 for (column column : table.getColumns()) { 867 String columnFullName = tableFullName + "." 
868 + DlineageUtil.getIdentifierNormalColumnName(column.getName()); 869 870 if (!columnMap.containsKey(columnFullName)) { 871 columnMap.put(columnFullName, new ArrayList<column>()); 872 tableColumnMap.get(tableFullName).add(columnFullName); 873 } 874 875 columnMap.get(columnFullName).add(column); 876 } 877 } 878 } 879 880 Iterator<String> tableNameIter = tableMap.keySet().iterator(); 881 while (tableNameIter.hasNext()) { 882 String tableName = tableNameIter.next(); 883 List<table> tableList = tableMap.get(tableName); 884 table table; 885 if (tableList.size() > 1) { 886 table standardTable = tableList.get(0); 887 //Function允许重名,不做合并处理 888 if (standardTable.isFunction()) { 889 continue; 890 } 891 892 String type = tableTypeMap.get(tableName); 893 table = new table(); 894 table.setId(String.valueOf(++startId)); 895 table.setServer(standardTable.getServer()); 896 table.setDatabase(standardTable.getDatabase()); 897 table.setSchema(standardTable.getSchema()); 898 table.setName(standardTable.getName()); 899 table.setDisplayName(standardTable.getDisplayName()); 900 table.setParent(standardTable.getParent()); 901 table.setColumns(new ArrayList<column>()); 902 Set<String> processIds = new LinkedHashSet<String>(); 903 for (int k = 0; k < tableList.size(); k++) { 904 if (tableList.get(k).getProcessIds() != null) { 905 processIds.addAll(tableList.get(k).getProcessIds()); 906 } 907 } 908 if (!processIds.isEmpty()) { 909 table.setProcessIds(new ArrayList<String>(processIds)); 910 } 911 table.setType(type); 912 for (table item : tableList) { 913 if (!SQLUtil.isEmpty(table.getCoordinate()) && !SQLUtil.isEmpty(item.getCoordinate())) { 914 if (table.getCoordinate().indexOf(item.getCoordinate()) == -1) { 915 table.appendCoordinate(item.getCoordinate()); 916 } 917 } else if (!SQLUtil.isEmpty(item.getCoordinate())) { 918 table.setCoordinate(item.getCoordinate()); 919 } 920 921 if (!SQLUtil.isEmpty(table.getAlias()) && !SQLUtil.isEmpty(item.getAlias())) { 922 
table.setAlias(table.getAlias() + "," + item.getAlias()); 923 } else if (!SQLUtil.isEmpty(item.getAlias())) { 924 table.setAlias(item.getAlias()); 925 } 926 927 tableIdMap.put(item.getId(), table.getId()); 928 929 if (item.isView()) { 930 mergeDataflow.getViews().remove(item); 931 } else if (item.isDatabaseType()) { 932 mergeDataflow.getDatabases().remove(item); 933 } else if (item.isSchemaType()) { 934 mergeDataflow.getSchemas().remove(item); 935 } else if (item.isStage()) { 936 mergeDataflow.getStages().remove(item); 937 } else if (item.isDataSource()) { 938 mergeDataflow.getDatasources().remove(item); 939 } else if (item.isStream()) { 940 mergeDataflow.getStreams().remove(item); 941 } else if (item.isFile()) { 942 mergeDataflow.getPaths().remove(item); 943 } else if (item.isVariable()) { 944 mergeDataflow.getVariables().remove(item); 945 } else if (item.isTable()) { 946 mergeDataflow.getTables().remove(item); 947 } else if (item.isResultSet()) { 948 mergeDataflow.getResultsets().remove(item); 949 } else if (item.isSequence()) { 950 mergeDataflow.getSequences().remove(item); 951 } 952 } 953 954 if (table.isView()) { 955 mergeDataflow.getViews().add(table); 956 } else if (table.isDatabaseType()) { 957 mergeDataflow.getDatabases().add(table); 958 } else if (table.isSchemaType()) { 959 mergeDataflow.getSchemas().add(table); 960 } else if (table.isStage()) { 961 mergeDataflow.getStages().add(table); 962 } else if (table.isDataSource()) { 963 mergeDataflow.getDatasources().add(table); 964 } else if (table.isStream()) { 965 mergeDataflow.getStreams().add(table); 966 } else if (table.isFile()) { 967 mergeDataflow.getPaths().add(table); 968 } else if (table.isVariable()) { 969 mergeDataflow.getVariables().add(table); 970 } else if (table.isResultSet()) { 971 mergeDataflow.getResultsets().add(table); 972 } else if (table.isSequence()) { 973 mergeDataflow.getSequences().add(table); 974 } else { 975 mergeDataflow.getTables().add(table); 976 } 977 } else { 978 table = 
tableList.get(0); 979 } 980 981 Set<String> columns = tableColumnMap.get(tableName); 982 Iterator<String> columnIter = columns.iterator(); 983 List<column> mergeColumns = new ArrayList<column>(); 984 while (columnIter.hasNext()) { 985 String columnName = columnIter.next(); 986 List<column> columnList = columnMap.get(columnName); 987 List<column> functions = new ArrayList<column>(); 988 for (column t : columnList) { 989 if (Boolean.TRUE.toString().equals(t.getIsFunction())) { 990 functions.add(t); 991 } 992 } 993 if (functions != null && !functions.isEmpty()) { 994 for (column function : functions) { 995 mergeColumns.add(function); 996 columnIdMap.put(function.getId(), function.getId()); 997 columnMergeIdMap.put(function.getId(), function); 998 } 999 1000 columnList.removeAll(functions); 1001 } 1002 if (!columnList.isEmpty()) { 1003 column firstColumn = columnList.iterator().next(); 1004 if (columnList.size() > 1) { 1005 column mergeColumn = new column(); 1006 mergeColumn.setId(String.valueOf(++startId)); 1007 mergeColumn.setName(firstColumn.getName()); 1008 mergeColumn.setDisplayName(firstColumn.getDisplayName()); 1009 mergeColumn.setSource(firstColumn.getSource()); 1010 mergeColumn.setQualifiedTable(firstColumn.getQualifiedTable()); 1011 mergeColumns.add(mergeColumn); 1012 for (column item : columnList) { 1013 mergeColumn.appendCoordinate(item.getCoordinate()); 1014 columnIdMap.put(item.getId(), mergeColumn.getId()); 1015 //add by grq 2023.02.06 issue=I6DB5S 1016 if (item.getDataType() != null) { 1017 mergeColumn.setDataType(item.getDataType()); 1018 } 1019 if (item.isForeignKey() != null) { 1020 mergeColumn.setForeignKey(item.isForeignKey()); 1021 } 1022 if (item.isUnqiueKey() != null) { 1023 mergeColumn.setUnqiueKey(item.isUnqiueKey()); 1024 } 1025 if (item.isIndexKey() != null) { 1026 mergeColumn.setIndexKey(item.isIndexKey()); 1027 } 1028 if (item.isPrimaryKey() != null) { 1029 mergeColumn.setPrimaryKey(item.isPrimaryKey()); 1030 } 1031 //end by grq 1032 } 
1033 columnMergeIdMap.put(mergeColumn.getId(), mergeColumn); 1034 } else { 1035 mergeColumns.add(firstColumn); 1036 columnIdMap.put(firstColumn.getId(), firstColumn.getId()); 1037 columnMergeIdMap.put(firstColumn.getId(), firstColumn); 1038 } 1039 } 1040 } 1041 table.setColumns(mergeColumns); 1042 } 1043 1044 if (mergeDataflow.getRelationships() != null) { 1045 Map<String, relationship> mergeRelations = new LinkedHashMap<String, relationship>(); 1046 for (int i = 0; i < mergeDataflow.getRelationships().size(); i++) { 1047 relationship relation = mergeDataflow.getRelationships().get(i); 1048 if (RelationshipType.call.name().equals(relation.getType())) { 1049 targetColumn target = relation.getCaller(); 1050 if (target == null) { 1051 continue; 1052 } 1053 if (target != null && tableIdMap.containsKey(target.getId())) { 1054 target.setId(tableIdMap.get(target.getId())); 1055 } 1056 1057 List<sourceColumn> sources = relation.getCallees(); 1058 Set<sourceColumn> sourceSet = new LinkedHashSet<sourceColumn>(); 1059 if (sources != null) { 1060 for (sourceColumn source : sources) { 1061 if (tableIdMap.containsKey(source.getId())) { 1062 source.setId(tableIdMap.get(source.getId())); 1063 } 1064 } 1065 sourceSet.addAll(sources); 1066 relation.setCallees(new ArrayList<sourceColumn>(sourceSet)); 1067 } 1068 1069 String jsonString = JSON.toJSONString(relation, true); 1070 String key = SHA256.getMd5(jsonString); 1071 if (!mergeRelations.containsKey(key)) { 1072 mergeRelations.put(key, relation); 1073 } 1074 } else { 1075 targetColumn target = relation.getTarget(); 1076 if (target == null) { 1077 continue; 1078 } 1079 if (target != null && tableIdMap.containsKey(target.getParent_id())) { 1080 target.setParent_id(tableIdMap.get(target.getParent_id())); 1081 } 1082 1083 if (columnIdMap.containsKey(target.getId())) { 1084 target.setId(columnIdMap.get(target.getId())); 1085 target.setCoordinate(columnMergeIdMap.get(target.getId()).getCoordinate()); 1086 } 1087 1088 List<sourceColumn> 
sources = relation.getSources(); 1089 Set<sourceColumn> sourceSet = new LinkedHashSet<sourceColumn>(); 1090 if (sources != null) { 1091 for (sourceColumn source : sources) { 1092 if (tableIdMap.containsKey(source.getParent_id())) { 1093 source.setParent_id(tableIdMap.get(source.getParent_id())); 1094 } 1095 if (tableIdMap.containsKey(source.getSource_id())) { 1096 source.setSource_id(tableIdMap.get(source.getSource_id())); 1097 } 1098 if (columnIdMap.containsKey(source.getId())) { 1099 source.setId(columnIdMap.get(source.getId())); 1100 source.setCoordinate(columnMergeIdMap.get(source.getId()).getCoordinate()); 1101 } 1102 } 1103 1104 sourceSet.addAll(sources); 1105 relation.setSources(new ArrayList<sourceColumn>(sourceSet)); 1106 } 1107 1108 String jsonString = JSON.toJSONString(relation, true); 1109 String key = SHA256.getMd5(jsonString); 1110 if (!mergeRelations.containsKey(key)) { 1111 mergeRelations.put(key, relation); 1112 } 1113 } 1114 } 1115 1116 mergeDataflow.setRelationships(new ArrayList<relationship>(mergeRelations.values())); 1117 } 1118 1119 tableMap.clear(); 1120 tableTypeMap.clear(); 1121 tableIdMap.clear(); 1122 columnMap.clear(); 1123 tableColumnMap.clear(); 1124 columnIdMap.clear(); 1125 columnMergeIdMap.clear(); 1126 tables.clear(); 1127 1128 return mergeDataflow; 1129 } 1130 1131 /** 1132 * 单个 dataflow 的 ID 跨度(与 ParallelDataFlowAnalyzer 中的常量保持一致)。 1133 */ 1134 private static final long DATAFLOW_ID_RANGE = 5000000L; 1135 1136 /** 1137 * 合并一组 startId 都为 0(即 ID 空间相互冲突)的 dataflow。 1138 * <p> 1139 * 标准合并 {@link #mergeDataflowsWithDifferentStartId(Collection, EDbVendor)} 的前提是 1140 * 每个 dataflow 在生成时已通过 {@code optionCopy.setStartId(5000000L * i)} 占用了 1141 * 不重叠的 ID 段;当外部调用方拿到的 dataflow 都是用默认 startId=0 生成时,直接合并 1142 * 会因为 ID 冲突而错乱。本方法会先按下标把每个 dataflow 的全部 ID 偏移 1143 * {@link #DATAFLOW_ID_RANGE} * index,再委托给标准合并逻辑。 1144 * <p> 1145 * 注意:此方法会原地修改入参 dataflow 的 ID(与标准合并行为一致)。 1146 */ 1147 public static dataflow mergeDataflows(Collection<dataflow> dataflows, 
EDbVendor vendor) { 1148 if (dataflows == null || dataflows.isEmpty()) { 1149 return null; 1150 } 1151 ModelBindingManager.setGlobalVendor(vendor); 1152 boolean optionSet = false; 1153 if (ModelBindingManager.getGlobalOption() == null) { 1154 Option option = new Option(); 1155 option.setVendor(vendor); 1156 ModelBindingManager.setGlobalOption(option); 1157 optionSet = true; 1158 } 1159 try { 1160 // ID 按下标 * DATAFLOW_ID_RANGE 偏移,避免 ID 冲突; 1161 // coordinate 第三维 fileIdx 按前面所有 dataflow 累积的源文件数偏移, 1162 // 使得合并后每个来源的 fileIdx 保持连续且不重叠。 1163 long cumulativeFileIdx = 0L; 1164 int i = 0; 1165 for (dataflow df : dataflows) { 1166 if (df != null) { 1167 int maxFileIdx = findMaxFileIdx(df); 1168 offsetDataflowIds(df, DATAFLOW_ID_RANGE * i, cumulativeFileIdx); 1169 if (maxFileIdx >= 0) { 1170 cumulativeFileIdx += (maxFileIdx + 1L); 1171 } 1172 } 1173 i++; 1174 } 1175 return mergeDataflowsWithDifferentStartId(dataflows, vendor); 1176 } finally { 1177 ModelBindingManager.removeGlobalVendor(); 1178 if (optionSet) { 1179 ModelBindingManager.removeGlobalOption(); 1180 } 1181 } 1182 } 1183 1184 /** 1185 * 从一组临时 XML 文件迭代加载并合并 dataflow,节约内存。 1186 * <p> 1187 * 与 {@link #iterativeMergeDataflowsFromFilesByStartId(List, long)} 不同的是,本方法假定文件中的 1188 * dataflow 是用默认 startId=0 生成的(ID 空间互相冲突),因此会按下标先做 ID 偏移再合并。 1189 * <p> 1190 * 注意:本方法按入参 {@code tempFiles} 顺序迭代合并,不做任何排序。 1191 * coordinate 第三维 fileIdx 的偏移也是严格按入参顺序累积的, 1192 * 调用方传入的顺序应与原始源文件的语义顺序保持一致。 1193 */ 1194 public static dataflow iterativeMergeDataflowsFromFiles(List<File> tempFiles, EDbVendor vendor) { 1195 if (tempFiles == null || tempFiles.isEmpty()) { 1196 return null; 1197 } 1198 ModelBindingManager.setGlobalVendor(vendor); 1199 boolean optionSet = false; 1200 if (ModelBindingManager.getGlobalOption() == null) { 1201 Option option = new Option(); 1202 option.setVendor(vendor); 1203 ModelBindingManager.setGlobalOption(option); 1204 optionSet = true; 1205 } 1206 try { 1207 return iterativeMergeFromFiles(tempFiles, DATAFLOW_ID_RANGE * 
tempFiles.size(), true); 1208 } finally { 1209 ModelBindingManager.removeGlobalVendor(); 1210 if (optionSet) { 1211 ModelBindingManager.removeGlobalOption(); 1212 } 1213 } 1214 } 1215 1216 /** 1217 * 从一组临时 XML 文件迭代加载并合并 dataflow,调用方需保证文件中的 dataflow ID 空间互不冲突, 1218 * 且已设置好 ModelBindingManager。 1219 * <p> 1220 * 主要供 {@code ParallelDataFlowAnalyzer} 等内部生成方使用:每个临时文件中的 dataflow 已通过 1221 * {@code optionCopy.setStartId(DATAFLOW_ID_RANGE * i)} 占用了不重叠的 ID 段。 1222 * <p> 1223 * 注意:本方法按入参 {@code tempFiles} 顺序迭代合并,不做任何排序。 1224 * coordinate 第三维 fileIdx 的偏移严格按入参顺序累积。 1225 */ 1226 public static dataflow iterativeMergeDataflowsFromFilesByStartId(List<File> tempFiles, long startId) { 1227 return iterativeMergeFromFiles(tempFiles, startId, false); 1228 } 1229 1230 private static dataflow iterativeMergeFromFiles(List<File> tempFiles, long startId, boolean offsetIds) { 1231 if (tempFiles == null || tempFiles.isEmpty()) { 1232 logger.warn("iterativeMergeDataflowsFromFiles: tempFiles is null or empty"); 1233 return null; 1234 } 1235 1236 // 注意:此处不再按文件大小排序,保持入参顺序。 1237 // 因为 coordinate 第三维 fileIdx 表示原始源文件下标,偏移必须与入参顺序一致, 1238 // 否则合并后 fileIdx 与原始源文件对不上。 1239 logger.info("iterativeMergeDataflowsFromFiles: total dataflows: " + tempFiles.size()); 1240 1241 long startTime = System.currentTimeMillis(); 1242 int totalIterations = tempFiles.size() - 1; 1243 logger.info("iterativeMergeDataflowsFromFiles: start merging, total iterations: " + totalIterations); 1244 1245 long loadStartTime = System.currentTimeMillis(); 1246 dataflow mergedDataflow = XML2Model.loadXML(dataflow.class, tempFiles.get(0)); 1247 long cumulativeFileIdx = 0L; 1248 if (mergedDataflow != null) { 1249 // 即便 ID 不需偏移(byStartId 路径),coordinate 第三维 fileIdx 仍要偏移, 1250 // 避免合并后不同来源的 fileIdx 撞在一起。 1251 long idOffset = offsetIds ? 
0L : 0L; 1252 offsetDataflowIds(mergedDataflow, idOffset, 0L); 1253 int maxFileIdx = findMaxFileIdx(mergedDataflow); 1254 if (maxFileIdx >= 0) { 1255 cumulativeFileIdx = maxFileIdx + 1L; 1256 } 1257 } 1258 long loadEndTime = System.currentTimeMillis(); 1259 int initialRelationCount = mergedDataflow != null && mergedDataflow.getRelationships() != null ? mergedDataflow.getRelationships().size() : 0; 1260 logger.info("iterativeMergeDataflowsFromFiles: loaded initial dataflow, relation count: " + initialRelationCount + ", time: " + (loadEndTime - loadStartTime) + "ms"); 1261 1262 for (int i = 1; i < tempFiles.size(); i++) { 1263 long iterationStartTime = System.currentTimeMillis(); 1264 int currentIteration = i; 1265 int remainingIterations = totalIterations - currentIteration + 1; 1266 1267 logger.info("iterativeMergeDataflowsFromFiles: iteration " + currentIteration + "/" + totalIterations + ", remaining: " + remainingIterations); 1268 1269 long loadCurrentStartTime = System.currentTimeMillis(); 1270 dataflow currentDataflow = XML2Model.loadXML(dataflow.class, tempFiles.get(i)); 1271 if (currentDataflow != null) { 1272 int maxFileIdx = findMaxFileIdx(currentDataflow); 1273 long idOffset = offsetIds ? (DATAFLOW_ID_RANGE * i) : 0L; 1274 offsetDataflowIds(currentDataflow, idOffset, cumulativeFileIdx); 1275 if (maxFileIdx >= 0) { 1276 cumulativeFileIdx += (maxFileIdx + 1L); 1277 } 1278 } 1279 long loadCurrentEndTime = System.currentTimeMillis(); 1280 int currentRelationCount = currentDataflow != null && currentDataflow.getRelationships() != null ? 
currentDataflow.getRelationships().size() : 0; 1281 logger.info("iterativeMergeDataflowsFromFiles: loaded dataflow[" + i + "], relation count: " + currentRelationCount + ", time: " + (loadCurrentEndTime - loadCurrentStartTime) + "ms"); 1282 1283 List<dataflow> tempList = new ArrayList<>(2); 1284 tempList.add(mergedDataflow); 1285 tempList.add(currentDataflow); 1286 1287 long mergeStartTime = System.currentTimeMillis(); 1288 mergedDataflow = mergeDataflowsByStartId(tempList, startId + DATAFLOW_ID_RANGE * i); 1289 long mergeEndTime = System.currentTimeMillis(); 1290 int mergedRelationCount = mergedDataflow != null && mergedDataflow.getRelationships() != null ? mergedDataflow.getRelationships().size() : 0; 1291 1292 long iterationEndTime = System.currentTimeMillis(); 1293 long iterationTime = iterationEndTime - iterationStartTime; 1294 long mergeTime = mergeEndTime - mergeStartTime; 1295 logger.info("iterativeMergeDataflowsFromFiles: iteration " + currentIteration + " completed, merged relation count: " + mergedRelationCount + ", merge time: " + mergeTime + "ms, total iteration time: " + iterationTime + "ms"); 1296 1297 currentDataflow = null; 1298 tempList.clear(); 1299 } 1300 1301 long endTime = System.currentTimeMillis(); 1302 long totalTime = endTime - startTime; 1303 int finalRelationCount = mergedDataflow != null && mergedDataflow.getRelationships() != null ? 
mergedDataflow.getRelationships().size() : 0; 1304 logger.info("iterativeMergeDataflowsFromFiles: all iterations completed, final relation count: " + finalRelationCount + ", total time: " + totalTime + "ms (" + (totalTime / 1000.0) + "s)"); 1305 1306 return mergedDataflow; 1307 } 1308 1309 private static String offsetId(String id, long offset) { 1310 if (id == null || id.isEmpty()) { 1311 return id; 1312 } 1313 try { 1314 return String.valueOf(Long.parseLong(id) + offset); 1315 } catch (NumberFormatException ignore) { 1316 return id; 1317 } 1318 } 1319 1320 /** 1321 * 偏移 coordinate 第三维(fileIdx)。 1322 * <p> 1323 * coordinate 字符串格式为 {@code [line,col,fileIdx],[line,col,fileIdx]}, 1324 * 合并多个独立生成的 dataflow 时,它们的 fileIdx 都从 0 起始会互相冲突, 1325 * 通过给第三维加上 offset 来区分不同的来源。 1326 */ 1327 private static String offsetCoordinate(String coordinate, long offset) { 1328 if (coordinate == null || coordinate.isEmpty() || offset == 0L) { 1329 return coordinate; 1330 } 1331 StringBuilder sb = new StringBuilder(coordinate.length() + 8); 1332 int i = 0; 1333 int len = coordinate.length(); 1334 while (i < len) { 1335 int open = coordinate.indexOf('[', i); 1336 if (open < 0) { 1337 sb.append(coordinate, i, len); 1338 break; 1339 } 1340 int close = coordinate.indexOf(']', open); 1341 if (close < 0) { 1342 sb.append(coordinate, i, len); 1343 break; 1344 } 1345 sb.append(coordinate, i, open + 1); 1346 String inner = coordinate.substring(open + 1, close); 1347 String[] parts = inner.split(",", -1); 1348 for (int k = 0; k < parts.length; k++) { 1349 if (k > 0) sb.append(','); 1350 if (k == 2) { 1351 String p = parts[k].trim(); 1352 try { 1353 sb.append(Long.parseLong(p) + offset); 1354 } catch (NumberFormatException ignore) { 1355 sb.append(parts[k]); 1356 } 1357 } else { 1358 sb.append(parts[k]); 1359 } 1360 } 1361 sb.append(']'); 1362 i = close + 1; 1363 } 1364 return sb.toString(); 1365 } 1366 1367 /** 1368 * 偏移 dataflow 中所有节点的 ID 和 coordinate 第三维 fileIdx。 1369 * 1370 * @param df 待偏移的 
dataflow(原地修改) 1371 * @param idOffset 给所有 id / xxxId 字段加上的偏移量 1372 * @param fileIdxOffset 给所有 coordinate 第三维 (fileIdx) 加上的偏移量, 1373 * 通常为合并时前面所有 dataflow 累积的源文件数, 1374 * 以便合并后不同来源的 fileIdx 保持连续且不重叠。 1375 */ 1376 private static void offsetDataflowIds(dataflow df, long idOffset, long fileIdxOffset) { 1377 if (idOffset == 0L && fileIdxOffset == 0L) { 1378 return; 1379 } 1380 1381 // tables / views / databases / schemas / stages / datasources / streams / paths / variables / resultsets / sequences 1382 for (table t : dataflow.getAllTables(df)) { 1383 offsetTableIds(t, idOffset); 1384 t.setCoordinate(offsetCoordinate(t.getCoordinate(), fileIdxOffset)); 1385 if (t.getColumns() != null) { 1386 for (column c : t.getColumns()) { 1387 c.setCoordinate(offsetCoordinate(c.getCoordinate(), fileIdxOffset)); 1388 } 1389 } 1390 } 1391 1392 // processes 1393 if (df.getProcesses() != null) { 1394 for (process p : df.getProcesses()) { 1395 p.setId(offsetId(p.getId(), idOffset)); 1396 p.setProcedureId(offsetId(p.getProcedureId(), idOffset)); 1397 p.setCoordinate(offsetCoordinate(p.getCoordinate(), fileIdxOffset)); 1398 } 1399 } 1400 1401 // procedures 1402 if (df.getProcedures() != null) { 1403 for (procedure p : df.getProcedures()) { 1404 p.setId(offsetId(p.getId(), idOffset)); 1405 p.setCoordinate(offsetCoordinate(p.getCoordinate(), fileIdxOffset)); 1406 if (p.getArguments() != null) { 1407 for (argument arg : p.getArguments()) { 1408 arg.setId(offsetId(arg.getId(), idOffset)); 1409 arg.setCoordinate(offsetCoordinate(arg.getCoordinate(), fileIdxOffset)); 1410 } 1411 } 1412 } 1413 } 1414 1415 // oracle packages 1416 if (df.getPackages() != null) { 1417 for (oraclePackage pkg : df.getPackages()) { 1418 pkg.setId(offsetId(pkg.getId(), idOffset)); 1419 if (pkg.getProcedures() != null) { 1420 for (procedure pp : pkg.getProcedures()) { 1421 pp.setId(offsetId(pp.getId(), idOffset)); 1422 pp.setCoordinate(offsetCoordinate(pp.getCoordinate(), fileIdxOffset)); 1423 if (pp.getArguments() != 
null) { 1424 for (argument arg : pp.getArguments()) { 1425 arg.setId(offsetId(arg.getId(), idOffset)); 1426 arg.setCoordinate(offsetCoordinate(arg.getCoordinate(), fileIdxOffset)); 1427 } 1428 } 1429 } 1430 } 1431 } 1432 } 1433 1434 // relationships 1435 if (df.getRelationships() != null) { 1436 for (relationship rel : df.getRelationships()) { 1437 rel.setId(offsetId(rel.getId(), idOffset)); 1438 rel.setProcessId(offsetId(rel.getProcessId(), idOffset)); 1439 rel.setProcedureId(offsetId(rel.getProcedureId(), idOffset)); 1440 if (rel.getTarget() != null) { 1441 offsetTargetColumnIds(rel.getTarget(), idOffset); 1442 } 1443 if (rel.getCaller() != null) { 1444 offsetTargetColumnIds(rel.getCaller(), idOffset); 1445 } 1446 if (rel.getSources() != null) { 1447 for (sourceColumn sc : rel.getSources()) { 1448 offsetSourceColumnIds(sc, idOffset); 1449 } 1450 } 1451 if (rel.getCallees() != null) { 1452 for (sourceColumn sc : rel.getCallees()) { 1453 offsetSourceColumnIds(sc, idOffset); 1454 } 1455 } 1456 } 1457 } 1458 } 1459 1460 /** 1461 * 扫描 dataflow 中所有 coordinate,返回最大的第三维 (fileIdx) 值; 1462 * 若没有任何 coordinate 则返回 -1。 1463 */ 1464 private static int findMaxFileIdx(dataflow df) { 1465 if (df == null) { 1466 return -1; 1467 } 1468 int max = -1; 1469 for (table t : dataflow.getAllTables(df)) { 1470 max = Math.max(max, maxFileIdxInCoordinate(t.getCoordinate())); 1471 if (t.getColumns() != null) { 1472 for (column c : t.getColumns()) { 1473 max = Math.max(max, maxFileIdxInCoordinate(c.getCoordinate())); 1474 } 1475 } 1476 } 1477 if (df.getProcesses() != null) { 1478 for (process p : df.getProcesses()) { 1479 max = Math.max(max, maxFileIdxInCoordinate(p.getCoordinate())); 1480 } 1481 } 1482 if (df.getProcedures() != null) { 1483 for (procedure p : df.getProcedures()) { 1484 max = Math.max(max, maxFileIdxInCoordinate(p.getCoordinate())); 1485 if (p.getArguments() != null) { 1486 for (argument arg : p.getArguments()) { 1487 max = Math.max(max, 
maxFileIdxInCoordinate(arg.getCoordinate())); 1488 } 1489 } 1490 } 1491 } 1492 if (df.getPackages() != null) { 1493 for (oraclePackage pkg : df.getPackages()) { 1494 if (pkg.getProcedures() != null) { 1495 for (procedure pp : pkg.getProcedures()) { 1496 max = Math.max(max, maxFileIdxInCoordinate(pp.getCoordinate())); 1497 if (pp.getArguments() != null) { 1498 for (argument arg : pp.getArguments()) { 1499 max = Math.max(max, maxFileIdxInCoordinate(arg.getCoordinate())); 1500 } 1501 } 1502 } 1503 } 1504 } 1505 } 1506 return max; 1507 } 1508 1509 /** 1510 * 从 coordinate 字符串中解析出最大的第三维 (fileIdx) 值; 1511 * 若 coordinate 为空或没有任何可解析的数值则返回 -1。 1512 */ 1513 private static int maxFileIdxInCoordinate(String coordinate) { 1514 if (coordinate == null || coordinate.isEmpty()) { 1515 return -1; 1516 } 1517 int max = -1; 1518 int i = 0; 1519 int len = coordinate.length(); 1520 while (i < len) { 1521 int open = coordinate.indexOf('[', i); 1522 if (open < 0) break; 1523 int close = coordinate.indexOf(']', open); 1524 if (close < 0) break; 1525 String inner = coordinate.substring(open + 1, close); 1526 String[] parts = inner.split(",", -1); 1527 if (parts.length >= 3) { 1528 try { 1529 int v = Integer.parseInt(parts[2].trim()); 1530 if (v > max) max = v; 1531 } catch (NumberFormatException ignore) { 1532 // 不是数字,跳过 1533 } 1534 } 1535 i = close + 1; 1536 } 1537 return max; 1538 } 1539 1540 private static void offsetTableIds(table t, long offset) { 1541 if (t == null) { 1542 return; 1543 } 1544 t.setId(offsetId(t.getId(), offset)); 1545 if (t.getProcessIds() != null) { 1546 List<String> newProcessIds = new ArrayList<>(t.getProcessIds().size()); 1547 for (String pid : t.getProcessIds()) { 1548 newProcessIds.add(offsetId(pid, offset)); 1549 } 1550 t.setProcessIds(newProcessIds); 1551 } 1552 if (t.getColumns() != null) { 1553 for (column c : t.getColumns()) { 1554 c.setId(offsetId(c.getId(), offset)); 1555 } 1556 } 1557 } 1558 1559 private static void offsetTargetColumnIds(targetColumn col, 
long offset) { 1560 col.setId(offsetId(col.getId(), offset)); 1561 col.setParent_id(offsetId(col.getParent_id(), offset)); 1562 col.setTarget_id(offsetId(col.getTarget_id(), offset)); 1563 } 1564 1565 private static void offsetSourceColumnIds(sourceColumn col, long offset) { 1566 col.setId(offsetId(col.getId(), offset)); 1567 col.setParent_id(offsetId(col.getParent_id(), offset)); 1568 col.setSource_id(offsetId(col.getSource_id(), offset)); 1569 } 1570 1571 public static dataflow readDataflowFromCsvMetadata(String csvMetadata, EDbVendor vendor) { 1572 if (!MetadataReader.isMetadata(csvMetadata)) { 1573 throw new IllegalArgumentException("Illegal csv metadata."); 1574 } 1575 return new SQLDepMetadataAnalyzer().analyzeMetadata(vendor, csvMetadata); 1576 } 1577 1578 public static Dataflow mergeDataflowsAndCsv(List<Pair<EDbVendor,dataflow>> pairs, String csvMetadata) { 1579 //重置ID防止重复 1580 Long index = 0L; 1581 for(int i=0; i<pairs.size(); i++){ 1582 dataflow dataflow = pairs.get(i).second; 1583 Map<String, table> objIDMap = getDataflowDbObjMap(dataflow); 1584 Map<String, String> idMaps = new HashMap<>(); 1585 for (Map.Entry<String, table> entry : objIDMap.entrySet()) { 1586 index = index + 1; 1587 String id = index.toString(); 1588 idMaps.put(entry.getKey(), id); 1589 table table = entry.getValue(); 1590 table.setId(id); 1591 if(table.getColumns() != null){ 1592 for(column col: table.getColumns()){ 1593 index = index + 1; 1594 id = index.toString(); 1595 idMaps.put(col.getId(), id); 1596 col.setId(id); 1597 } 1598 } 1599 } 1600 if(dataflow.getProcedures() != null){ 1601 for(procedure procedure: dataflow.getProcedures()){ 1602 index = index + 1; 1603 String id = index.toString(); 1604 idMaps.put(procedure.getId(), id); 1605 procedure.setId(id); 1606 } 1607 } 1608 if(dataflow.getProcesses() != null){ 1609 for(process process: dataflow.getProcesses()){ 1610 index = index + 1; 1611 String id = index.toString(); 1612 idMaps.put(process.getId(), id); 1613 process.setId(id); 
1614 } 1615 } 1616 if(dataflow.getPackages() != null){ 1617 for(oraclePackage oraclePackage: dataflow.getPackages()){ 1618 index = index + 1; 1619 String id = index.toString(); 1620 idMaps.put(oraclePackage.getId(), id); 1621 oraclePackage.setId(id); 1622 if(oraclePackage.getProcedures() != null){ 1623 for(procedure procedure: oraclePackage.getProcedures()){ 1624 index = index + 1; 1625 id = index.toString(); 1626 idMaps.put(procedure.getId(), id); 1627 procedure.setId(id); 1628 } 1629 } 1630 } 1631 } 1632 if(dataflow.getRelationships() != null && dataflow.getRelationships().size()>0){ 1633 for(relationship rel: dataflow.getRelationships()){ 1634 index = index + 1; 1635 String id = index.toString(); 1636 rel.setId(id); 1637 rel.setProcedureId(idMaps.get(rel.getProcedureId())); 1638 rel.setProcessId(idMaps.get(rel.getProcessId())); 1639 rel.getTarget().setParent_id(idMaps.get(rel.getTarget().getParent_id())); 1640 rel.getTarget().setTarget_id(idMaps.get(rel.getTarget().getTarget_id())); 1641 rel.getTarget().setId(idMaps.get(rel.getTarget().getId())); 1642 for(sourceColumn column: rel.getSources()){ 1643 column.setParent_id(idMaps.get(column.getParent_id())); 1644 column.setId(idMaps.get(column.getId())); 1645 } 1646 } 1647 } 1648 } 1649 Dataflow mDataflow = DataFlowAnalyzer.getSqlflowJSONModel(pairs.get(0).first, pairs.get(0).second, false); 1650 Map<String, table> objNameMap = getDataflowDbObjNameMap(pairs.get(0).second); 1651 Sqlflow dbobjs = mDataflow.getDbobjs(); 1652 List<Relationship> mRelationshipList = new ArrayList<>(); 1653 if(mDataflow.getRelationships() != null && mDataflow.getRelationships().length>0){ 1654 mRelationshipList = new LinkedList<>(Arrays.asList(mDataflow.getRelationships())); 1655 } 1656 1657 List<Error> mErrorList = new ArrayList<>(); 1658 if(mDataflow.getErrors() != null && mDataflow.getErrors().length>0){ 1659 mErrorList = new LinkedList<>(Arrays.asList(mDataflow.getErrors())); 1660 } 1661 1662 List<Process> mProcessList = new 
ArrayList<>(); 1663 if(mDataflow.getProcesses() != null && mDataflow.getProcesses().length>0){ 1664 mProcessList = new LinkedList<>(Arrays.asList(mDataflow.getProcesses())); 1665 } 1666 1667 for(int i=1; i<pairs.size(); i++){ 1668 objNameMap.putAll(getDataflowDbObjNameMap(pairs.get(i).second)); 1669 Dataflow dataflow = DataFlowAnalyzer.getSqlflowJSONModel(pairs.get(i).first, pairs.get(i).second, false); 1670 if(dataflow.getDbobjs().getServers() != null && dataflow.getDbobjs().getServers().size()>0){ 1671 if(dbobjs.getServers() == null){ 1672 dbobjs.setServers(new ArrayList<>()); 1673 } 1674 dbobjs.getServers().addAll(dataflow.getDbobjs().getServers()); 1675 } 1676 1677 if(dataflow.getDbobjs().getErrorMessages() != null && dataflow.getDbobjs().getErrorMessages().size()>0){ 1678 if(dbobjs.getErrorMessages() == null){ 1679 dbobjs.setErrorMessages(new ArrayList<>()); 1680 } 1681 dbobjs.getErrorMessages().addAll(dataflow.getDbobjs().getErrorMessages()); 1682 } 1683 1684 if(dataflow.getRelationships() != null && dataflow.getRelationships().length>0){ 1685 List<Relationship> relationshipList = new LinkedList<>(Arrays.asList(dataflow.getRelationships())); 1686 mRelationshipList.addAll(relationshipList); 1687 } 1688 1689 if(dataflow.getErrors() != null && dataflow.getErrors().length>0){ 1690 List<Error> errorList = new LinkedList<>(Arrays.asList(dataflow.getErrors())); 1691 mErrorList.addAll(errorList); 1692 } 1693 1694 if(dataflow.getProcesses() != null && dataflow.getProcesses().length>0){ 1695 List<Process> processList = new LinkedList<>(Arrays.asList(dataflow.getProcesses())); 1696 mProcessList.addAll(processList); 1697 } 1698 1699 } 1700 if(!SQLUtil.isEmpty(csvMetadata)){ 1701 /** 1702 * excel内的血缘 只合并关系 就是只做关联,如果找不到obj 就算了 1703 */ 1704 dataflow df = readDataflowFromCsvMetadata(csvMetadata); 1705 Map<String, table> objIDMap = getDataflowDbObjMap(df); 1706 if(df.getRelationships() != null && df.getRelationships().size()>0){ 1707 for(relationship rel: 
df.getRelationships()){ 1708 rel.setId("m"+rel.getId()); 1709 table sTable = objIDMap.get(rel.getTarget().getParent_id()); 1710 table tTable = objNameMap.get(sTable.getFullName()); 1711 rel.getTarget().setParent_id(tTable.getId()); 1712 for(column col: tTable.getColumns()){ 1713 if(col.getName().equalsIgnoreCase(rel.getTarget().getColumn())){ 1714 rel.getTarget().setId(col.getId()); 1715 break; 1716 } 1717 } 1718 for(sourceColumn column: rel.getSources()){ 1719 sTable = objIDMap.get(column.getParent_id()); 1720 tTable = objNameMap.get(sTable.getFullName()); 1721 column.setParent_id(tTable.getId()); 1722 for(column col: tTable.getColumns()){ 1723 if(col.getName().equalsIgnoreCase(column.getColumn())){ 1724 column.setId(col.getId()); 1725 break; 1726 } 1727 } 1728 } 1729 mRelationshipList.add(toRelationship(rel)); 1730 } 1731 } 1732 } 1733 1734 mDataflow.setDbobjs(dbobjs); 1735 mDataflow.setRelationships(mRelationshipList.toArray(new Relationship[mRelationshipList.size()])); 1736 mDataflow.setProcesses(mProcessList.toArray(new Process[mProcessList.size()])); 1737 mDataflow.setErrors(mErrorList.toArray(new Error[mErrorList.size()])); 1738 return mDataflow; 1739 } 1740 1741 public static dataflow readDataflowFromCsvMetadata(String csvMetadata) { 1742 if (!MetadataReader.isMetadata(csvMetadata)) { 1743 throw new IllegalArgumentException("Illegal csv metadata."); 1744 } 1745 return new SQLDepMetadataAnalyzer().analyzeMetadata(null, csvMetadata); 1746 } 1747 1748 private static Relationship toRelationship(relationship relation){ 1749 Relationship relationModel; 1750 if (relation.getType().equals("join")) { 1751 JoinRelationship joinRelationModel = new JoinRelationship(); 1752 joinRelationModel.setCondition(relation.getCondition()); 1753 joinRelationModel.setJoinType(relation.getJoinType()); 1754 joinRelationModel.setClause(relation.getClause()); 1755 relationModel = joinRelationModel; 1756 } else { 1757 relationModel = new Relationship(); 1758 } 1759 
relationModel.setId(relation.getId()); 1760 relationModel.setProcessId(relation.getProcessId()); 1761 relationModel.setProcessType(relation.getProcessType()); 1762 relationModel.setType(relation.getType()); 1763 relationModel.setEffectType(relation.getEffectType()); 1764 relationModel.setPartition(relation.getPartition()); 1765 relationModel.setFunction(relation.getFunction()); 1766 relationModel.setProcedureId(relation.getProcedureId()); 1767 relationModel.setSqlHash(relation.getSqlHash()); 1768 relationModel.setCondition(relation.getCondition()); 1769 relationModel.setSqlComment(relation.getSqlComment()); 1770 relationModel.setTimestampMax(relation.getTimestampMax()); 1771 relationModel.setTimestampMin(relation.getTimestampMin()); 1772 1773 if (relation.getTarget() != null && relation.getSources() != null && !relation.getSources().isEmpty()) { 1774 RelationshipElement targetModel = new RelationshipElement(); 1775 targetColumn target = relation.getTarget(); 1776 targetModel.setColumn(target.getColumn()); 1777 targetModel.setParentName(target.getParent_name()); 1778 targetModel.setTargetName(target.getTarget_name()); 1779 targetModel.setId(target.getId()); 1780 targetModel.setTargetId(target.getTarget_id()); 1781 targetModel.setParentId(target.getParent_id()); 1782 targetModel.setCoordinates(Coordinate.parse(target.getCoordinate())); 1783 targetModel.setFunction(target.getFunction()); 1784 targetModel.setType(target.getType()); 1785 relationModel.setTarget(targetModel); 1786 1787 List<RelationshipElement> sourceModels = new ArrayList<>(); 1788 for (sourceColumn source : relation.getSources()) { 1789 RelationshipElement sourceModel = new RelationshipElement(); 1790 sourceModel.setColumn(source.getColumn()); 1791 sourceModel.setParentName(source.getParent_name()); 1792 sourceModel.setSourceName(source.getSource_name()); 1793 sourceModel.setColumnType(source.getColumn_type()); 1794 sourceModel.setId(source.getId()); 1795 sourceModel.setParentId(source.getParent_id()); 
1796 sourceModel.setSourceId(source.getSource_id()); 1797 sourceModel.setCoordinates(Coordinate.parse(source.getCoordinate())); 1798 sourceModel.setClauseType(source.getClauseType()); 1799 sourceModel.setType(source.getType()); 1800 sourceModels.add(sourceModel); 1801 if (source.getTransforms() != null && !source.getTransforms().isEmpty()) { 1802 List<Transform> transforms = new ArrayList<gudusoft.gsqlparser.dlineage.dataflow.model.json.Transform>(); 1803 for (transform transform : source.getTransforms()) { 1804 Transform item = new Transform(); 1805 item.setCode(transform.getCode()); 1806 item.setType(transform.getType()); 1807 item.setCoordinate(transform.getCoordinate(true)); 1808 transforms.add(item); 1809 } 1810 sourceModel.setTransforms(transforms.toArray(new Transform[0])); 1811 } 1812 } 1813 relationModel.setSources(sourceModels.toArray(new RelationshipElement[0])); 1814 } else if (relation.getCaller() != null && relation.getCallees() != null 1815 && !relation.getCallees().isEmpty()) { 1816 RelationshipElement targetModel = new RelationshipElement(); 1817 targetColumn target = relation.getCaller(); 1818 targetModel.setName(target.getName()); 1819 targetModel.setId(target.getId()); 1820 targetModel.setCoordinates(Coordinate.parse(target.getCoordinate())); 1821 targetModel.setType(target.getType()); 1822 relationModel.setCaller(targetModel); 1823 List<RelationshipElement> sourceModels = new ArrayList<gudusoft.gsqlparser.dlineage.dataflow.model.json.RelationshipElement>(); 1824 for (sourceColumn source : relation.getCallees()) { 1825 RelationshipElement sourceModel = new RelationshipElement(); 1826 sourceModel.setName(source.getName()); 1827 sourceModel.setId(source.getId()); 1828 sourceModel.setCoordinates(Coordinate.parse(source.getCoordinate())); 1829 sourceModel.setType(source.getType()); 1830 sourceModels.add(sourceModel); 1831 } 1832 relationModel.setCallees(sourceModels.toArray(new RelationshipElement[0])); 1833 } 1834 return relationModel; 1835 } 1836 
//    public static void main(String[] args) throws Exception {
//        // File-based iterative merge (memory-efficient; suited to large data volumes)
//        dataflow dataflow = iterativeMergeDataflowsFromFiles(Arrays.asList(new File("C:\\Users\\KK\\xwechat_files\\wxid_z9ci6s8b7g0d21_9365\\msg\\file\\2026-06\\dataflow_item").listFiles()), EDbVendor.dbvoracle);
//        System.out.println(XML2Model.saveXML(dataflow));
//    }
//
//    public static void main(String[] args) throws Exception {
//        // In-memory merge (loads every file into memory at once; suited to small data volumes)
//        File[] files = new File("C:\\Users\\KK\\xwechat_files\\wxid_z9ci6s8b7g0d21_9365\\msg\\file\\2026-06\\dataflow_item").listFiles();
//        if (files == null || files.length == 0) {
//            System.out.println("No dataflow files found.");
//            return;
//        }
//
//        List<dataflow> dataflows = new ArrayList<>();
//        for (File file : files) {
//            dataflow df = XML2Model.loadXML(dataflow.class, file);
//            if (df != null) {
//                dataflows.add(df);
//            }
//        }
//
//        dataflow mergedDataflow = mergeDataflows(dataflows, EDbVendor.dbvoracle);
//        System.out.println(XML2Model.saveXML(mergedDataflow));
//    }
}