package gudusoft.gsqlparser.dlineage.util;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

import gudusoft.gsqlparser.EDbVendor;
import gudusoft.gsqlparser.dlineage.DataFlowAnalyzer;
import gudusoft.gsqlparser.dlineage.dataflow.model.xml.dataflow;
import gudusoft.gsqlparser.dlineage.dataflow.model.xml.process;
import gudusoft.gsqlparser.dlineage.dataflow.model.xml.relationship;
import gudusoft.gsqlparser.dlineage.dataflow.model.xml.sourceColumn;
import gudusoft.gsqlparser.dlineage.dataflow.model.xml.table;
import gudusoft.gsqlparser.dlineage.dataflow.model.xml.targetColumn;
import gudusoft.gsqlparser.util.Logger;
import gudusoft.gsqlparser.util.LoggerFactory;
import gudusoft.gsqlparser.util.SQLUtil;

/**
 * Static helpers that turn a column-level {@code dataflow} model into
 * table-level lineage (as a model or as CSV) or into column-level CSV.
 */
public class ProcessUtility {

    private static final Logger logger = LoggerFactory.getLogger(ProcessUtility.class);

    /** Relationship type that is kept for table-level lineage; reported as "direct" in the column CSV. */
    private static final String DIRECT_RELATION_TYPE = "fdd";

    /** Placeholder written to the CSV when a table has no database or schema. */
    private static final String DEFAULT_NAME = "default";

    /**
     * Orders CSV lines case-insensitively. The lower-casing is pinned to
     * {@link Locale#ROOT} so ordering does not depend on the JVM default locale.
     * Because the sorted sets use this comparator for equality as well, lines
     * that differ only by letter case collapse into a single line.
     */
    private static final Comparator<String> CASE_INSENSITIVE_LINE_ORDER = new Comparator<String>() {
        @Override
        public int compare(String left, String right) {
            return left.toLowerCase(Locale.ROOT).compareTo(right.toLowerCase(Locale.ROOT));
        }
    };

    /**
     * Builds table-level lineage, ignoring self lineage and simplifying
     * {@code instance} through the analyzer first.
     *
     * @param analyzer analyzer used to simplify {@code instance}
     * @param instance column-level dataflow
     * @return the table-level dataflow, or {@code instance} if generation failed
     */
    public static dataflow generateTableLevelLineage(DataFlowAnalyzer analyzer, dataflow instance) {
        return generateTableLevelLineage(analyzer, instance, true, false);
    }

    /**
     * Builds the table-level lineage CSV using {@code ","} as delimiter.
     *
     * @see #generateTableLevelLineageCsv(DataFlowAnalyzer, dataflow, String)
     */
    public static String generateTableLevelLineageCsv(DataFlowAnalyzer analyzer, dataflow instance) {
        return generateTableLevelLineageCsv(analyzer, instance, true, false, ",");
    }

    /**
     * Builds the table-level lineage CSV with a caller-chosen delimiter.
     *
     * @param analyzer  analyzer used to simplify {@code instance}
     * @param instance  column-level dataflow
     * @param delimiter field separator
     * @return header plus one line per (source table, target table, process);
     *         {@code null} when the simplified dataflow has no tables or
     *         {@code instance} has no relationships; the header alone when
     *         generation fails (the failure is logged)
     */
    public static String generateTableLevelLineageCsv(DataFlowAnalyzer analyzer, dataflow instance, String delimiter) {
        return generateTableLevelLineageCsv(analyzer, instance, true, false, delimiter);
    }

