001package gudusoft.gsqlparser.dlineage.util;
002
003import java.io.File;
004import java.util.ArrayList;
005import java.util.Arrays;
006import java.util.Comparator;
007import java.util.HashMap;
008import java.util.Iterator;
009import java.util.LinkedHashMap;
010import java.util.LinkedHashSet;
011import java.util.List;
012import java.util.Map;
013import java.util.Set;
014import java.util.TreeSet;
015
016import gudusoft.gsqlparser.EDbVendor;
017import gudusoft.gsqlparser.dlineage.DataFlowAnalyzer;
018import gudusoft.gsqlparser.dlineage.dataflow.model.xml.dataflow;
019import gudusoft.gsqlparser.dlineage.dataflow.model.xml.process;
020import gudusoft.gsqlparser.dlineage.dataflow.model.xml.relationship;
021import gudusoft.gsqlparser.dlineage.dataflow.model.xml.sourceColumn;
022import gudusoft.gsqlparser.dlineage.dataflow.model.xml.table;
023import gudusoft.gsqlparser.dlineage.dataflow.model.xml.targetColumn;
024import gudusoft.gsqlparser.util.Logger;
025import gudusoft.gsqlparser.util.LoggerFactory;
026import gudusoft.gsqlparser.util.SQLUtil;
027
028public class ProcessUtility {
029
030        private static final Logger logger = LoggerFactory.getLogger(ProcessUtility.class);
031
032        public static dataflow generateTableLevelLineage(DataFlowAnalyzer analyzer, dataflow instance) {
033                return generateTableLevelLineage(analyzer, instance, true, false);
034        }
035
036        public static String generateTableLevelLineageCsv(DataFlowAnalyzer analyzer, dataflow instance) {
037                return generateTableLevelLineageCsv(analyzer, instance, true, false, ",");
038        }
039        
040        public static String generateTableLevelLineageCsv(DataFlowAnalyzer analyzer, dataflow instance, String delimiter) {
041                return generateTableLevelLineageCsv(analyzer, instance, true, false, delimiter);
042        }
043        
044        private static String generateTableLevelLineageCsv(DataFlowAnalyzer analyzer, dataflow instance, boolean ignoreSelfLineage, boolean isSimple, String delimiter) {
045                StringBuilder buffer = new StringBuilder();
046                buffer.append("source_db"+delimiter+"source_schema"+delimiter+"source_table"+delimiter+"source_column"+delimiter+"target_db"+delimiter+"target_schema"+delimiter+"target_table"+delimiter+"target_column"+delimiter+"process_type"+delimiter+"process_hashid"+delimiter+"coordinate"+delimiter+"procedure_names\n");
047                try {
048                        dataflow simple = instance;
049                        if (!isSimple) {
050                                simple = analyzer.getSimpleDataflow(instance, true);
051                        }
052                        if (simple.getTables() == null) {
053                                return null;
054                        }
055                        List<relationship> relations = instance.getRelationships();
056                        if (relations == null || relations.size() == 0) {
057                                return null;
058                        }
059                        long maxId = Long.parseLong(relations.get(relations.size() - 1).getId().split("\\-")[0].replace("_", ""))
060                                        * 100;
061
062                        Iterator<relationship> iter = simple.getRelationships().iterator();
063                        while (iter.hasNext()) {
064                                relationship relation = iter.next();
065                                if (!"fdd".equals(relation.getType())) {
066                                        iter.remove();
067                                }
068                        }
069                        
070                        Map<Pair3<String, String, String>, Set<relationship>> tableRelationMap = new LinkedHashMap<Pair3<String, String, String>, Set<relationship>>();
071                        List<relationship> tableRelations = new ArrayList<relationship>();
072                        iter = simple.getRelationships().iterator();
073                        while (iter.hasNext()) {
074                                relationship relation = iter.next();
075                                if (relation.getSources() != null && relation.getTarget() != null && relation.getProcessId() != null) {
076                                        String targetId = relation.getTarget().getParent_id();
077                                        if (SQLUtil.isEmpty(targetId))
078                                                continue;
079                                        for (sourceColumn sourceColumn : relation.getSources()) {
080                                                String sourceId = sourceColumn.getParent_id();
081                                                if (SQLUtil.isEmpty(sourceId))
082                                                        continue;
083
084                                                if (ignoreSelfLineage && sourceId.equals(targetId)) {
085                                                        continue;
086                                                }
087
088                                                Pair3<String, String, String> tableRelationPair = new Pair3<String, String, String>(sourceId,
089                                                                targetId, relation.getProcessId());
090                                                if (!tableRelationMap.containsKey(tableRelationPair)) {
091                                                        relationship tableRelation = new relationship();
092
093                                                        targetColumn targetTable = new targetColumn();
094                                                        targetTable.setId(String.valueOf(++maxId));
095                                                        targetTable.setTarget_id(targetId);
096                                                        targetTable.setTarget_name(relation.getTarget().getParent_name());
097                                                        tableRelation.setTarget(targetTable);
098
099                                                        sourceColumn sourceTale = new sourceColumn();
100                                                        sourceTale.setId(String.valueOf(++maxId));
101                                                        sourceTale.setSource_id(sourceId);
102                                                        sourceTale.setSource_name(sourceColumn.getParent_name());
103
104                                                        tableRelation.setSources(Arrays.asList(sourceTale));
105                                                        tableRelation.setType("fdd");
106                                                        tableRelation.setId(String.valueOf(++maxId));
107                                                        tableRelation.setProcessId(relation.getProcessId());
108                                                        tableRelations.add(tableRelation);
109                                                        tableRelationMap.put(tableRelationPair, new LinkedHashSet<relationship>());
110                                                }
111                                                tableRelationMap.get(tableRelationPair).add(relation);
112                                        }
113                                }
114                        }
115
116                        List<table> tables = new ArrayList<>();
117                        if (simple.getTables() != null) {
118                                tables.addAll(simple.getTables());
119                        }
120                        if (simple.getPaths() != null) {
121                                tables.addAll(simple.getPaths());
122                        }
123                        if (simple.getStages() != null) {
124                                tables.addAll(simple.getStages());
125                        }
126                        if (simple.getDatasources() != null) {
127                                tables.addAll(simple.getDatasources());
128                        }
129                        if (simple.getStreams() != null) {
130                                tables.addAll(simple.getStreams());
131                        }
132                        if (simple.getDatabases() != null) {
133                                tables.addAll(simple.getDatabases());
134                        }
135                        if (simple.getSchemas() != null) {
136                                tables.addAll(simple.getSchemas());
137                        }
138                        if (simple.getVariables() != null) {
139                                tables.addAll(simple.getVariables());
140                        }
141                        if (simple.getViews() != null) {
142                                tables.addAll(simple.getViews());
143                        }
144                        Map<String, table> tableMap = new HashMap<String, table>();
145                        Iterator<table> tableIter = tables.iterator();
146                        while (tableIter.hasNext()) {
147                                table table = tableIter.next();
148                                tableMap.put(table.getId(), table);
149                        }
150
151                        Map<String, process> processMap = new HashMap<String, process>();
152                        Iterator<process> processIter = simple.getProcesses().iterator();
153                        while (processIter.hasNext()) {
154                                process process = processIter.next();
155                                processMap.put(process.getId(), process);
156                        }
157                        
158                        Set<String> lineSet = new TreeSet<String>(new Comparator<String>() {
159                                @Override
160                                public int compare(String o1, String o2) {
161                                        return o1.toLowerCase().compareTo(o2.toLowerCase());
162                                }
163                        });
164                        for (Pair3<String, String, String> key : tableRelationMap.keySet()) {
165                                String sourceId = key.first;
166                                String targetId = key.second;
167                                String processId = key.third;
168                                table sourceTable = tableMap.get(sourceId);
169                                table targetTable = tableMap.get(targetId);
170                                process process = processMap.get(processId);
171                                
172                                String source_db = "default";
173                                if(!SQLUtil.isEmpty(sourceTable.getDatabase())){
174                                        source_db = sourceTable.getDatabase();
175                                }
176                                String source_schema = "default";
177                                if(!SQLUtil.isEmpty(sourceTable.getSchema())){
178                                        source_schema = sourceTable.getSchema();
179                                }
180                                String source_table = sourceTable.getTableNameOnly();
181                                Set<relationship> processRelations = tableRelationMap.get(key);
182                                Set<String> sourceColumns = new LinkedHashSet<String>();
183                                Set<String> targetColumns = new LinkedHashSet<String>();
184                                for (relationship relation : processRelations) {
185                                        for (sourceColumn column : relation.getSources()) {
186                                                sourceColumns.add(column.getColumn());
187                                        }
188                                        targetColumns.add(relation.getTarget().getColumn());
189                                }
190                                
191                                String target_db = "default";
192                                if(!SQLUtil.isEmpty(targetTable.getDatabase())){
193                                        target_db = targetTable.getDatabase();
194                                }
195                                String target_schema = "default";
196                                if(!SQLUtil.isEmpty(targetTable.getSchema())){
197                                        target_schema = targetTable.getSchema();
198                                }
199                                String target_table = targetTable.getTableNameOnly();
200                                String process_type = process.getType();
201                                String process_hashid = process.getQueryHashId();
202                                String process_coordinate = process.getCoordinate();
203                                String procedure_names = process.getProcedureName();
204                                if("batchQueries".equals(procedure_names)) {
205                                        procedure_names = "";
206                                }
207                                
208                                StringBuilder temp = new StringBuilder();
209                                temp.append(source_db).append(delimiter).append(source_schema).append(delimiter).append(source_table).append(delimiter)
210                                                .append(String.join(";", sourceColumns)).append(delimiter).append(target_db).append(delimiter)
211                                                .append(target_schema).append(delimiter).append(target_table).append(delimiter)
212                                                .append(String.join(";", targetColumns)).append(delimiter).append(process_type).append(delimiter)
213                                                .append(process_hashid).append(delimiter).append("\"").append(process_coordinate).append("\"").append(delimiter).append(procedure_names).append("\n");
214                                lineSet.add(temp.toString());
215                        }
216                        
217                        for(String lineKey: lineSet) {
218                                buffer.append(lineKey);
219                        }
220                        
221                } catch (Exception e) {
222                        logger.error("Generate table level csv failed.", e);
223                }
224                return buffer.toString();
225        }
226
227        public static dataflow generateTableLevelLineage(DataFlowAnalyzer analyzer, dataflow instance,
228                        boolean ignoreSelfLineage, boolean isSimple) {
229                try {
230                        dataflow simple = instance;
231                        if (!isSimple) {
232                                simple = analyzer.getSimpleDataflow(instance, true);
233                        }
234                        if (simple.getTables() == null) {
235                                return simple;
236                        }
237
238                        List<relationship> relations = instance.getRelationships();
239                        if (relations == null || relations.size() == 0) {
240                                return simple;
241                        }
242
243                        String id = relations.get(relations.size() - 1).getId().split("\\-")[0].replace("_", "");
244                        if(id.trim().length() == 0){
245                                System.out.println(relations.get(relations.size() - 1).getId());
246                        }
247                        long maxId = Long.parseLong(id)
248                                        * 100;
249
250                        Iterator<relationship> iter = simple.getRelationships().iterator();
251                        while (iter.hasNext()) {
252                                relationship relation = iter.next();
253                                if (!"fdd".equals(relation.getType())) {
254                                        iter.remove();
255                                }
256                        }
257
258                        Map<Pair3<String, String, String>, relationship> tableRelationMap = new LinkedHashMap<Pair3<String, String, String>, relationship>();
259                        List<relationship> tableRelations = new ArrayList<relationship>();
260                        iter = simple.getRelationships().iterator();
261                        while (iter.hasNext()) {
262                                relationship relation = iter.next();
263                                if (relation.getSources() != null && relation.getTarget() != null && relation.getProcessId() != null) {
264                                        String targetId = relation.getTarget().getParent_id();
265                                        if (SQLUtil.isEmpty(targetId))
266                                                continue;
267                                        for (sourceColumn sourceColumn : relation.getSources()) {
268                                                String sourceId = sourceColumn.getParent_id();
269                                                if (SQLUtil.isEmpty(sourceId))
270                                                        continue;
271
272                                                if (ignoreSelfLineage && sourceId.equals(targetId)) {
273                                                        continue;
274                                                }
275
276                                                Pair3<String, String, String> tableRelationPair = new Pair3<String, String, String>(sourceId,
277                                                                targetId, relation.getProcessId());
278                                                if (!tableRelationMap.containsKey(tableRelationPair)) {
279                                                        relationship tableRelation = new relationship();
280
281                                                        targetColumn targetTable = new targetColumn();
282                                                        targetTable.setId(String.valueOf(++maxId));
283                                                        targetTable.setTarget_id(targetId);
284                                                        targetTable.setTarget_name(relation.getTarget().getParent_name());
285                                                        tableRelation.setTarget(targetTable);
286
287                                                        sourceColumn sourceTale = new sourceColumn();
288                                                        sourceTale.setId(String.valueOf(++maxId));
289                                                        sourceTale.setSource_id(sourceId);
290                                                        sourceTale.setSource_name(sourceColumn.getParent_name());
291
292                                                        tableRelation.setSources(Arrays.asList(sourceTale));
293                                                        tableRelation.setType("fdd");
294                                                        tableRelation.setId(String.valueOf(++maxId));
295                                                        tableRelation.setProcessId(relation.getProcessId());
296                                                        tableRelations.add(tableRelation);
297                                                        tableRelationMap.put(tableRelationPair, tableRelation);
298                                                }
299                                        }
300                                }
301                        }
302
303                        simple.getRelationships().clear();
304                        simple.getRelationships().addAll(tableRelations);
305
306                        List<table> tables = new ArrayList<>();
307                        if (simple.getTables() != null) {
308                                tables.addAll(simple.getTables());
309                        }
310                        if (simple.getPaths() != null) {
311                                tables.addAll(simple.getPaths());
312                        }
313                        if (simple.getStages() != null) {
314                                tables.addAll(simple.getStages());
315                        }
316                        if (simple.getDatasources() != null) {
317                                tables.addAll(simple.getDatasources());
318                        }
319                        if (simple.getDatabases() != null) {
320                                tables.addAll(simple.getDatabases());
321                        }
322                        if (simple.getSchemas() != null) {
323                                tables.addAll(simple.getSchemas());
324                        }
325                        if (simple.getVariables() != null) {
326                                tables.addAll(simple.getVariables());
327                        }
328                        if (simple.getViews() != null) {
329                                tables.addAll(simple.getViews());
330                        }
331
332                        for (table table : tables) {
333                                table.getColumns().clear();
334                        }
335
336                        if (simple.getProcesses() == null) {
337                                return simple;
338                        }
339
340                        Map<String, table> tableMap = new HashMap<String, table>();
341                        Iterator<table> tableIter = tables.iterator();
342                        while (tableIter.hasNext()) {
343                                table table = tableIter.next();
344                                tableMap.put(table.getId(), table);
345                        }
346
347                        Map<String, process> processMap = new HashMap<String, process>();
348                        Iterator<process> processIter = simple.getProcesses().iterator();
349                        while (processIter.hasNext()) {
350                                process process = processIter.next();
351                                processMap.put(process.getId(), process);
352                        }
353
354                        Map<String, relationship> processRelations = new LinkedHashMap<String, relationship>();
355                        iter = simple.getRelationships().iterator();
356                        while (iter.hasNext()) {
357                                relationship relation = iter.next();
358                                relationship beforeRelation = new relationship();
359                                beforeRelation.setSources(relation.getSources());
360                                beforeRelation.setType("fdd");
361                                beforeRelation.setId(String.valueOf(++maxId));
362
363                                targetColumn targetProcess = new targetColumn();
364                                targetProcess.setId(String.valueOf(++maxId));
365                                targetProcess.setTarget_id(relation.getProcessId());
366                                targetProcess.setTarget_name(processMap.get(relation.getProcessId()).getName());
367                                beforeRelation.setTarget(targetProcess);
368
369                                String key = targetProcess.getTarget_id();
370                                if (beforeRelation.getSources() != null) {
371                                        for (sourceColumn sourceColumn : beforeRelation.getSources()) {
372                                                key += (":" + sourceColumn.getSource_id() + ":" + relation.getProcessId());
373                                        }
374                                }
375                                if (!processRelations.containsKey(key)) {
376                                        beforeRelation.getTarget().setId(beforeRelation.getTarget().getTarget_id());
377                                        if (beforeRelation.getSources() != null) {
378                                                for (sourceColumn sourceColumn : beforeRelation.getSources()) {
379                                                        sourceColumn.setId(sourceColumn.getSource_id());
380                                                }
381                                        }
382                                        processRelations.put(key, beforeRelation);
383                                }
384
385                                relationship afterRelation = new relationship();
386                                afterRelation.setTarget(relation.getTarget());
387                                afterRelation.setType("fdd");
388                                afterRelation.setId(String.valueOf(++maxId));
389
390                                sourceColumn sourceProcess = new sourceColumn();
391                                sourceProcess.setId(String.valueOf(++maxId));
392                                sourceProcess.setSource_id(relation.getProcessId());
393                                sourceProcess.setSource_name(processMap.get(relation.getProcessId()).getName());
394                                afterRelation.setSources(Arrays.asList(new sourceColumn[] { sourceProcess }));
395
396                                key = relation.getTarget().getTarget_id();
397                                key += (":" + sourceProcess.getSource_id() + ":" + relation.getProcessId());
398                                if (!processRelations.containsKey(key)) {
399                                        afterRelation.getTarget().setId(afterRelation.getTarget().getTarget_id());
400                                        sourceProcess.setId(sourceProcess.getSource_id());
401                                        processRelations.put(key, afterRelation);
402                                }
403
404                                iter.remove();
405                        }
406
407                        simple.getRelationships().addAll(processRelations.values());
408
409                        return simple;
410                } catch (Exception e) {
411                        logger.error("Generate table level process failed.", e);
412                }
413                return instance;
414        }
415
416        public static void main(String[] args) throws Exception {
417                dataflow dataflow = XML2Model.loadXML(dataflow.class, new File(
418                                "C:/Users/KK/Desktop/e091bfbe0f4178ffbcff4711751a119ec06087c9723be51a26c3a1b6b120acf8.dataflow.zip"));
419                DataFlowAnalyzer dataFlowAnalyzer = new DataFlowAnalyzer("", EDbVendor.dbvoracle, true);
420                dataflow dataflow1 = ProcessUtility.generateTableLevelLineage(dataFlowAnalyzer, dataflow);
421                XML2Model.saveXML(dataflow1, new File("D:\\1.zip"));
422        }
423        
424        public static String generateColumnLevelLineageCsv(DataFlowAnalyzer analyzer, dataflow instance) {
425                return generateColumnLevelLineageCsv(analyzer, instance, false, ",");
426        }
427        
428        public static String generateColumnLevelLineageCsv(DataFlowAnalyzer analyzer, dataflow instance, String delimiter) {
429                return generateColumnLevelLineageCsv(analyzer, instance, false, delimiter);
430        }
431        
432        public static String generateColumnLevelLineageCsv(DataFlowAnalyzer analyzer, dataflow instance, boolean isSimple, String delimiter) {
433                StringBuilder buffer = new StringBuilder();
434                buffer.append("SOURCE_DB"+delimiter+"SOURCE_SCHEMA"+delimiter+"SOURCE_TABLE_ID"+delimiter+"SOURCE_TABLE"+delimiter+"SOURCE_COLUMN_ID"+delimiter+"SOURCE_COLUMN"+delimiter+"TARGET_DB"+delimiter+"TARGET_SCHEMA"+delimiter+"TARGET_TABLE_ID"+delimiter+"TARGET_TABLE"+delimiter+"TARGET_COLUMN_ID"+delimiter+"TARGET_COLUMN"+delimiter+"RELATION_TYPE"+delimiter+"EFFECTTYPE\n");
435                try {
436                        dataflow simple = instance;
437//                      if (!isSimple) {
438//                              simple = analyzer.getSimpleDataflow(instance, true);
439//                      }
440                        List<relationship> relations = simple.getRelationships();
441                        if (relations == null || relations.size() == 0) 
442                                return buffer.toString();
443                        
444                        List<table> tables = new ArrayList<>();
445                        if (simple.getTables() != null) {
446                                tables.addAll(simple.getTables());
447                        }
448                        if (simple.getPaths() != null) {
449                                tables.addAll(simple.getPaths());
450                        }
451                        if (simple.getStages() != null) {
452                                tables.addAll(simple.getStages());
453                        }
454                        if (simple.getDatasources() != null) {
455                                tables.addAll(simple.getDatasources());
456                        }
457                        if (simple.getStreams() != null) {
458                                tables.addAll(simple.getStreams());
459                        }
460                        if (simple.getDatabases() != null) {
461                                tables.addAll(simple.getDatabases());
462                        }
463                        if (simple.getSchemas() != null) {
464                                tables.addAll(simple.getSchemas());
465                        }
466                        if (simple.getVariables() != null) {
467                                tables.addAll(simple.getVariables());
468                        }
469                        if (simple.getViews() != null) {
470                                tables.addAll(simple.getViews());
471                        }
472                        if (simple.getResultsets() != null) {
473                                tables.addAll(simple.getResultsets());
474                        }
475
476                        Map<String, table> tableMap = new HashMap<String, table>();
477                        Iterator<table> tableIter = tables.iterator();
478                        while (tableIter.hasNext()) {
479                                table table = tableIter.next();
480                                tableMap.put(table.getId(), table);
481                        }
482                        
483                        TreeSet<String> lines = new TreeSet<String>(new Comparator<String>() {
484                                @Override
485                                public int compare(String o1, String o2) {
486                                        return o1.toLowerCase().compareTo(o2.toLowerCase());
487                                }
488                        });
489                        for(relationship relation: relations) {
490                                if(relation.getSources()==null || relation.getTarget() == null)
491                                        continue;
492                                
493                                String relation_type = relation.getType();
494                                if("fdd".equals(relation_type)) {
495                                        relation_type = "direct";
496                                }
497                                else if("fdr".equals(relation_type) || "frd".equals(relation_type) || "fddi".equals(relation_type)) {
498                                        relation_type = "indirect";
499                                }
500                                
501                                String effect_type = relation.getEffectType();
502                                
503                                targetColumn targetColumn = relation.getTarget();
504                                table targetTable = tableMap.get(targetColumn.getParent_id());
505                                String target_db = targetTable.getDatabase();
506                                target_db = target_db == null ? "default" : target_db;
507                                String target_schema = targetTable.getSchema();
508                                target_schema = target_schema == null ? "default" : target_schema;
509                                String target_table = targetTable.getName();
510                                String target_table_id = targetTable.getId();
511                                String target_column = targetColumn.getColumn();
512                                String target_column_id = targetColumn.getId();
513                                
514                                for (sourceColumn sourceColumn : relation.getSources()) {
515                                        StringBuilder temp = new StringBuilder();
516                                        table sourceTable = tableMap.get(sourceColumn.getParent_id());
517                                        String source_db = sourceTable.getDatabase();
518                                        source_db = source_db == null ? "default" : source_db;
519                                        String source_schema = sourceTable.getSchema();
520                                        source_schema = source_schema == null ? "default" : source_schema;
521                                        String source_table = sourceTable.getName();
522                                        String source_table_id = sourceTable.getId();
523                                        String source_column = sourceColumn.getColumn();
524                                        String source_column_id = sourceColumn.getId();
525                                
526                                        temp.append(source_db).append(delimiter).append(source_schema).append(delimiter).append(source_table_id).append(delimiter).append(source_table)
527                                                        .append(delimiter).append(source_column_id).append(delimiter).append(source_column).append(delimiter).append(target_db)
528                                                        .append(delimiter).append(target_schema).append(delimiter).append(target_table_id).append(delimiter).append(target_table).append(delimiter)
529                                                        .append(target_column_id).append(delimiter).append(target_column).append(delimiter).append(relation_type).append(delimiter)
530                                                        .append(effect_type).append("\n");
531                                        lines.add(temp.toString());
532                                }
533                        }
534                        for(String line: lines) {
535                                buffer.append(line);
536                        }
537                } catch (Exception e) {
538                        logger.error("Generate column level csv failed.", e);
539                }
540                return buffer.toString();
541        }
542}