001package gudusoft.gsqlparser.dlineage;
002
003import gudusoft.gsqlparser.EDbVendor;
004import gudusoft.gsqlparser.TBaseType;
005import gudusoft.gsqlparser.TGSqlParser;
006import gudusoft.gsqlparser.dlineage.dataflow.listener.DataFlowHandleListener;
007import gudusoft.gsqlparser.dlineage.dataflow.metadata.MetadataReader;
008import gudusoft.gsqlparser.dlineage.dataflow.model.*;
009import gudusoft.gsqlparser.dlineage.dataflow.model.json.Coordinate;
010import gudusoft.gsqlparser.dlineage.dataflow.model.xml.*;
011import gudusoft.gsqlparser.dlineage.dataflow.sqlenv.SQLEnvParser;
012import gudusoft.gsqlparser.dlineage.util.*;
013import gudusoft.gsqlparser.sqlenv.TSQLEnv;
014import gudusoft.gsqlparser.sqlenv.parser.TJSONSQLEnvParser;
015import gudusoft.gsqlparser.util.IndexedLinkedHashMap;
016import gudusoft.gsqlparser.util.Logger;
017import gudusoft.gsqlparser.util.LoggerFactory;
018import gudusoft.gsqlparser.util.SQLUtil;
019import gudusoft.gsqlparser.util.json.JSON;
020
021import java.io.File;
022import java.util.*;
023import java.util.concurrent.*;
024import java.util.concurrent.atomic.AtomicInteger;
025import java.util.stream.Collectors;
026
027public class ParallelDataFlowAnalyzer implements IDataFlowAnalyzer {
028    private static final Logger logger = LoggerFactory.getLogger(ParallelDataFlowAnalyzer.class);
029    private SqlInfo[] sqlInfos;
030    private Option option = new Option();
031    private TSQLEnv sqlenv = null;
032    private dataflow dataflow;
033    private String dataflowString;
034    private List<ErrorInfo> errorInfos = new CopyOnWriteArrayList<ErrorInfo>();
035    private IndexedLinkedHashMap<String, List<SqlInfo>> sqlInfoMap = new IndexedLinkedHashMap<String, List<SqlInfo>>();
036    private final Map<String, String> hashSQLMap = new HashMap();
037
038    public ParallelDataFlowAnalyzer(SqlInfo[] sqlInfos, EDbVendor dbVendor, boolean simpleOutput) {
039        this.sqlInfos = sqlInfos;
040        option.setVendor(dbVendor);
041        option.setSimpleOutput(simpleOutput);
042        ModelBindingManager.setGlobalOption(option);
043    }
044
045    public ParallelDataFlowAnalyzer(String[] sqlContents, EDbVendor dbVendor, boolean simpleOutput, String defaultServer, String defaultDatabase, String defaltSchema) {
046        SqlInfo[] sqlInfos = new SqlInfo[sqlContents.length];
047        for (int i = 0; i < sqlContents.length; i++) {
048            SqlInfo info = new SqlInfo();
049            info.setSql(sqlContents[i]);
050            info.setOriginIndex(0);
051            sqlInfos[i] = info;
052        }
053        option.setVendor(dbVendor);
054        option.setSimpleOutput(simpleOutput);
055                option.setDefaultServer(defaultServer);
056                option.setDefaultDatabase(defaultDatabase);
057                option.setDefaultSchema(defaltSchema);
058        this.sqlInfos = convertSQL(dbVendor, JSON.toJSONString(sqlInfos)).toArray(new SqlInfo[0]);
059    }
060    
061    public ParallelDataFlowAnalyzer(String[] sqlContents, EDbVendor dbVendor, boolean simpleOutput) {
062        SqlInfo[] sqlInfos = new SqlInfo[sqlContents.length];
063        for (int i = 0; i < sqlContents.length; i++) {
064            SqlInfo info = new SqlInfo();
065            info.setSql(sqlContents[i]);
066            info.setOriginIndex(0);
067            sqlInfos[i] = info;
068        }
069        option.setVendor(dbVendor);
070        option.setSimpleOutput(simpleOutput);
071        this.sqlInfos = convertSQL(dbVendor, JSON.toJSONString(sqlInfos)).toArray(new SqlInfo[0]);
072    }
073
074    public ParallelDataFlowAnalyzer(SqlInfo[] sqlInfos, Option option) {
075        this.sqlInfos = sqlInfos;
076        this.option = option;
077        ModelBindingManager.setGlobalOption(option);
078    }
079
080    public ParallelDataFlowAnalyzer(File[] sqlFiles, Option option) {
081        SqlInfo[] sqlInfos = new SqlInfo[sqlFiles.length];
082        for (int i = 0; i < sqlFiles.length; i++) {
083            SqlInfo info = new SqlInfo();
084            info.setSql(SQLUtil.getFileContent(sqlFiles[i]));
085            info.setFileName(sqlFiles[i].getName());
086            info.setFilePath(sqlFiles[i].getAbsolutePath());
087            info.setOriginIndex(0);
088            sqlInfos[i] = info;
089        }
090        this.sqlInfos = sqlInfos;
091        this.option = option;
092        ModelBindingManager.setGlobalOption(option);
093    }
094    
095    protected List<SqlInfo> convertSQL(EDbVendor vendor, String json) {
096                List<SqlInfo> sqlInfos = new ArrayList<SqlInfo>();
097                List sqlContents = (List) JSON.parseObject(json);
098                for (int j = 0; j < sqlContents.size(); j++) {
099                        Map sqlContent = (Map) sqlContents.get(j);
100                        String sql = (String) sqlContent.get("sql");
101                        String fileName = (String) sqlContent.get("fileName");
102                        String filePath = (String) sqlContent.get("filePath");
103                        if (sql != null && sql.trim().startsWith("{")) {
104                                if (sql.indexOf("createdBy") != -1) {
105                                        if (this.sqlenv == null) {
106                                                TSQLEnv[] sqlenvs = new TJSONSQLEnvParser(option.getDefaultServer(), option.getDefaultDatabase(), option.getDefaultSchema()).parseSQLEnv(vendor, sql);
107                                                if (sqlenvs != null) {
108                                                        this.sqlenv = sqlenvs[0];
109                                                }
110                                        }
111                        if (sql.toLowerCase().indexOf("sqldep") != -1 || sql.toLowerCase().indexOf("grabit") != -1 ) {
112                                Map queryObject = (Map) JSON.parseObject(sql);
113                                        List querys = (List) queryObject.get("queries");
114                                        if (querys != null) {
115                                                for (int i = 0; i < querys.size(); i++) {
116                                                        Map object = (Map) querys.get(i);
117                                                        SqlInfo info = new SqlInfo();
118                                                        info.setSql(JSON.toJSONString(object));
119                                                        info.setFileName(fileName);
120                                                        info.setFilePath(filePath);
121                                                        info.setOriginIndex(i);
122                                                        sqlInfos.add(info);
123                                                }
124                                                queryObject.remove("queries");
125                                                SqlInfo info = new SqlInfo();
126                                                info.setSql(JSON.toJSONString(queryObject));
127                                                info.setFileName(fileName);
128                                                info.setFilePath(filePath);
129                                                info.setOriginIndex(querys.size());
130                                                sqlInfos.add(info);
131                                        } else {
132                                                SqlInfo info = new SqlInfo();
133                                                info.setSql(JSON.toJSONString(queryObject));
134                                                info.setFileName(fileName);
135                                                info.setFilePath(filePath);
136                                                info.setOriginIndex(0);
137                                                sqlInfos.add(info);
138                                        }
139                        }
140                        else if (sql.toLowerCase().indexOf("sqlflow") != -1) {
141                                                Map sqlflow = (Map) JSON.parseObject(sql);
142                                                List<Map> servers = (List<Map>) sqlflow.get("servers");
143                                                if (servers != null) {
144                                                        for (Map queryObject : servers) {
145                                                                String name = (String) queryObject.get("name");
146                                                                String dbVendor = (String) queryObject.get("dbVendor");
147                                                                List querys = (List) queryObject.get("queries");
148                                                                if (querys != null) {
149                                                                        for (int i = 0; i < querys.size(); i++) {
150                                                                                Map object = (Map) querys.get(i);
151                                                                                SqlInfo info = new SqlInfo();
152                                                                                info.setSql(JSON.toJSONString(object));
153                                                                                info.setFileName(fileName);
154                                                                                info.setFilePath(filePath);
155                                                                                info.setOriginIndex(i);
156                                                                                info.setDbVendor(dbVendor);
157                                                                                info.setServer(name);
158                                                                                sqlInfos.add(info);
159                                                                        }
160                                                                        queryObject.remove("queries");
161                                                                        SqlInfo info = new SqlInfo();
162                                                                        info.setSql(JSON.toJSONString(queryObject));
163                                                                        info.setFileName(fileName);
164                                                                        info.setFilePath(filePath);
165                                                                        info.setOriginIndex(querys.size());
166                                                                        info.setDbVendor(dbVendor);
167                                                                        info.setServer(filePath);
168                                                                        sqlInfos.add(info);
169                                                                } else {
170                                                                        SqlInfo info = new SqlInfo();
171                                                                        info.setSql(JSON.toJSONString(queryObject));
172                                                                        info.setFileName(fileName);
173                                                                        info.setFilePath(filePath);
174                                                                        info.setOriginIndex(0);
175                                                                        sqlInfos.add(info);
176                                                                }
177                                                        }
178                                                }
179                                        }
180                    }
181                        } else if (sql != null) {
182                                SqlInfo info = new SqlInfo();
183                                info.setSql(sql);
184                                info.setFileName(fileName);
185                                info.setFilePath(filePath);
186                                info.setOriginIndex(0);
187                                sqlInfos.add(info);
188                        }
189                }
190                return sqlInfos;
191        }
192
193    @Override
194    public boolean isIgnoreRecordSet() {
195        return option.isIgnoreRecordSet();
196    }
197
198    @Override
199    public void setIgnoreRecordSet(boolean ignoreRecordSet) {
200        option.setIgnoreRecordSet(ignoreRecordSet);
201    }
202
203    @Override
204    public boolean isSimpleShowTopSelectResultSet() {
205        return option.isSimpleShowTopSelectResultSet();
206    }
207
208    @Override
209    public void setSimpleShowTopSelectResultSet(boolean simpleShowTopSelectResultSet) {
210        option.setSimpleShowTopSelectResultSet(simpleShowTopSelectResultSet);
211    }
212
213    @Override
214    public boolean isSimpleShowFunction() {
215        return option.isSimpleShowFunction();
216    }
217
218    @Override
219    public void setSimpleShowFunction(boolean simpleShowFunction) {
220        option.setSimpleShowFunction(simpleShowFunction);
221    }
222
223    @Override
224    public boolean isShowJoin() {
225        return option.isShowJoin();
226    }
227
228    @Override
229    public void setShowJoin(boolean showJoin) {
230        option.setShowJoin(showJoin);
231    }
232
233    @Override
234    public boolean isShowImplicitSchema() {
235        return option.isShowImplicitSchema();
236    }
237
238    @Override
239    public void setShowImplicitSchema(boolean showImplicitSchema) {
240        option.setShowImplicitSchema(showImplicitSchema);
241    }
242
243    @Override
244    public boolean isShowConstantTable() {
245        return option.isShowConstantTable();
246    }
247
248    @Override
249    public void setShowConstantTable(boolean showConstantTable) {
250        option.setShowConstantTable(showConstantTable);
251    }
252
253    @Override
254    public boolean isShowCountTableColumn() {
255        return option.isShowCountTableColumn();
256    }
257
258    @Override
259    public void setShowCountTableColumn(boolean showCountTableColumn) {
260        option.setShowCountTableColumn(showCountTableColumn);
261    }
262
263    @Override
264    public boolean isTransform() {
265        return option.isTransform();
266    }
267
268    @Override
269    public void setTransform(boolean transform) {
270        option.setTransform(transform);
271        if (option.isTransformCoordinate()) {
272            option.setTransform(true);
273        }
274    }
275
276    @Override
277    public boolean isTransformCoordinate() {
278        return option.isTransformCoordinate();
279    }
280
281    @Override
282    public void setTransformCoordinate(boolean transformCoordinate) {
283        option.setTransformCoordinate(transformCoordinate);
284        if (transformCoordinate) {
285            option.setTransform(true);
286        }
287    }
288
289    @Override
290    public boolean isLinkOrphanColumnToFirstTable() {
291        return option.isLinkOrphanColumnToFirstTable();
292    }
293
294    @Override
295    public void setLinkOrphanColumnToFirstTable(boolean linkOrphanColumnToFirstTable) {
296        option.setLinkOrphanColumnToFirstTable(linkOrphanColumnToFirstTable);
297    }
298
299    @Override
300    public boolean isIgnoreCoordinate() {
301        return option.isIgnoreCoordinate();
302    }
303
304    @Override
305    public void setIgnoreCoordinate(boolean ignoreCoordinate) {
306        option.setIgnoreCoordinate(ignoreCoordinate);
307    }
308
309    @Override
310    public void setHandleListener(DataFlowHandleListener listener) {
311        option.setHandleListener(listener);
312    }
313
314    @Override
315    public void setSqlEnv(TSQLEnv sqlenv) {
316        this.sqlenv = sqlenv;
317    }
318
319    @Override
320    public void setOption(Option option) {
321        this.option = option;
322    }
323
324    @Override
325    public Option getOption() {
326        return option;
327    }
328
329    @Override
330    public List<ErrorInfo> getErrorMessages() {
331        return errorInfos;
332    }
333
334    @Override
335    public synchronized String generateSqlInfos() {
336        return JSON.toJSONString(sqlInfoMap);
337    }
338
339    @Override
340    public synchronized String generateDataFlow() {
341        return generateDataFlow(false);
342    }
343
344    @Override
345    public Map<String, List<SqlInfo>> getSqlInfos() {
346        return sqlInfoMap;
347    }
348
349    @Override
350    /**
351     * @deprecated please use SqlInfoHelper.getSelectedDbObjectInfo
352     */
353    public DbObjectPosition getSelectedDbObjectInfo(Coordinate start, Coordinate end) {
354        if (start == null || end == null) {
355            throw new IllegalArgumentException("Coordinate can't be null.");
356        }
357
358        String hashCode = start.getHashCode();
359
360        if (hashCode == null) {
361            throw new IllegalArgumentException("Coordinate hashcode can't be null.");
362        }
363
364        int dbObjectStartLine = (int) start.getX() - 1;
365        int dbObjectStarColumn = (int) start.getY() - 1;
366        int dbObjectEndLine = (int) end.getX() - 1;
367        int dbObjectEndColumn = (int) end.getY() - 1;
368        List<SqlInfo> sqlInfoList;
369        if (hashCode.matches("\\d+")) {
370            sqlInfoList = sqlInfoMap.getValueAtIndex(Integer.valueOf(hashCode));
371        } else {
372            sqlInfoList = sqlInfoMap.get(hashCode);
373        }
374        for (int j = 0; j < sqlInfoList.size(); j++) {
375            SqlInfo sqlInfo = sqlInfoList.get(j);
376            int startLine = sqlInfo.getLineStart();
377            int endLine = sqlInfo.getLineEnd();
378            if (dbObjectStartLine >= startLine && dbObjectStartLine <= endLine) {
379                DbObjectPosition position = new DbObjectPosition();
380                position.setFile(sqlInfo.getFileName());
381                position.setFilePath(sqlInfo.getFilePath());
382                position.setSql(sqlInfo.getSql());
383                position.setIndex(sqlInfo.getOriginIndex());
384                List<Pair<Integer, Integer>> positions = position.getPositions();
385                positions.add(new Pair<Integer, Integer>(
386                        dbObjectStartLine - startLine + sqlInfo.getOriginLineStart() + 1, dbObjectStarColumn + 1));
387                positions.add(new Pair<Integer, Integer>(dbObjectEndLine - startLine + sqlInfo.getOriginLineStart() + 1,
388                        dbObjectEndColumn + 1));
389                return position;
390            }
391        }
392        return null;
393    }
394
395    @Override
396    public synchronized String generateDataFlow(final boolean withExtraInfo) {
397        sqlInfoMap.clear();
398        errorInfos.clear();
399        Map<String, Pair3<StringBuilder, AtomicInteger, String>> databaseMap = new LinkedHashMap<String, Pair3<StringBuilder, AtomicInteger, String>>();
400        for (int i = 0; i < sqlInfos.length; i++) {
401            SqlInfo sqlInfo = sqlInfos[i];
402            if (sqlInfo == null || sqlInfo.getSql() == null) {
403                sqlInfoMap.put(String.valueOf(i), new ArrayList<SqlInfo>());
404                continue;
405            }
406            String sql = sqlInfo.getSql();
407            if (sql != null && sql.trim().startsWith("{")) {
408                if (MetadataReader.isGrabit(sql) || MetadataReader.isSqlflow(sql)) {
409                    String hash = SHA256.getMd5(sql);
410                    String fileHash = SHA256.getMd5(hash);
411                    if (!sqlInfoMap.containsKey(fileHash)) {
412                        sqlInfoMap.put(fileHash, new ArrayList<SqlInfo>());
413                        sqlInfoMap.get(fileHash).add(sqlInfo);
414                    }
415                } else {
416                    Map queryObject = (Map) JSON.parseObject(sql);
417                    appendSqlInfo(databaseMap, i, sqlInfo, queryObject);
418                }
419            } else {
420                String content = sql;
421                String hash = SHA256.getMd5(content);
422                String fileHash = SHA256.getMd5(hash);
423                if (!sqlInfoMap.containsKey(fileHash)) {
424                    sqlInfoMap.put(fileHash, new ArrayList<SqlInfo>());
425
426                    String database = TSQLEnv.DEFAULT_DB_NAME;
427                    String schema = TSQLEnv.DEFAULT_SCHEMA_NAME;
428                    if(sqlenv!=null) {
429                        database = sqlenv.getDefaultCatalogName();
430                        if(database == null) {
431                            database = TSQLEnv.DEFAULT_DB_NAME;
432                        }
433                        schema = sqlenv.getDefaultSchemaName();
434                        if(schema == null) {
435                            schema = TSQLEnv.DEFAULT_SCHEMA_NAME;
436                        }
437                    }
438                    boolean supportCatalog = TSQLEnv.supportCatalog(option.getVendor());
439                    boolean supportSchema = TSQLEnv.supportSchema(option.getVendor());
440                    StringBuilder builder = new StringBuilder();
441                    if (supportCatalog) {
442                        builder.append(database);
443                    }
444                    if (supportSchema) {
445                        if (builder.length() > 0) {
446                            builder.append(".");
447                        }
448                        builder.append(schema);
449                    }
450                    String group = builder.toString();
451                    SqlInfo sqlInfoItem = new SqlInfo();
452                    sqlInfoItem.setFileName(sqlInfo.getFileName());
453                    sqlInfoItem.setFilePath(sqlInfo.getFilePath());
454                    sqlInfoItem.setSql(sqlInfo.getSql());
455                    sqlInfoItem.setOriginIndex(0);
456                    sqlInfoItem.setOriginLineStart(0);
457                    sqlInfoItem.setOriginLineEnd(sqlInfo.getSql().split("\n").length - 1);
458                    sqlInfoItem.setIndex(0);
459                    sqlInfoItem.setLineStart(0);
460                    sqlInfoItem.setLineEnd(sqlInfo.getSql().split("\n").length - 1);
461                    sqlInfoItem.setHash(fileHash);
462                    sqlInfoItem.setGroup(group);
463
464                    sqlInfoMap.get(fileHash).add(sqlInfoItem);
465                }
466            }
467        }
468
469        final TSQLEnv[] env = new TSQLEnv[]{sqlenv};
470        if (sqlenv == null) {
471                TSQLEnv[] envs = new SQLEnvParser(option.getDefaultServer(), option.getDefaultDatabase(), option.getDefaultSchema()).parseSQLEnv(option.getVendor(), sqlInfos);
472                        if (envs != null && envs.length > 0) {
473                                env[0] = envs[0];
474                        }
475        }
476        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
477                .newFixedThreadPool(option.getParallel() < sqlInfos.length
478                        ? option.getParallel()
479                        : sqlInfos.length);
480        final Map<SqlInfo, dataflow> dataflowMap = new ConcurrentHashMap<>();
481        try {
482            final CountDownLatch latch = new CountDownLatch(sqlInfos.length);
483            for (int i = 0; i < sqlInfos.length; i++) {
484                final SqlInfo[] sqlInfoCopy = new SqlInfo[sqlInfos.length];
485                final SqlInfo item = sqlInfos[i];
486                sqlInfoCopy[i] = item;
487                final Option optionCopy = (Option) option.clone();
488                optionCopy.setStartId(5000000L * i);
489                optionCopy.setOutput(false);
490                Runnable task = new Runnable() {
491                    @Override
492                    public void run() {
493                        try {
494                            DataFlowAnalyzer analyzer = new DataFlowAnalyzer(sqlInfoCopy, optionCopy);
495                            analyzer.setSqlEnv(env[0]);
496                            analyzer.generateDataFlow(withExtraInfo);
497                            dataflow dataflow = analyzer.getDataFlow();
498                            errorInfos.addAll(analyzer.getErrorMessages());
499                            dataflowMap.put(item, dataflow);
500                            hashSQLMap.putAll(analyzer.getHashSQLMap());
501                            analyzer.dispose();
502                        } finally {
503                            latch.countDown();
504                        }
505                    }
506                };
507                executor.submit(task);
508            }
509            latch.await();
510        } catch (Exception e) {
511            logger.error("execute task failed.", e);
512        }
513        executor.shutdown();
514
515        ModelBindingManager modelManager = new ModelBindingManager();
516        modelManager.setGlobalVendor(option.getVendor());
517        modelManager.setGlobalOption(option);
518        ModelBindingManager.set(modelManager);
519        this.dataflow = mergeDataFlows(dataflowMap, 5000000L * sqlInfos.length);
520        dataflowMap.clear();
521        if (dataflow != null && option.isOutput()) {
522            if (option.isTextFormat()) {
523                dataflowString = DataFlowAnalyzer.getTextOutput(dataflow);
524            } else {
525                try {
526                    dataflowString = XML2Model.saveXML(dataflow);
527                }catch (Exception e){
528                    logger.error("save dataflow as xml failed.", e);
529                    dataflowString = null;
530                }
531            }
532        }
533        ModelBindingManager.remove();
534        return dataflowString;
535    }
536
537    private void appendSqlInfo(Map<String, Pair3<StringBuilder, AtomicInteger, String>> databaseMap, int index,
538                               SqlInfo sqlInfo, Map queryObject) {
539        EDbVendor vendor = option.getVendor();
540        if(!SQLUtil.isEmpty(sqlInfo.getDbVendor())){
541            vendor = EDbVendor.valueOf(sqlInfo.getDbVendor());
542        }
543
544        boolean supportCatalog = TSQLEnv.supportCatalog(vendor);
545        boolean supportSchema = TSQLEnv.supportSchema(vendor);
546
547        String content = (String) queryObject.get("sourceCode");
548        if (SQLUtil.isEmpty(content)) {
549            return;
550        }
551
552        StringBuilder builder = new StringBuilder();
553        if (supportCatalog) {
554            String database = (String) queryObject.get("database");
555            if (database.indexOf(".") != -1) {
556                String delimitedChar = TSQLEnv.delimitedChar(vendor);
557                database = delimitedChar + SQLUtil.trimColumnStringQuote(database) + delimitedChar;
558            }
559            builder.append(database);
560        }
561        if (supportSchema) {
562            String schema = (String) queryObject.get("schema");
563            if (schema.indexOf(".") != -1) {
564                String delimitedChar = TSQLEnv.delimitedChar(vendor);
565                schema = delimitedChar + SQLUtil.trimColumnStringQuote(schema) + delimitedChar;
566            }
567            if (builder.length() > 0) {
568                builder.append(".");
569            }
570            builder.append(schema);
571        }
572        String group = builder.toString();
573        String sqlHash = SHA256.getMd5(content);
574        String hash = SHA256.getMd5(sqlHash);
575        if (!databaseMap.containsKey(sqlHash)) {
576            databaseMap.put(sqlHash,
577                    new Pair3<StringBuilder, AtomicInteger, String>(new StringBuilder(), new AtomicInteger(), group));
578        }
579        TGSqlParser parser = new TGSqlParser(option.getVendor());
580        String delimiterChar = String.valueOf(parser.getFlexer().delimiterchar);
581        StringBuilder buffer = new StringBuilder(content);
582        if (content.trim().endsWith(delimiterChar) || content.trim().endsWith(";")) {
583            buffer.append("\n");
584        } else if(vendor == EDbVendor.dbvredshift
585                || vendor == EDbVendor.dbvgaussdb
586                || vendor == EDbVendor.dbvpostgresql
587                || vendor == EDbVendor.dbvmysql
588                || vendor == EDbVendor.dbvteradata){
589            buffer.append("\n\n-- " + TBaseType.sqlflow_stmt_delimiter_str + "\n\n");
590        } else{
591            SQLUtil.endTrim(buffer);
592            buffer.append(";").append("\n");
593        }
594
595        int lineStart = databaseMap.get(sqlHash).first.toString().split("\n", -1).length - 1;
596        if (databaseMap.get(sqlHash).first.toString().length() == 0) {
597            lineStart = 0;
598        }
599        databaseMap.get(sqlHash).first.append(buffer.toString());
600        SqlInfo sqlInfoItem = new SqlInfo();
601        sqlInfoItem.setFileName(sqlInfo.getFileName());
602        sqlInfoItem.setFilePath(sqlInfo.getFilePath());
603        sqlInfoItem.setSql(buffer.toString());
604        sqlInfoItem.setOriginIndex(index);
605        sqlInfoItem.setOriginLineStart(0);
606        sqlInfoItem.setOriginLineEnd(buffer.toString().split("\n", -1).length - 1);
607        sqlInfoItem.setIndex(databaseMap.get(sqlHash).second.getAndIncrement());
608        sqlInfoItem.setLineStart(lineStart);
609        sqlInfoItem.setLineEnd(databaseMap.get(sqlHash).first.toString().split("\n", -1).length - 1);
610        sqlInfoItem.setGroup(group);
611        sqlInfoItem.setHash(hash);
612
613        if (!sqlInfoMap.containsKey(hash)) {
614            sqlInfoMap.put(hash, new ArrayList<SqlInfo>());
615        }
616        sqlInfoMap.get(hash).add(sqlInfoItem);
617    }
618
619    @Override
620    public void dispose() {
621        ModelBindingManager.remove();
622    }
623
624    private dataflow mergeDataFlows(Map<SqlInfo, dataflow> dataflowMap, long startId) {
625        return mergeDataFlows(dataflowMap.values(), startId);
626    }
627
628    public static dataflow mergeDataFlows(Collection<dataflow> dataflows, EDbVendor vendor) {
629        ModelBindingManager.setGlobalVendor(vendor);
630        try {
631            return mergeDataFlows(dataflows, 5000000L * dataflows.size());
632        } finally {
633            ModelBindingManager.removeGlobalVendor();
634        }
635    }
636
637    private static dataflow mergeDataFlows(Collection<dataflow> dataflows, long startId) {
638        dataflow mergeDataflow = new dataflow();
639
640        List<table> tableCopy = new ArrayList<table>();
641        List<table> viewCopy = new ArrayList<table>();
642        List<table> databaseCopy = new ArrayList<table>();
643        List<table> schemaCopy = new ArrayList<table>();
644        List<table> stageCopy = new ArrayList<table>();
645        List<table> dataSourceCopy = new ArrayList<table>();
646        List<table> streamCopy = new ArrayList<table>();
647        List<table> fileCopy = new ArrayList<table>();
648        List<table> variableCopy = new ArrayList<table>();
649        List<table> resultSetCopy = new ArrayList<table>();
650
651        List<process> processCopy = new ArrayList<process>();
652        List<relationship> relationshipCopy = new ArrayList<relationship>();
653        List<procedure> procedureCopy = new ArrayList<procedure>();
654        List<oraclePackage> packageCopy = new ArrayList<oraclePackage>();
655        List<error> errorCopy = new ArrayList<error>();
656
657        List<table> tables = new ArrayList<table>();
658        for (dataflow dataflow : dataflows) {
659            if (dataflow.getTables() != null) {
660                tableCopy.addAll(dataflow.getTables());
661            }
662            mergeDataflow.setTables(tableCopy);
663            if (dataflow.getViews() != null) {
664                viewCopy.addAll(dataflow.getViews());
665            }
666            mergeDataflow.setViews(viewCopy);
667            if (dataflow.getDatabases() != null) {
668                databaseCopy.addAll(dataflow.getDatabases());
669            }
670            mergeDataflow.setDatabases(databaseCopy);
671            if (dataflow.getSchemas() != null) {
672                schemaCopy.addAll(dataflow.getSchemas());
673            }
674            mergeDataflow.setSchemas(schemaCopy);
675            if (dataflow.getStages() != null) {
676                stageCopy.addAll(dataflow.getStages());
677            }
678            mergeDataflow.setStages(stageCopy);
679            if (dataflow.getDatasources() != null) {
680                dataSourceCopy.addAll(dataflow.getDatasources());
681            }
682            mergeDataflow.setDatasources(dataSourceCopy);
683            if (dataflow.getStreams() != null) {
684                streamCopy.addAll(dataflow.getStreams());
685            }
686            mergeDataflow.setStreams(streamCopy);
687            if (dataflow.getPaths() != null) {
688                fileCopy.addAll(dataflow.getPaths());
689            }
690            mergeDataflow.setPaths(fileCopy);
691            if (dataflow.getVariables() != null) {
692                variableCopy.addAll(dataflow.getVariables());
693            }
694            mergeDataflow.setVariables(variableCopy);
695            if (dataflow.getResultsets() != null) {
696                resultSetCopy.addAll(dataflow.getResultsets());
697            }
698            mergeDataflow.setResultsets(resultSetCopy);
699            if (dataflow.getProcesses() != null) {
700                processCopy.addAll(dataflow.getProcesses());
701            }
702            mergeDataflow.setProcesses(processCopy);
703            if (dataflow.getRelationships() != null) {
704                relationshipCopy.addAll(dataflow.getRelationships());
705            }
706            mergeDataflow.setRelationships(relationshipCopy);
707            if (dataflow.getProcedures() != null) {
708                procedureCopy.addAll(dataflow.getProcedures());
709            }
710            mergeDataflow.setProcedures(procedureCopy);
711            if (dataflow.getPackages() != null) {
712                packageCopy.addAll(dataflow.getPackages());
713            }
714            mergeDataflow.setPackages(packageCopy);
715            if (dataflow.getErrors() != null) {
716                errorCopy.addAll(dataflow.getErrors());
717            }
718            mergeDataflow.setErrors(errorCopy);
719
720            tables.addAll(dataflow.getTables());
721            tables.addAll(dataflow.getViews());
722            tables.addAll(dataflow.getDatabases());
723            tables.addAll(dataflow.getSchemas());
724            tables.addAll(dataflow.getStages());
725            tables.addAll(dataflow.getDatasources());
726            tables.addAll(dataflow.getStreams());
727            tables.addAll(dataflow.getPaths());
728            tables.addAll(dataflow.getResultsets());
729            tables.addAll(dataflow.getVariables());
730        }
731
732        Map<String, List<table>> tableMap = new HashMap<String, List<table>>();
733        Map<String, String> tableTypeMap = new HashMap<String, String>();
734        Map<String, String> tableIdMap = new HashMap<String, String>();
735
736        Map<String, List<column>> columnMap = new HashMap<String, List<column>>();
737        Map<String, Set<String>> tableColumnMap = new HashMap<String, Set<String>>();
738        Map<String, String> columnIdMap = new HashMap<String, String>();
739        Map<String, column> columnMergeIdMap = new HashMap<String, column>();
740
741        List<procedure> procedures = new ArrayList<>(mergeDataflow.getProcedures());
742        if(mergeDataflow.getPackages()!=null){
743            for(oraclePackage pkg : mergeDataflow.getPackages()){
744                procedures.addAll(pkg.getProcedures());
745            }
746        }
747
748        Set<String> procedureIdSet = procedures.stream().map(t->t.getId()).collect(Collectors.toSet());
749
750        for (table table : tables) {
751            String qualifiedTableName = DlineageUtil.getQualifiedTableName(table);
752            String tableFullName = DlineageUtil.getIdentifierNormalTableName(qualifiedTableName);
753
754            if (!tableMap.containsKey(tableFullName)) {
755                tableMap.put(tableFullName, new ArrayList<table>());
756            }
757
758            tableMap.get(tableFullName).add(table);
759
760            if (!tableTypeMap.containsKey(tableFullName)) {
761                tableTypeMap.put(tableFullName, table.getType());
762            } else if ("view".equals(table.getSubType())) {
763                tableTypeMap.put(tableFullName, table.getType());
764            } else if ("database".equals(table.getSubType())) {
765                tableTypeMap.put(tableFullName, table.getType());
766            } else if ("schema".equals(table.getSubType())) {
767                tableTypeMap.put(tableFullName, table.getType());
768            } else if ("stage".equals(table.getSubType())) {
769                tableTypeMap.put(tableFullName, table.getType());
770            } else if ("datasource".equals(table.getSubType())) {
771                tableTypeMap.put(tableFullName, table.getType());
772            } else if ("stream".equals(table.getSubType())) {
773                tableTypeMap.put(tableFullName, table.getType());
774            } else if ("file".equals(table.getSubType())) {
775                tableTypeMap.put(tableFullName, table.getType());
776            } else if ("table".equals(tableTypeMap.get(tableFullName))) {
777                tableTypeMap.put(tableFullName, table.getType());
778            } else if ("variable".equals(tableTypeMap.get(tableFullName))) {
779                tableTypeMap.put(tableFullName, table.getType());
780            }
781
782            if (table.getColumns() != null) {
783                if (!tableColumnMap.containsKey(tableFullName)) {
784                    tableColumnMap.put(tableFullName, new LinkedHashSet<String>());
785                }
786                for (column column : table.getColumns()) {
787                    String columnFullName = tableFullName + "."
788                            + DlineageUtil.getIdentifierNormalColumnName(column.getName());
789
790                    if (!columnMap.containsKey(columnFullName)) {
791                        columnMap.put(columnFullName, new ArrayList<column>());
792                        tableColumnMap.get(tableFullName).add(columnFullName);
793                    }
794
795                    columnMap.get(columnFullName).add(column);
796                }
797            }
798        }
799
800        Iterator<String> tableNameIter = tableMap.keySet().iterator();
801        while (tableNameIter.hasNext()) {
802            String tableName = tableNameIter.next();
803            List<table> tableList = tableMap.get(tableName);
804            table table;
805            if (tableList.size() > 1) {
806                table standardTable = tableList.get(0);
807                //Function允许重名,不做合并处理
808                if (standardTable.isFunction()) {
809                    continue;
810                }
811
812                String type = tableTypeMap.get(tableName);
813                table = new table();
814                table.setId(String.valueOf(++startId));
815                table.setServer(standardTable.getServer());
816                table.setDatabase(standardTable.getDatabase());
817                table.setSchema(standardTable.getSchema());
818                table.setName(standardTable.getName());
819                table.setDisplayName(standardTable.getDisplayName());
820                table.setParent(standardTable.getParent());
821                table.setColumns(new ArrayList<column>());
822                Set<String> processIds = new LinkedHashSet<String>();
823                for (int k = 0; k < tableList.size(); k++) {
824                    if (tableList.get(k).getProcessIds() != null) {
825                        processIds.addAll(tableList.get(k).getProcessIds());
826                    }
827                }
828                if (!processIds.isEmpty()) {
829                    table.setProcessIds(new ArrayList<String>(processIds));
830                }
831                table.setType(type);
832                for (table item : tableList) {
833                    if (!SQLUtil.isEmpty(table.getCoordinate()) && !SQLUtil.isEmpty(item.getCoordinate())) {
834                        if (table.getCoordinate().indexOf(item.getCoordinate()) == -1) {
835                            table.appendCoordinate(item.getCoordinate());
836                        }
837                    } else if (!SQLUtil.isEmpty(item.getCoordinate())) {
838                        table.setCoordinate(item.getCoordinate());
839                    }
840
841                    if (!SQLUtil.isEmpty(table.getAlias()) && !SQLUtil.isEmpty(item.getAlias())) {
842                        table.setAlias(table.getAlias() + "," + item.getAlias());
843                    } else if (!SQLUtil.isEmpty(item.getAlias())) {
844                        table.setAlias(item.getAlias());
845                    }
846
847                    tableIdMap.put(item.getId(), table.getId());
848
849                    if (item.isView()) {
850                        mergeDataflow.getViews().remove(item);
851                    } else if (item.isDatabaseType()) {
852                        mergeDataflow.getDatabases().remove(item);
853                    } else if (item.isSchemaType()) {
854                        mergeDataflow.getSchemas().remove(item);
855                    } else if (item.isStage()) {
856                        mergeDataflow.getStages().remove(item);
857                    } else if (item.isDataSource()) {
858                        mergeDataflow.getDatasources().remove(item);
859                    } else if (item.isStream()) {
860                        mergeDataflow.getStreams().remove(item);
861                    } else if (item.isFile()) {
862                        mergeDataflow.getPaths().remove(item);
863                    } else if (item.isVariable()) {
864                        mergeDataflow.getVariables().remove(item);
865                    } else if (item.isTable()) {
866                        mergeDataflow.getTables().remove(item);
867                    } else if (item.isResultSet()) {
868                        mergeDataflow.getResultsets().remove(item);
869                    }
870                }
871
872                if (table.isView()) {
873                    mergeDataflow.getViews().add(table);
874                } else if (table.isDatabaseType()) {
875                    mergeDataflow.getDatabases().add(table);
876                } else if (table.isSchemaType()) {
877                    mergeDataflow.getSchemas().add(table);
878                } else if (table.isStage()) {
879                    mergeDataflow.getStages().add(table);
880                } else if (table.isDataSource()) {
881                    mergeDataflow.getDatasources().add(table);
882                } else if (table.isStream()) {
883                    mergeDataflow.getStreams().add(table);
884                } else if (table.isFile()) {
885                    mergeDataflow.getPaths().add(table);
886                } else if (table.isVariable()) {
887                    mergeDataflow.getVariables().add(table);
888                } else if (table.isResultSet()) {
889                    mergeDataflow.getResultsets().add(table);
890                } else {
891                    mergeDataflow.getTables().add(table);
892                }
893            } else {
894                table = tableList.get(0);
895            }
896
897            Set<String> columns = tableColumnMap.get(tableName);
898            Iterator<String> columnIter = columns.iterator();
899            List<column> mergeColumns = new ArrayList<column>();
900            while (columnIter.hasNext()) {
901                String columnName = columnIter.next();
902                List<column> columnList = columnMap.get(columnName);
903                List<column> functions = new ArrayList<column>();
904                for (column t : columnList) {
905                    if (Boolean.TRUE.toString().equals(t.getIsFunction())) {
906                        functions.add(t);
907                    }
908                }
909                if (functions != null && !functions.isEmpty()) {
910                    for (column function : functions) {
911                        mergeColumns.add(function);
912                        columnIdMap.put(function.getId(), function.getId());
913                        columnMergeIdMap.put(function.getId(), function);
914                    }
915
916                    columnList.removeAll(functions);
917                }
918                if (!columnList.isEmpty()) {
919                    column firstColumn = columnList.iterator().next();
920                    if (columnList.size() > 1) {
921                        column mergeColumn = new column();
922                        mergeColumn.setId(String.valueOf(++startId));
923                        mergeColumn.setName(firstColumn.getName());
924                        mergeColumn.setDisplayName(firstColumn.getDisplayName());
925                        mergeColumn.setSource(firstColumn.getSource());
926                        mergeColumn.setQualifiedTable(firstColumn.getQualifiedTable());
927                        mergeColumns.add(mergeColumn);
928                        for (column item : columnList) {
929                            mergeColumn.appendCoordinate(item.getCoordinate());
930                            columnIdMap.put(item.getId(), mergeColumn.getId());
931                            //add by grq 2023.02.06 issue=I6DB5S
932                            if(item.getDataType() != null){
933                                mergeColumn.setDataType(item.getDataType());
934                            }
935                            if(item.isForeignKey() != null){
936                                mergeColumn.setForeignKey(item.isForeignKey());
937                            }
938                            if(item.isUnqiueKey() != null){
939                                mergeColumn.setUnqiueKey(item.isUnqiueKey());
940                            }
941                            if(item.isIndexKey() != null){
942                                mergeColumn.setIndexKey(item.isIndexKey());
943                            }
944                            if(item.isPrimaryKey() != null){
945                                mergeColumn.setPrimaryKey(item.isPrimaryKey());
946                            }
947                            //end by grq
948                        }
949                        columnMergeIdMap.put(mergeColumn.getId(), mergeColumn);
950                    } else {
951                        mergeColumns.add(firstColumn);
952                        columnIdMap.put(firstColumn.getId(), firstColumn.getId());
953                        columnMergeIdMap.put(firstColumn.getId(), firstColumn);
954                    }
955                }
956            }
957            table.setColumns(mergeColumns);
958        }
959
960        if (mergeDataflow.getRelationships() != null) {
961            Map<String, relationship> mergeRelations = new LinkedHashMap<String, relationship>();
962            for (int i = 0; i < mergeDataflow.getRelationships().size(); i++) {
963                relationship relation = mergeDataflow.getRelationships().get(i);
964                if(RelationshipType.call.name().equals(relation.getType())){
965                    targetColumn target = relation.getCaller();
966                    if (target == null) {
967                        continue;
968                    }
969                    if (target != null && tableIdMap.containsKey(target.getId())) {
970                        target.setId(tableIdMap.get(target.getId()));
971                    }
972
973                    List<sourceColumn> sources = relation.getCallees();
974                    Set<sourceColumn> sourceSet = new LinkedHashSet<sourceColumn>();
975                    if (sources != null) {
976                        for (sourceColumn source : sources) {
977                            if (tableIdMap.containsKey(source.getId())) {
978                                source.setId(tableIdMap.get(source.getId()));
979                            }
980                        }
981                        sourceSet.addAll(sources);
982                        relation.setCallees(new ArrayList<sourceColumn>(sourceSet));
983                    }
984
985                    String jsonString = JSON.toJSONString(relation).replaceAll("\"id\":\".+?\"", "");
986                    String key = SHA256.getMd5(jsonString);
987                    if (!mergeRelations.containsKey(key)) {
988                        mergeRelations.put(key, relation);
989                    }
990                }
991                else {
992                    targetColumn target = relation.getTarget();
993                    if (target == null) {
994                        continue;
995                    }
996                    if (target != null && tableIdMap.containsKey(target.getParent_id())) {
997                        target.setParent_id(tableIdMap.get(target.getParent_id()));
998                    }
999
1000                    if (columnIdMap.containsKey(target.getId())) {
1001                        target.setId(columnIdMap.get(target.getId()));
1002                        target.setCoordinate(columnMergeIdMap.get(target.getId()).getCoordinate());
1003                    }
1004
1005                    List<sourceColumn> sources = relation.getSources();
1006                    Set<sourceColumn> sourceSet = new LinkedHashSet<sourceColumn>();
1007                    if (sources != null) {
1008                        for (sourceColumn source : sources) {
1009                            if (tableIdMap.containsKey(source.getParent_id())) {
1010                                source.setParent_id(tableIdMap.get(source.getParent_id()));
1011                            }
1012                            if (tableIdMap.containsKey(source.getSource_id())) {
1013                                source.setSource_id(tableIdMap.get(source.getSource_id()));
1014                            }
1015                            if (columnIdMap.containsKey(source.getId())) {
1016                                source.setId(columnIdMap.get(source.getId()));
1017                                source.setCoordinate(columnMergeIdMap.get(source.getId()).getCoordinate());
1018                            }
1019                        }
1020
1021                        sourceSet.addAll(sources);
1022                        relation.setSources(new ArrayList<sourceColumn>(sourceSet));
1023                    }
1024
1025                    String jsonString = JSON.toJSONString(relation).replaceAll("\"id\":\".+?\"", "");
1026                    String key = SHA256.getMd5(jsonString);
1027                    if (!mergeRelations.containsKey(key)) {
1028                        mergeRelations.put(key, relation);
1029                    }
1030                }
1031            }
1032
1033            mergeDataflow.setRelationships(new ArrayList<relationship>(mergeRelations.values()));
1034        }
1035
1036        tableMap.clear();
1037        tableTypeMap.clear();
1038        tableIdMap.clear();
1039        columnMap.clear();
1040        tableColumnMap.clear();
1041        columnIdMap.clear();
1042        columnMergeIdMap.clear();
1043        tables.clear();
1044
1045        return mergeDataflow;
1046    }
1047
1048    @Override
1049    public synchronized dataflow getDataFlow() {
1050        if (dataflow != null) {
1051            return dataflow;
1052        } else if (dataflowString != null) {
1053            return XML2Model.loadXML(dataflow.class, dataflowString);
1054        }
1055        return null;
1056    }
1057
1058    @Override
1059    public Map<String, String> getHashSQLMap() {
1060        return hashSQLMap;
1061    }
1062
1063    public static void main(String[] args) throws Exception {
1064//        Option option = new Option();
1065//        option.setVendor(EDbVendor.dbvoracle);
1066//        option.setOutput(false);
1067//        ParallelDataFlowAnalyzer analyzer = new ParallelDataFlowAnalyzer(new File[]{new File("C:\\Users\\KK\\Desktop\\MRADM_PKG_ALL.sql")}, option);
1068//        analyzer.generateDataFlow();
1069//        dataflow dataflow = analyzer.getDataFlow();
1070        dataflow dataflow = XML2Model.loadXML(gudusoft.gsqlparser.dlineage.dataflow.model.xml.dataflow.class,new File("D:\\dataflow.xml.zip"));
1071        //XML2Model.saveXML(dataflow, new File("D:\\dataflow.xml.zip"));
1072        dataflow = DataflowUtility.convertToTableLevelDataflow(dataflow);
1073        dataflow = DataflowUtility.convertTableLevelToFunctionCallDataflow(dataflow, true, EDbVendor.dbvoracle);
1074        System.out.println(XML2Model.saveXML(dataflow));
1075    }
1076}