    /**
     * Implementation behind the public table-level CSV overloads.
     *
     * @param ignoreSelfLineage skip pairs whose source and target parent ids are equal
     * @param isSimple          when {@code true}, {@code instance} is used as-is instead of
     *                          being simplified through {@code analyzer}
     */
    private static String generateTableLevelLineageCsv(DataFlowAnalyzer analyzer, dataflow instance,
            boolean ignoreSelfLineage, boolean isSimple, String delimiter) {
        StringBuilder buffer = new StringBuilder();
        buffer.append(joinFields(delimiter, "source_db", "source_schema", "source_table", "source_column",
                "target_db", "target_schema", "target_table", "target_column", "process_type",
                "process_hashid", "coordinate", "procedure_names")).append("\n");
        try {
            dataflow simple = instance;
            if (!isSimple) {
                simple = analyzer.getSimpleDataflow(instance, true);
            }
            if (simple.getTables() == null) {
                return null;
            }
            List<relationship> relations = instance.getRelationships();
            if (relations == null || relations.isEmpty()) {
                return null;
            }

            // Only direct relationships take part in table-level lineage.
            simple.getRelationships().removeIf(candidate -> !DIRECT_RELATION_TYPE.equals(candidate.getType()));

            // Group the column relationships by (source table id, target table id, process id).
            // No synthetic ids are needed here: the CSV never prints them.
            Map<Pair3<String, String, String>, Set<relationship>> relationsByTablePair =
                    new LinkedHashMap<Pair3<String, String, String>, Set<relationship>>();
            for (relationship relation : simple.getRelationships()) {
                if (relation.getSources() == null || relation.getTarget() == null
                        || relation.getProcessId() == null) {
                    continue;
                }
                String targetId = relation.getTarget().getParent_id();
                if (SQLUtil.isEmpty(targetId)) {
                    continue;
                }
                for (sourceColumn source : relation.getSources()) {
                    String sourceId = source.getParent_id();
                    if (SQLUtil.isEmpty(sourceId)) {
                        continue;
                    }
                    if (ignoreSelfLineage && sourceId.equals(targetId)) {
                        continue;
                    }
                    Pair3<String, String, String> pairKey = new Pair3<String, String, String>(sourceId,
                            targetId, relation.getProcessId());
                    relationsByTablePair
                            .computeIfAbsent(pairKey, unused -> new LinkedHashSet<relationship>())
                            .add(relation);
                }
            }

            Map<String, table> tableMap = indexTablesById(collectTableLikeObjects(simple, false));
            Map<String, process> processMap = indexProcessesById(simple.getProcesses());

            Set<String> lineSet = new TreeSet<String>(CASE_INSENSITIVE_LINE_ORDER);
            for (Map.Entry<Pair3<String, String, String>, Set<relationship>> entry
                    : relationsByTablePair.entrySet()) {
                Pair3<String, String, String> pairKey = entry.getKey();
                table sourceTable = tableMap.get(pairKey.first);
                table targetTable = tableMap.get(pairKey.second);
                process owner = processMap.get(pairKey.third);

                String sourceDb = defaultIfEmpty(sourceTable.getDatabase());
                String sourceSchema = defaultIfEmpty(sourceTable.getSchema());
                String sourceTableName = sourceTable.getTableNameOnly();

                // Insertion-ordered sets: each column name is listed once, in first-seen order.
                Set<String> sourceColumns = new LinkedHashSet<String>();
                Set<String> targetColumns = new LinkedHashSet<String>();
                for (relationship relation : entry.getValue()) {
                    for (sourceColumn column : relation.getSources()) {
                        sourceColumns.add(column.getColumn());
                    }
                    targetColumns.add(relation.getTarget().getColumn());
                }

                String targetDb = defaultIfEmpty(targetTable.getDatabase());
                String targetSchema = defaultIfEmpty(targetTable.getSchema());
                String targetTableName = targetTable.getTableNameOnly();

                String procedureNames = owner.getProcedureName();
                if ("batchQueries".equals(procedureNames)) {
                    procedureNames = "";
                }

                lineSet.add(joinFields(delimiter, sourceDb, sourceSchema, sourceTableName,
                        String.join(";", sourceColumns), targetDb, targetSchema, targetTableName,
                        String.join(";", targetColumns), owner.getType(), owner.getQueryHashId(),
                        "\"" + owner.getCoordinate() + "\"", procedureNames) + "\n");
            }

            for (String line : lineSet) {
                buffer.append(line);
            }
        } catch (Exception e) {
            logger.error("Generate table level csv failed.", e);
        }
        return buffer.toString();
    }

    /**
     * Collapses the column-level relationships of a dataflow into table-level
     * relationships and, when the dataflow has processes, routes every table
     * relationship through its process (source table -> process -> target table).
     * Columns of all table-like objects are cleared.
     *
     * <p>When {@code isSimple} is {@code true} the passed {@code instance} is
     * modified in place.
     *
     * @param analyzer          analyzer used to simplify {@code instance} when {@code isSimple} is false
     * @param instance          column-level dataflow
     * @param ignoreSelfLineage skip pairs whose source and target parent ids are equal
     * @param isSimple          when {@code true}, {@code instance} is used as-is
     * @return the table-level dataflow; {@code instance} if generation failed (the failure is logged)
     */
    public static dataflow generateTableLevelLineage(DataFlowAnalyzer analyzer, dataflow instance,
            boolean ignoreSelfLineage, boolean isSimple) {
        try {
            dataflow simple = instance;
            if (!isSimple) {
                simple = analyzer.getSimpleDataflow(instance, true);
            }
            if (simple.getTables() == null) {
                return simple;
            }

            List<relationship> relations = instance.getRelationships();
            if (relations == null || relations.isEmpty()) {
                return simple;
            }

            // New ids are allocated above this value.
            long maxId = parseBaseId(relations);

            simple.getRelationships().removeIf(candidate -> !DIRECT_RELATION_TYPE.equals(candidate.getType()));

            // One synthetic relationship per (source table id, target table id, process id).
            Set<Pair3<String, String, String>> seenTablePairs = new HashSet<Pair3<String, String, String>>();
            List<relationship> tableRelations = new ArrayList<relationship>();
            for (relationship relation : simple.getRelationships()) {
                if (relation.getSources() == null || relation.getTarget() == null
                        || relation.getProcessId() == null) {
                    continue;
                }
                String targetId = relation.getTarget().getParent_id();
                if (SQLUtil.isEmpty(targetId)) {
                    continue;
                }
                for (sourceColumn source : relation.getSources()) {
                    String sourceId = source.getParent_id();
                    if (SQLUtil.isEmpty(sourceId)) {
                        continue;
                    }
                    if (ignoreSelfLineage && sourceId.equals(targetId)) {
                        continue;
                    }
                    Pair3<String, String, String> pairKey = new Pair3<String, String, String>(sourceId,
                            targetId, relation.getProcessId());
                    if (!seenTablePairs.add(pairKey)) {
                        continue;
                    }

                    relationship tableRelation = new relationship();

                    targetColumn targetTable = new targetColumn();
                    targetTable.setId(String.valueOf(++maxId));
                    targetTable.setTarget_id(targetId);
                    targetTable.setTarget_name(relation.getTarget().getParent_name());
                    tableRelation.setTarget(targetTable);

                    sourceColumn sourceTable = new sourceColumn();
                    sourceTable.setId(String.valueOf(++maxId));
                    sourceTable.setSource_id(sourceId);
                    sourceTable.setSource_name(source.getParent_name());

                    tableRelation.setSources(Arrays.asList(sourceTable));
                    tableRelation.setType(DIRECT_RELATION_TYPE);
                    tableRelation.setId(String.valueOf(++maxId));
                    tableRelation.setProcessId(relation.getProcessId());
                    tableRelations.add(tableRelation);
                }
            }

            simple.getRelationships().clear();
            simple.getRelationships().addAll(tableRelations);

            // Table-level output carries no columns. Streams are included so that they are
            // treated the same way as in the CSV generators of this class.
            for (table tableLike : collectTableLikeObjects(simple, false)) {
                if (tableLike.getColumns() != null) {
                    tableLike.getColumns().clear();
                }
            }

            if (simple.getProcesses() == null) {
                return simple;
            }

            Map<String, process> processMap = indexProcessesById(simple.getProcesses());

            // Split every table relationship into "sources -> process" and "process -> target",
            // keeping only the first relationship seen for each distinct key.
            Map<String, relationship> processRelations = new LinkedHashMap<String, relationship>();
            Iterator<relationship> iter = simple.getRelationships().iterator();
            while (iter.hasNext()) {
                relationship relation = iter.next();

                relationship beforeRelation = new relationship();
                beforeRelation.setSources(relation.getSources());
                beforeRelation.setType(DIRECT_RELATION_TYPE);
                beforeRelation.setId(String.valueOf(++maxId));

                targetColumn targetProcess = new targetColumn();
                targetProcess.setId(String.valueOf(++maxId));
                targetProcess.setTarget_id(relation.getProcessId());
                targetProcess.setTarget_name(processMap.get(relation.getProcessId()).getName());
                beforeRelation.setTarget(targetProcess);

                String key = targetProcess.getTarget_id();
                if (beforeRelation.getSources() != null) {
                    for (sourceColumn source : beforeRelation.getSources()) {
                        key += (":" + source.getSource_id() + ":" + relation.getProcessId());
                    }
                }
                if (!processRelations.containsKey(key)) {
                    // Kept relationships reuse the referenced object's id as the endpoint id.
                    beforeRelation.getTarget().setId(beforeRelation.getTarget().getTarget_id());
                    if (beforeRelation.getSources() != null) {
                        for (sourceColumn source : beforeRelation.getSources()) {
                            source.setId(source.getSource_id());
                        }
                    }
                    processRelations.put(key, beforeRelation);
                }

                relationship afterRelation = new relationship();
                afterRelation.setTarget(relation.getTarget());
                afterRelation.setType(DIRECT_RELATION_TYPE);
                afterRelation.setId(String.valueOf(++maxId));

                sourceColumn sourceProcess = new sourceColumn();
                sourceProcess.setId(String.valueOf(++maxId));
                sourceProcess.setSource_id(relation.getProcessId());
                sourceProcess.setSource_name(processMap.get(relation.getProcessId()).getName());
                afterRelation.setSources(Arrays.asList(sourceProcess));

                key = relation.getTarget().getTarget_id();
                key += (":" + sourceProcess.getSource_id() + ":" + relation.getProcessId());
                if (!processRelations.containsKey(key)) {
                    afterRelation.getTarget().setId(afterRelation.getTarget().getTarget_id());
                    sourceProcess.setId(sourceProcess.getSource_id());
                    processRelations.put(key, afterRelation);
                }

                iter.remove();
            }

            simple.getRelationships().addAll(processRelations.values());

            return simple;
        } catch (Exception e) {
            logger.error("Generate table level process failed.", e);
        }
        return instance;
    }

    /**
     * Manual entry point: loads a dataflow archive, converts it to table-level
     * lineage and saves the result.
     *
     * @param args optional: {@code args[0]} input file, {@code args[1]} output file;
     *             the historical hard-coded paths are used for any argument not given
     * @throws Exception if loading or saving fails
     */
    public static void main(String[] args) throws Exception {
        String inputPath = (args != null && args.length > 0) ? args[0]
                : "C:/Users/KK/Desktop/e091bfbe0f4178ffbcff4711751a119ec06087c9723be51a26c3a1b6b120acf8.dataflow.zip";
        String outputPath = (args != null && args.length > 1) ? args[1] : "D:\\1.zip";

        dataflow loaded = XML2Model.loadXML(dataflow.class, new File(inputPath));
        DataFlowAnalyzer dataFlowAnalyzer = new DataFlowAnalyzer("", EDbVendor.dbvoracle, true);
        dataflow tableLevel = ProcessUtility.generateTableLevelLineage(dataFlowAnalyzer, loaded);
        XML2Model.saveXML(tableLevel, new File(outputPath));
    }

    /**
     * Builds the column-level lineage CSV using {@code ","} as delimiter.
     *
     * @see #generateColumnLevelLineageCsv(DataFlowAnalyzer, dataflow, boolean, String)
     */
    public static String generateColumnLevelLineageCsv(DataFlowAnalyzer analyzer, dataflow instance) {
        return generateColumnLevelLineageCsv(analyzer, instance, false, ",");
    }

    /**
     * Builds the column-level lineage CSV with a caller-chosen delimiter.
     *
     * @see #generateColumnLevelLineageCsv(DataFlowAnalyzer, dataflow, boolean, String)
     */
    public static String generateColumnLevelLineageCsv(DataFlowAnalyzer analyzer, dataflow instance, String delimiter) {
        return generateColumnLevelLineageCsv(analyzer, instance, false, delimiter);
    }

    /**
     * Writes one CSV line per (source column, target column) pair of every
     * relationship in {@code instance}. Types {@code fdd} are reported as
     * {@code direct}; {@code fdr}, {@code frd} and {@code fddi} as
     * {@code indirect}; any other type is written unchanged.
     *
     * @param analyzer  currently not used by this method
     * @param instance  dataflow whose relationships are exported as-is (no simplification)
     * @param isSimple  currently not used by this method
     * @param delimiter field separator
     * @return header plus the sorted lines; the header alone when there are no
     *         relationships or when generation fails (the failure is logged)
     */
    public static String generateColumnLevelLineageCsv(DataFlowAnalyzer analyzer, dataflow instance,
            boolean isSimple, String delimiter) {
        StringBuilder buffer = new StringBuilder();
        buffer.append(joinFields(delimiter, "SOURCE_DB", "SOURCE_SCHEMA", "SOURCE_TABLE_ID", "SOURCE_TABLE",
                "SOURCE_COLUMN_ID", "SOURCE_COLUMN", "TARGET_DB", "TARGET_SCHEMA", "TARGET_TABLE_ID",
                "TARGET_TABLE", "TARGET_COLUMN_ID", "TARGET_COLUMN", "RELATION_TYPE", "EFFECTTYPE"))
                .append("\n");
        try {
            List<relationship> relations = instance.getRelationships();
            if (relations == null || relations.isEmpty()) {
                return buffer.toString();
            }

            Map<String, table> tableMap = indexTablesById(collectTableLikeObjects(instance, true));

            Set<String> lines = new TreeSet<String>(CASE_INSENSITIVE_LINE_ORDER);
            for (relationship relation : relations) {
                if (relation.getSources() == null || relation.getTarget() == null) {
                    continue;
                }

                String relationType = relation.getType();
                if (DIRECT_RELATION_TYPE.equals(relationType)) {
                    relationType = "direct";
                } else if ("fdr".equals(relationType) || "frd".equals(relationType)
                        || "fddi".equals(relationType)) {
                    relationType = "indirect";
                }
                String effectType = relation.getEffectType();

                targetColumn target = relation.getTarget();
                table targetTable = tableMap.get(target.getParent_id());
                String targetDb = defaultIfNull(targetTable.getDatabase());
                String targetSchema = defaultIfNull(targetTable.getSchema());

                for (sourceColumn source : relation.getSources()) {
                    table sourceTable = tableMap.get(source.getParent_id());
                    String sourceDb = defaultIfNull(sourceTable.getDatabase());
                    String sourceSchema = defaultIfNull(sourceTable.getSchema());

                    lines.add(joinFields(delimiter, sourceDb, sourceSchema, sourceTable.getId(),
                            sourceTable.getName(), source.getId(), source.getColumn(), targetDb,
                            targetSchema, targetTable.getId(), targetTable.getName(), target.getId(),
                            target.getColumn(), relationType, effectType) + "\n");
                }
            }
            for (String line : lines) {
                buffer.append(line);
            }
        } catch (Exception e) {
            logger.error("Generate column level csv failed.", e);
        }
        return buffer.toString();
    }

    /**
     * Derives the starting value for newly generated ids from the id of the last
     * relationship: the part before the first {@code -}, with underscores removed,
     * times 100.
     *
     * @throws IllegalStateException if that part is not a number; the message names the offending id
     */
    private static long parseBaseId(List<relationship> relations) {
        String rawId = relations.get(relations.size() - 1).getId();
        String numericPart = rawId.split("\\-")[0].replace("_", "");
        try {
            return Long.parseLong(numericPart) * 100;
        } catch (NumberFormatException e) {
            throw new IllegalStateException(
                    "Cannot derive a numeric base id from relationship id '" + rawId + "'", e);
        }
    }

    /**
     * Gathers every table-like object of a dataflow into one list, in a fixed
     * order: tables, paths, stages, datasources, streams, databases, schemas,
     * variables, views and, optionally, result sets.
     */
    private static List<table> collectTableLikeObjects(dataflow flow, boolean includeResultsets) {
        List<table> collected = new ArrayList<table>();
        if (flow.getTables() != null) {
            collected.addAll(flow.getTables());
        }
        if (flow.getPaths() != null) {
            collected.addAll(flow.getPaths());
        }
        if (flow.getStages() != null) {
            collected.addAll(flow.getStages());
        }
        if (flow.getDatasources() != null) {
            collected.addAll(flow.getDatasources());
        }
        if (flow.getStreams() != null) {
            collected.addAll(flow.getStreams());
        }
        if (flow.getDatabases() != null) {
            collected.addAll(flow.getDatabases());
        }
        if (flow.getSchemas() != null) {
            collected.addAll(flow.getSchemas());
        }
        if (flow.getVariables() != null) {
            collected.addAll(flow.getVariables());
        }
        if (flow.getViews() != null) {
            collected.addAll(flow.getViews());
        }
        if (includeResultsets && flow.getResultsets() != null) {
            collected.addAll(flow.getResultsets());
        }
        return collected;
    }

    /** Maps each table-like object by its id; a later object replaces an earlier one with the same id. */
    private static Map<String, table> indexTablesById(List<table> tables) {
        Map<String, table> byId = new HashMap<String, table>();
        for (table tableLike : tables) {
            byId.put(tableLike.getId(), tableLike);
        }
        return byId;
    }

    /** Maps each process by its id. */
    private static Map<String, process> indexProcessesById(Iterable<process> processes) {
        Map<String, process> byId = new HashMap<String, process>();
        for (process candidate : processes) {
            byId.put(candidate.getId(), candidate);
        }
        return byId;
    }

    /** Returns {@code "default"} when {@link SQLUtil#isEmpty} reports the value as empty. */
    private static String defaultIfEmpty(String value) {
        return SQLUtil.isEmpty(value) ? DEFAULT_NAME : value;
    }

    /** Returns {@code "default"} only for {@code null}; an empty string is kept. */
    private static String defaultIfNull(String value) {
        return value == null ? DEFAULT_NAME : value;
    }

    /**
     * Joins fields with the delimiter. A {@code null} field or delimiter is
     * written as the text {@code null}, matching plain string concatenation.
     */
    private static String joinFields(String delimiter, String... fields) {
        StringBuilder joined = new StringBuilder();
        for (int i = 0; i < fields.length; i++) {
            if (i > 0) {
                joined.append(delimiter);
            }
            joined.append(fields[i]);
        }
        return joined.toString();
    }
}