001package gudusoft.gsqlparser.dlineage;
002
003import gudusoft.gsqlparser.EDbVendor;
004import gudusoft.gsqlparser.TBaseType;
005import gudusoft.gsqlparser.TGSqlParser;
006import gudusoft.gsqlparser.dlineage.dataflow.listener.DataFlowHandleListener;
007import gudusoft.gsqlparser.dlineage.dataflow.metadata.MetadataReader;
008import gudusoft.gsqlparser.dlineage.dataflow.model.*;
009import gudusoft.gsqlparser.dlineage.dataflow.model.json.Coordinate;
010import gudusoft.gsqlparser.dlineage.dataflow.model.xml.*;
011import gudusoft.gsqlparser.dlineage.dataflow.sqlenv.SQLEnvParser;
012import gudusoft.gsqlparser.dlineage.util.*;
013import gudusoft.gsqlparser.sqlenv.TSQLEnv;
014import gudusoft.gsqlparser.sqlenv.parser.TJSONSQLEnvParser;
015import gudusoft.gsqlparser.util.IndexedLinkedHashMap;
016import gudusoft.gsqlparser.util.Logger;
017import gudusoft.gsqlparser.util.LoggerFactory;
018import gudusoft.gsqlparser.util.SQLUtil;
019import gudusoft.gsqlparser.util.json.JSON;
020
021import java.io.File;
022import java.util.*;
023import java.util.concurrent.*;
024import java.util.concurrent.atomic.AtomicInteger;
025import java.util.stream.Collectors;
026
027public class ParallelDataFlowAnalyzer implements IDataFlowAnalyzer {
028    private static final Logger logger = LoggerFactory.getLogger(ParallelDataFlowAnalyzer.class);
029    private SqlInfo[] sqlInfos;
030    private Option option = new Option();
031    private TSQLEnv sqlenv = null;
032    private dataflow dataflow;
033    private String dataflowString;
034    private List<ErrorInfo> errorInfos = new CopyOnWriteArrayList<ErrorInfo>();
035    private IndexedLinkedHashMap<String, List<SqlInfo>> sqlInfoMap = new IndexedLinkedHashMap<String, List<SqlInfo>>();
036    private final Map<String, String> hashSQLMap = new HashMap();
037
038    public ParallelDataFlowAnalyzer(SqlInfo[] sqlInfos, EDbVendor dbVendor, boolean simpleOutput) {
039        this.sqlInfos = sqlInfos;
040        option.setVendor(dbVendor);
041        option.setSimpleOutput(simpleOutput);
042        ModelBindingManager.setGlobalOption(option);
043    }
044
045    public ParallelDataFlowAnalyzer(String[] sqlContents, EDbVendor dbVendor, boolean simpleOutput, String defaultServer, String defaultDatabase, String defaltSchema) {
046        SqlInfo[] sqlInfos = new SqlInfo[sqlContents.length];
047        for (int i = 0; i < sqlContents.length; i++) {
048            SqlInfo info = new SqlInfo();
049            info.setSql(sqlContents[i]);
050            info.setOriginIndex(0);
051            sqlInfos[i] = info;
052        }
053        option.setVendor(dbVendor);
054        option.setSimpleOutput(simpleOutput);
055                option.setDefaultServer(defaultServer);
056                option.setDefaultDatabase(defaultDatabase);
057                option.setDefaultSchema(defaltSchema);
058        this.sqlInfos = convertSQL(dbVendor, JSON.toJSONString(sqlInfos)).toArray(new SqlInfo[0]);
059    }
060    
061    public ParallelDataFlowAnalyzer(String[] sqlContents, EDbVendor dbVendor, boolean simpleOutput) {
062        SqlInfo[] sqlInfos = new SqlInfo[sqlContents.length];
063        for (int i = 0; i < sqlContents.length; i++) {
064            SqlInfo info = new SqlInfo();
065            info.setSql(sqlContents[i]);
066            info.setOriginIndex(0);
067            sqlInfos[i] = info;
068        }
069        option.setVendor(dbVendor);
070        option.setSimpleOutput(simpleOutput);
071        this.sqlInfos = convertSQL(dbVendor, JSON.toJSONString(sqlInfos)).toArray(new SqlInfo[0]);
072    }
073
074    public ParallelDataFlowAnalyzer(SqlInfo[] sqlInfos, Option option) {
075        this.sqlInfos = sqlInfos;
076        this.option = option;
077        ModelBindingManager.setGlobalOption(option);
078    }
079
080    public ParallelDataFlowAnalyzer(File[] sqlFiles, Option option) {
081        SqlInfo[] sqlInfos = new SqlInfo[sqlFiles.length];
082        for (int i = 0; i < sqlFiles.length; i++) {
083            SqlInfo info = new SqlInfo();
084            info.setSql(SQLUtil.getFileContent(sqlFiles[i]));
085            info.setFileName(sqlFiles[i].getName());
086            info.setFilePath(sqlFiles[i].getAbsolutePath());
087            info.setOriginIndex(0);
088            sqlInfos[i] = info;
089        }
090        this.sqlInfos = sqlInfos;
091        this.option = option;
092        ModelBindingManager.setGlobalOption(option);
093    }
094    
095    protected List<SqlInfo> convertSQL(EDbVendor vendor, String json) {
096                List<SqlInfo> sqlInfos = new ArrayList<SqlInfo>();
097                List sqlContents = (List) JSON.parseObject(json);
098                for (int j = 0; j < sqlContents.size(); j++) {
099                        Map sqlContent = (Map) sqlContents.get(j);
100                        String sql = (String) sqlContent.get("sql");
101                        String fileName = (String) sqlContent.get("fileName");
102                        String filePath = (String) sqlContent.get("filePath");
103                        if (sql != null && sql.trim().startsWith("{")) {
104                                if (sql.indexOf("createdBy") != -1) {
105                                        if (this.sqlenv == null) {
106                                                TSQLEnv[] sqlenvs = new TJSONSQLEnvParser(option.getDefaultServer(), option.getDefaultDatabase(), option.getDefaultSchema()).parseSQLEnv(vendor, sql);
107                                                if (sqlenvs != null) {
108                                                        this.sqlenv = sqlenvs[0];
109                                                }
110                                        }
111                        if (sql.toLowerCase().indexOf("sqldep") != -1 || sql.toLowerCase().indexOf("grabit") != -1 ) {
112                                Map queryObject = (Map) JSON.parseObject(sql);
113                                        List querys = (List) queryObject.get("queries");
114                                        if (querys != null) {
115                                                for (int i = 0; i < querys.size(); i++) {
116                                                        Map object = (Map) querys.get(i);
117                                                        SqlInfo info = new SqlInfo();
118                                                        info.setSql(JSON.toJSONString(object));
119                                                        info.setFileName(fileName);
120                                                        info.setFilePath(filePath);
121                                                        info.setOriginIndex(i);
122                                                        sqlInfos.add(info);
123                                                }
124                                                queryObject.remove("queries");
125                                                SqlInfo info = new SqlInfo();
126                                                info.setSql(JSON.toJSONString(queryObject));
127                                                info.setFileName(fileName);
128                                                info.setFilePath(filePath);
129                                                info.setOriginIndex(querys.size());
130                                                sqlInfos.add(info);
131                                        } else {
132                                                SqlInfo info = new SqlInfo();
133                                                info.setSql(JSON.toJSONString(queryObject));
134                                                info.setFileName(fileName);
135                                                info.setFilePath(filePath);
136                                                info.setOriginIndex(0);
137                                                sqlInfos.add(info);
138                                        }
139                        }
140                        else if (sql.toLowerCase().indexOf("sqlflow") != -1) {
141                                                Map sqlflow = (Map) JSON.parseObject(sql);
142                                                List<Map> servers = (List<Map>) sqlflow.get("servers");
143                                                if (servers != null) {
144                                                        for (Map queryObject : servers) {
145                                                                String name = (String) queryObject.get("name");
146                                                                String dbVendor = (String) queryObject.get("dbVendor");
147                                                                List querys = (List) queryObject.get("queries");
148                                                                if (querys != null) {
149                                                                        for (int i = 0; i < querys.size(); i++) {
150                                                                                Map object = (Map) querys.get(i);
151                                                                                SqlInfo info = new SqlInfo();
152                                                                                info.setSql(JSON.toJSONString(object));
153                                                                                info.setFileName(fileName);
154                                                                                info.setFilePath(filePath);
155                                                                                info.setOriginIndex(i);
156                                                                                info.setDbVendor(dbVendor);
157                                                                                info.setServer(name);
158                                                                                sqlInfos.add(info);
159                                                                        }
160                                                                        queryObject.remove("queries");
161                                                                        SqlInfo info = new SqlInfo();
162                                                                        info.setSql(JSON.toJSONString(queryObject));
163                                                                        info.setFileName(fileName);
164                                                                        info.setFilePath(filePath);
165                                                                        info.setOriginIndex(querys.size());
166                                                                        info.setDbVendor(dbVendor);
167                                                                        info.setServer(filePath);
168                                                                        sqlInfos.add(info);
169                                                                } else {
170                                                                        SqlInfo info = new SqlInfo();
171                                                                        info.setSql(JSON.toJSONString(queryObject));
172                                                                        info.setFileName(fileName);
173                                                                        info.setFilePath(filePath);
174                                                                        info.setOriginIndex(0);
175                                                                        sqlInfos.add(info);
176                                                                }
177                                                        }
178                                                }
179                                        }
180                    }
181                        } else if (sql != null) {
182                                SqlInfo info = new SqlInfo();
183                                info.setSql(sql);
184                                info.setFileName(fileName);
185                                info.setFilePath(filePath);
186                                info.setOriginIndex(0);
187                                sqlInfos.add(info);
188                        }
189                }
190                return sqlInfos;
191        }
192
193    @Override
194    public boolean isIgnoreRecordSet() {
195        return option.isIgnoreRecordSet();
196    }
197
198    @Override
199    public void setIgnoreRecordSet(boolean ignoreRecordSet) {
200        option.setIgnoreRecordSet(ignoreRecordSet);
201    }
202
203    @Override
204    public boolean isSimpleShowTopSelectResultSet() {
205        return option.isSimpleShowTopSelectResultSet();
206    }
207
208    @Override
209    public void setSimpleShowTopSelectResultSet(boolean simpleShowTopSelectResultSet) {
210        option.setSimpleShowTopSelectResultSet(simpleShowTopSelectResultSet);
211    }
212
213    @Override
214    public boolean isSimpleShowFunction() {
215        return option.isSimpleShowFunction();
216    }
217
218    @Override
219    public void setSimpleShowFunction(boolean simpleShowFunction) {
220        option.setSimpleShowFunction(simpleShowFunction);
221    }
222
223    @Override
224    public boolean isShowJoin() {
225        return option.isShowJoin();
226    }
227
228    @Override
229    public void setShowJoin(boolean showJoin) {
230        option.setShowJoin(showJoin);
231    }
232
233    @Override
234    public boolean isShowImplicitSchema() {
235        return option.isShowImplicitSchema();
236    }
237
238    @Override
239    public void setShowImplicitSchema(boolean showImplicitSchema) {
240        option.setShowImplicitSchema(showImplicitSchema);
241    }
242
243    @Override
244    public boolean isShowConstantTable() {
245        return option.isShowConstantTable();
246    }
247
248    @Override
249    public void setShowConstantTable(boolean showConstantTable) {
250        option.setShowConstantTable(showConstantTable);
251    }
252
253    @Override
254    public boolean isShowCountTableColumn() {
255        return option.isShowCountTableColumn();
256    }
257
258    @Override
259    public void setShowCountTableColumn(boolean showCountTableColumn) {
260        option.setShowCountTableColumn(showCountTableColumn);
261    }
262
263    @Override
264    public boolean isTransform() {
265        return option.isTransform();
266    }
267
268    @Override
269    public void setTransform(boolean transform) {
270        option.setTransform(transform);
271        if (option.isTransformCoordinate()) {
272            option.setTransform(true);
273        }
274    }
275
276    @Override
277    public boolean isTransformCoordinate() {
278        return option.isTransformCoordinate();
279    }
280
281    @Override
282    public void setTransformCoordinate(boolean transformCoordinate) {
283        option.setTransformCoordinate(transformCoordinate);
284        if (transformCoordinate) {
285            option.setTransform(true);
286        }
287    }
288
289    @Override
290    public boolean isLinkOrphanColumnToFirstTable() {
291        return option.isLinkOrphanColumnToFirstTable();
292    }
293
294    @Override
295    public void setLinkOrphanColumnToFirstTable(boolean linkOrphanColumnToFirstTable) {
296        option.setLinkOrphanColumnToFirstTable(linkOrphanColumnToFirstTable);
297    }
298
299    @Override
300    public boolean isIgnoreCoordinate() {
301        return option.isIgnoreCoordinate();
302    }
303
304    @Override
305    public void setIgnoreCoordinate(boolean ignoreCoordinate) {
306        option.setIgnoreCoordinate(ignoreCoordinate);
307    }
308
309    @Override
310    public void setHandleListener(DataFlowHandleListener listener) {
311        option.setHandleListener(listener);
312    }
313
314    @Override
315    public void setSqlEnv(TSQLEnv sqlenv) {
316        this.sqlenv = sqlenv;
317    }
318
319    @Override
320    public void setOption(Option option) {
321        this.option = option;
322    }
323
324    @Override
325    public Option getOption() {
326        return option;
327    }
328
329    @Override
330    public List<ErrorInfo> getErrorMessages() {
331        return errorInfos;
332    }
333
334    @Override
335    public synchronized String generateSqlInfos() {
336        return JSON.toJSONString(sqlInfoMap);
337    }
338
339    @Override
340    public synchronized String generateDataFlow() {
341        return generateDataFlow(false);
342    }
343
344    @Override
345    public Map<String, List<SqlInfo>> getSqlInfos() {
346        return sqlInfoMap;
347    }
348
349    @Override
350    /**
351     * @deprecated please use SqlInfoHelper.getSelectedDbObjectInfo
352     */
353    public DbObjectPosition getSelectedDbObjectInfo(Coordinate start, Coordinate end) {
354        if (start == null || end == null) {
355            throw new IllegalArgumentException("Coordinate can't be null.");
356        }
357
358        String hashCode = start.getHashCode();
359
360        if (hashCode == null) {
361            throw new IllegalArgumentException("Coordinate hashcode can't be null.");
362        }
363
364        int dbObjectStartLine = (int) start.getX() - 1;
365        int dbObjectStarColumn = (int) start.getY() - 1;
366        int dbObjectEndLine = (int) end.getX() - 1;
367        int dbObjectEndColumn = (int) end.getY() - 1;
368        List<SqlInfo> sqlInfoList;
369        if (hashCode.matches("\\d+")) {
370            sqlInfoList = sqlInfoMap.getValueAtIndex(Integer.valueOf(hashCode));
371        } else {
372            sqlInfoList = sqlInfoMap.get(hashCode);
373        }
374        for (int j = 0; j < sqlInfoList.size(); j++) {
375            SqlInfo sqlInfo = sqlInfoList.get(j);
376            int startLine = sqlInfo.getLineStart();
377            int endLine = sqlInfo.getLineEnd();
378            if (dbObjectStartLine >= startLine && dbObjectStartLine <= endLine) {
379                DbObjectPosition position = new DbObjectPosition();
380                position.setFile(sqlInfo.getFileName());
381                position.setFilePath(sqlInfo.getFilePath());
382                position.setSql(sqlInfo.getSql());
383                position.setIndex(sqlInfo.getOriginIndex());
384                List<Pair<Integer, Integer>> positions = position.getPositions();
385                positions.add(new Pair<Integer, Integer>(
386                        dbObjectStartLine - startLine + sqlInfo.getOriginLineStart() + 1, dbObjectStarColumn + 1));
387                positions.add(new Pair<Integer, Integer>(dbObjectEndLine - startLine + sqlInfo.getOriginLineStart() + 1,
388                        dbObjectEndColumn + 1));
389                return position;
390            }
391        }
392        return null;
393    }
394
395    @Override
396    public synchronized String generateDataFlow(final boolean withExtraInfo) {
397        sqlInfoMap.clear();
398        errorInfos.clear();
399        Map<String, Pair3<StringBuilder, AtomicInteger, String>> databaseMap = new LinkedHashMap<String, Pair3<StringBuilder, AtomicInteger, String>>();
400        for (int i = 0; i < sqlInfos.length; i++) {
401            SqlInfo sqlInfo = sqlInfos[i];
402                        if (sqlInfo != null && sqlInfo.getSql() == null && sqlInfo.getFilePath() != null) {
403                                sqlInfo.setSql(SQLUtil.getFileContent(sqlInfo.getFilePath()));
404                        }
405            if (sqlInfo != null && sqlInfo.getSql() == null && sqlInfo.getFileName() != null) {
406                sqlInfo.setSql(SQLUtil.getFileContent(sqlInfo.getFileName()));
407            }
408            if (sqlInfo == null || sqlInfo.getSql() == null) {
409                sqlInfoMap.put(String.valueOf(i), new ArrayList<SqlInfo>());
410                continue;
411            }
412            String sql = sqlInfo.getSql();
413            if (sql != null && sql.trim().startsWith("{")) {
414                if (MetadataReader.isGrabit(sql) || MetadataReader.isSqlflow(sql)) {
415                    String hash = SHA256.getMd5(sql);
416                    String fileHash = SHA256.getMd5(hash);
417                    if (!sqlInfoMap.containsKey(fileHash)) {
418                        sqlInfoMap.put(fileHash, new ArrayList<SqlInfo>());
419                        sqlInfoMap.get(fileHash).add(sqlInfo);
420                    }
421                } else {
422                    Map queryObject = (Map) JSON.parseObject(sql);
423                    appendSqlInfo(databaseMap, i, sqlInfo, queryObject);
424                }
425            } else {
426                String content = sql;
427                String hash = SHA256.getMd5(content);
428                String fileHash = SHA256.getMd5(hash);
429                if (!sqlInfoMap.containsKey(fileHash)) {
430                    sqlInfoMap.put(fileHash, new ArrayList<SqlInfo>());
431
432                    String database = TSQLEnv.DEFAULT_DB_NAME;
433                    String schema = TSQLEnv.DEFAULT_SCHEMA_NAME;
434                    if(sqlenv!=null) {
435                        database = sqlenv.getDefaultCatalogName();
436                        if(database == null) {
437                            database = TSQLEnv.DEFAULT_DB_NAME;
438                        }
439                        schema = sqlenv.getDefaultSchemaName();
440                        if(schema == null) {
441                            schema = TSQLEnv.DEFAULT_SCHEMA_NAME;
442                        }
443                    }
444                    boolean supportCatalog = TSQLEnv.supportCatalog(option.getVendor());
445                    boolean supportSchema = TSQLEnv.supportSchema(option.getVendor());
446                    StringBuilder builder = new StringBuilder();
447                    if (supportCatalog) {
448                        builder.append(database);
449                    }
450                    if (supportSchema) {
451                        if (builder.length() > 0) {
452                            builder.append(".");
453                        }
454                        builder.append(schema);
455                    }
456                    String group = builder.toString();
457                    SqlInfo sqlInfoItem = new SqlInfo();
458                    sqlInfoItem.setFileName(sqlInfo.getFileName());
459                    sqlInfoItem.setFilePath(sqlInfo.getFilePath());
460                    sqlInfoItem.setSql(sqlInfo.getSql());
461                    sqlInfoItem.setOriginIndex(0);
462                    sqlInfoItem.setOriginLineStart(0);
463                    sqlInfoItem.setOriginLineEnd(sqlInfo.getSql().split("\n").length - 1);
464                    sqlInfoItem.setIndex(0);
465                    sqlInfoItem.setLineStart(0);
466                    sqlInfoItem.setLineEnd(sqlInfo.getSql().split("\n").length - 1);
467                    sqlInfoItem.setHash(fileHash);
468                    sqlInfoItem.setGroup(group);
469
470                    sqlInfoMap.get(fileHash).add(sqlInfoItem);
471                }
472            }
473        }
474
475        final TSQLEnv[] env = new TSQLEnv[]{sqlenv};
476        if (sqlenv == null) {
477                TSQLEnv[] envs = new SQLEnvParser(option.getDefaultServer(), option.getDefaultDatabase(), option.getDefaultSchema()).parseSQLEnv(option.getVendor(), sqlInfos);
478                        if (envs != null && envs.length > 0) {
479                                env[0] = envs[0];
480                        }
481        }
482        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
483                .newFixedThreadPool(option.getParallel() < sqlInfos.length
484                        ? option.getParallel()
485                        : sqlInfos.length);
486        final Map<SqlInfo, dataflow> dataflowMap = new ConcurrentHashMap<>();
487        try {
488            final CountDownLatch latch = new CountDownLatch(sqlInfos.length);
489            for (int i = 0; i < sqlInfos.length; i++) {
490                final SqlInfo[] sqlInfoCopy = new SqlInfo[sqlInfos.length];
491                final SqlInfo item = sqlInfos[i];
492                sqlInfoCopy[i] = item;
493                final Option optionCopy = (Option) option.clone();
494                optionCopy.setStartId(5000000L * i);
495                optionCopy.setOutput(false);
496                final int index = i;
497                Runnable task = new Runnable() {
498                    @Override
499                    public void run() {
500                        try {
501                            DataFlowAnalyzer analyzer = new DataFlowAnalyzer(sqlInfoCopy, optionCopy);
502                            analyzer.setSqlEnv(env[0]);
503                            analyzer.generateDataFlow(withExtraInfo);
504                            dataflow dataflow = analyzer.getDataFlow();
505                            logger.info("analyze sqlinfo[" + index + "] done, relation count: " + dataflow.getRelationships().size());
506                            errorInfos.addAll(analyzer.getErrorMessages());
507                            dataflowMap.put(item, dataflow);
508                            hashSQLMap.putAll(analyzer.getHashSQLMap());
509                            analyzer.dispose();
510                        }
511                        catch (Exception e) {
512                            logger.error("analyze sqlinfo[" + index + "] failed.", e);
513                        }
514                        finally {
515                            latch.countDown();
516                        }
517                    }
518                };
519                executor.submit(task);
520            }
521            latch.await();
522        } catch (Exception e) {
523            logger.error("execute task failed.", e);
524        }
525        executor.shutdown();
526
527        ModelBindingManager modelManager = new ModelBindingManager();
528        modelManager.setGlobalVendor(option.getVendor());
529        modelManager.setGlobalOption(option);
530        ModelBindingManager.set(modelManager);
531        this.dataflow = mergeDataFlows(dataflowMap, 5000000L * sqlInfos.length);
532        if (this.dataflow != null) {
533            logger.info("merge " + dataflowMap.size() + " dataflows done, relation count: " + dataflow.getRelationships().size());
534        }
535        dataflowMap.clear();
536        if (dataflow != null && option.isOutput()) {
537            if (option.isTextFormat()) {
538                dataflowString = DataFlowAnalyzer.getTextOutput(dataflow);
539            } else {
540                try {
541                    dataflowString = XML2Model.saveXML(dataflow);
542                }catch (Exception e){
543                    logger.error("save dataflow as xml failed.", e);
544                    dataflowString = null;
545                }
546            }
547        }
548        ModelBindingManager.remove();
549        return dataflowString;
550    }
551
552    private void appendSqlInfo(Map<String, Pair3<StringBuilder, AtomicInteger, String>> databaseMap, int index,
553                               SqlInfo sqlInfo, Map queryObject) {
554        EDbVendor vendor = option.getVendor();
555        if(!SQLUtil.isEmpty(sqlInfo.getDbVendor())){
556            vendor = EDbVendor.valueOf(sqlInfo.getDbVendor());
557        }
558
559        boolean supportCatalog = TSQLEnv.supportCatalog(vendor);
560        boolean supportSchema = TSQLEnv.supportSchema(vendor);
561
562        String content = (String) queryObject.get("sourceCode");
563        if (SQLUtil.isEmpty(content)) {
564            return;
565        }
566
567        StringBuilder builder = new StringBuilder();
568        if (supportCatalog) {
569            String database = (String) queryObject.get("database");
570            if (database.indexOf(".") != -1) {
571                String delimitedChar = TSQLEnv.delimitedChar(vendor);
572                database = delimitedChar + SQLUtil.trimColumnStringQuote(database) + delimitedChar;
573            }
574            builder.append(database);
575        }
576        if (supportSchema) {
577            String schema = (String) queryObject.get("schema");
578            if (schema.indexOf(".") != -1) {
579                String delimitedChar = TSQLEnv.delimitedChar(vendor);
580                schema = delimitedChar + SQLUtil.trimColumnStringQuote(schema) + delimitedChar;
581            }
582            if (builder.length() > 0) {
583                builder.append(".");
584            }
585            builder.append(schema);
586        }
587        String group = builder.toString();
588        String sqlHash = SHA256.getMd5(content);
589        String hash = SHA256.getMd5(sqlHash);
590        if (!databaseMap.containsKey(sqlHash)) {
591            databaseMap.put(sqlHash,
592                    new Pair3<StringBuilder, AtomicInteger, String>(new StringBuilder(), new AtomicInteger(), group));
593        }
594        TGSqlParser parser = new TGSqlParser(option.getVendor());
595        String delimiterChar = String.valueOf(parser.getDelimiterChar());
596        StringBuilder buffer = new StringBuilder(content);
597        if (content.trim().endsWith(delimiterChar) || content.trim().endsWith(";")) {
598            buffer.append("\n");
599        } else if(vendor == EDbVendor.dbvredshift
600                || vendor == EDbVendor.dbvgaussdb
601                || vendor == EDbVendor.dbvpostgresql
602                || vendor == EDbVendor.dbvmysql
603                || vendor == EDbVendor.dbvteradata){
604            buffer.append("\n\n-- " + TBaseType.sqlflow_stmt_delimiter_str + "\n\n");
605        } else{
606            SQLUtil.endTrim(buffer);
607            buffer.append(";").append("\n");
608        }
609
610        int lineStart = databaseMap.get(sqlHash).first.toString().split("\n", -1).length - 1;
611        if (databaseMap.get(sqlHash).first.toString().length() == 0) {
612            lineStart = 0;
613        }
614        databaseMap.get(sqlHash).first.append(buffer.toString());
615        SqlInfo sqlInfoItem = new SqlInfo();
616        sqlInfoItem.setFileName(sqlInfo.getFileName());
617        sqlInfoItem.setFilePath(sqlInfo.getFilePath());
618        sqlInfoItem.setSql(buffer.toString());
619        sqlInfoItem.setOriginIndex(index);
620        sqlInfoItem.setOriginLineStart(0);
621        sqlInfoItem.setOriginLineEnd(buffer.toString().split("\n", -1).length - 1);
622        sqlInfoItem.setIndex(databaseMap.get(sqlHash).second.getAndIncrement());
623        sqlInfoItem.setLineStart(lineStart);
624        sqlInfoItem.setLineEnd(databaseMap.get(sqlHash).first.toString().split("\n", -1).length - 1);
625        sqlInfoItem.setGroup(group);
626        sqlInfoItem.setHash(hash);
627
628        if (!sqlInfoMap.containsKey(hash)) {
629            sqlInfoMap.put(hash, new ArrayList<SqlInfo>());
630        }
631        sqlInfoMap.get(hash).add(sqlInfoItem);
632    }
633
634    @Override
635    public void dispose() {
636        ModelBindingManager.remove();
637    }
638
639    private dataflow mergeDataFlows(Map<SqlInfo, dataflow> dataflowMap, long startId) {
640        return mergeDataFlows(dataflowMap.values(), startId);
641    }
642
643    public static dataflow mergeDataFlows(Collection<dataflow> dataflows, EDbVendor vendor) {
644        ModelBindingManager.setGlobalVendor(vendor);
645        try {
646            return mergeDataFlows(dataflows, 5000000L * dataflows.size());
647        } finally {
648            ModelBindingManager.removeGlobalVendor();
649        }
650    }
651
652    private static dataflow mergeDataFlows(Collection<dataflow> dataflows, long startId) {
653        dataflow mergeDataflow = new dataflow();
654
655        List<table> tableCopy = new ArrayList<table>();
656        List<table> viewCopy = new ArrayList<table>();
657        List<table> databaseCopy = new ArrayList<table>();
658        List<table> schemaCopy = new ArrayList<table>();
659        List<table> stageCopy = new ArrayList<table>();
660        List<table> dataSourceCopy = new ArrayList<table>();
661        List<table> streamCopy = new ArrayList<table>();
662        List<table> fileCopy = new ArrayList<table>();
663        List<table> variableCopy = new ArrayList<table>();
664        List<table> resultSetCopy = new ArrayList<table>();
665
666        List<process> processCopy = new ArrayList<process>();
667        List<relationship> relationshipCopy = new ArrayList<relationship>();
668        List<procedure> procedureCopy = new ArrayList<procedure>();
669        List<oraclePackage> packageCopy = new ArrayList<oraclePackage>();
670        List<error> errorCopy = new ArrayList<error>();
671
672        List<table> tables = new ArrayList<table>();
673        for (dataflow dataflow : dataflows) {
674            if (dataflow.getTables() != null) {
675                tableCopy.addAll(dataflow.getTables());
676            }
677            mergeDataflow.setTables(tableCopy);
678            if (dataflow.getViews() != null) {
679                viewCopy.addAll(dataflow.getViews());
680            }
681            mergeDataflow.setViews(viewCopy);
682            if (dataflow.getDatabases() != null) {
683                databaseCopy.addAll(dataflow.getDatabases());
684            }
685            mergeDataflow.setDatabases(databaseCopy);
686            if (dataflow.getSchemas() != null) {
687                schemaCopy.addAll(dataflow.getSchemas());
688            }
689            mergeDataflow.setSchemas(schemaCopy);
690            if (dataflow.getStages() != null) {
691                stageCopy.addAll(dataflow.getStages());
692            }
693            mergeDataflow.setStages(stageCopy);
694            if (dataflow.getDatasources() != null) {
695                dataSourceCopy.addAll(dataflow.getDatasources());
696            }
697            mergeDataflow.setDatasources(dataSourceCopy);
698            if (dataflow.getStreams() != null) {
699                streamCopy.addAll(dataflow.getStreams());
700            }
701            mergeDataflow.setStreams(streamCopy);
702            if (dataflow.getPaths() != null) {
703                fileCopy.addAll(dataflow.getPaths());
704            }
705            mergeDataflow.setPaths(fileCopy);
706            if (dataflow.getVariables() != null) {
707                variableCopy.addAll(dataflow.getVariables());
708            }
709            mergeDataflow.setVariables(variableCopy);
710            if (dataflow.getResultsets() != null) {
711                resultSetCopy.addAll(dataflow.getResultsets());
712            }
713            mergeDataflow.setResultsets(resultSetCopy);
714            if (dataflow.getProcesses() != null) {
715                processCopy.addAll(dataflow.getProcesses());
716            }
717            mergeDataflow.setProcesses(processCopy);
718            if (dataflow.getRelationships() != null) {
719                relationshipCopy.addAll(dataflow.getRelationships());
720            }
721            mergeDataflow.setRelationships(relationshipCopy);
722            if (dataflow.getProcedures() != null) {
723                procedureCopy.addAll(dataflow.getProcedures());
724            }
725            mergeDataflow.setProcedures(procedureCopy);
726            if (dataflow.getPackages() != null) {
727                packageCopy.addAll(dataflow.getPackages());
728            }
729            mergeDataflow.setPackages(packageCopy);
730            if (dataflow.getErrors() != null) {
731                errorCopy.addAll(dataflow.getErrors());
732            }
733            mergeDataflow.setErrors(errorCopy);
734
735            tables.addAll(dataflow.getTables());
736            tables.addAll(dataflow.getViews());
737            tables.addAll(dataflow.getDatabases());
738            tables.addAll(dataflow.getSchemas());
739            tables.addAll(dataflow.getStages());
740            tables.addAll(dataflow.getDatasources());
741            tables.addAll(dataflow.getStreams());
742            tables.addAll(dataflow.getPaths());
743            tables.addAll(dataflow.getResultsets());
744            tables.addAll(dataflow.getVariables());
745        }
746
747        Map<String, List<table>> tableMap = new HashMap<String, List<table>>();
748        Map<String, String> tableTypeMap = new HashMap<String, String>();
749        Map<String, String> tableIdMap = new HashMap<String, String>();
750
751        Map<String, List<column>> columnMap = new HashMap<String, List<column>>();
752        Map<String, Set<String>> tableColumnMap = new HashMap<String, Set<String>>();
753        Map<String, String> columnIdMap = new HashMap<String, String>();
754        Map<String, column> columnMergeIdMap = new HashMap<String, column>();
755
756        List<procedure> procedures = new ArrayList<>(mergeDataflow.getProcedures());
757        if(mergeDataflow.getPackages()!=null){
758            for(oraclePackage pkg : mergeDataflow.getPackages()){
759                procedures.addAll(pkg.getProcedures());
760            }
761        }
762
763        Set<String> procedureIdSet = procedures.stream().map(t->t.getId()).collect(Collectors.toSet());
764
765        for (table table : tables) {
766            String qualifiedTableName = DlineageUtil.getQualifiedTableName(table);
767            String tableFullName = DlineageUtil.getIdentifierNormalTableName(qualifiedTableName);
768
769            if (!tableMap.containsKey(tableFullName)) {
770                tableMap.put(tableFullName, new ArrayList<table>());
771            }
772
773            tableMap.get(tableFullName).add(table);
774
775            if (!tableTypeMap.containsKey(tableFullName)) {
776                tableTypeMap.put(tableFullName, table.getType());
777            } else if ("view".equals(table.getSubType())) {
778                tableTypeMap.put(tableFullName, table.getType());
779            } else if ("database".equals(table.getSubType())) {
780                tableTypeMap.put(tableFullName, table.getType());
781            } else if ("schema".equals(table.getSubType())) {
782                tableTypeMap.put(tableFullName, table.getType());
783            } else if ("stage".equals(table.getSubType())) {
784                tableTypeMap.put(tableFullName, table.getType());
785            } else if ("datasource".equals(table.getSubType())) {
786                tableTypeMap.put(tableFullName, table.getType());
787            } else if ("stream".equals(table.getSubType())) {
788                tableTypeMap.put(tableFullName, table.getType());
789            } else if ("file".equals(table.getSubType())) {
790                tableTypeMap.put(tableFullName, table.getType());
791            } else if ("table".equals(tableTypeMap.get(tableFullName))) {
792                tableTypeMap.put(tableFullName, table.getType());
793            } else if ("variable".equals(tableTypeMap.get(tableFullName))) {
794                tableTypeMap.put(tableFullName, table.getType());
795            }
796
797            if (table.getColumns() != null) {
798                if (!tableColumnMap.containsKey(tableFullName)) {
799                    tableColumnMap.put(tableFullName, new LinkedHashSet<String>());
800                }
801                for (column column : table.getColumns()) {
802                    String columnFullName = tableFullName + "."
803                            + DlineageUtil.getIdentifierNormalColumnName(column.getName());
804
805                    if (!columnMap.containsKey(columnFullName)) {
806                        columnMap.put(columnFullName, new ArrayList<column>());
807                        tableColumnMap.get(tableFullName).add(columnFullName);
808                    }
809
810                    columnMap.get(columnFullName).add(column);
811                }
812            }
813        }
814
815        Iterator<String> tableNameIter = tableMap.keySet().iterator();
816        while (tableNameIter.hasNext()) {
817            String tableName = tableNameIter.next();
818            List<table> tableList = tableMap.get(tableName);
819            table table;
820            if (tableList.size() > 1) {
821                table standardTable = tableList.get(0);
822                //Function允许重名,不做合并处理
823                if (standardTable.isFunction()) {
824                    continue;
825                }
826
827                String type = tableTypeMap.get(tableName);
828                table = new table();
829                table.setId(String.valueOf(++startId));
830                table.setServer(standardTable.getServer());
831                table.setDatabase(standardTable.getDatabase());
832                table.setSchema(standardTable.getSchema());
833                table.setName(standardTable.getName());
834                table.setDisplayName(standardTable.getDisplayName());
835                table.setParent(standardTable.getParent());
836                table.setColumns(new ArrayList<column>());
837                Set<String> processIds = new LinkedHashSet<String>();
838                for (int k = 0; k < tableList.size(); k++) {
839                    if (tableList.get(k).getProcessIds() != null) {
840                        processIds.addAll(tableList.get(k).getProcessIds());
841                    }
842                }
843                if (!processIds.isEmpty()) {
844                    table.setProcessIds(new ArrayList<String>(processIds));
845                }
846                table.setType(type);
847                for (table item : tableList) {
848                    if (!SQLUtil.isEmpty(table.getCoordinate()) && !SQLUtil.isEmpty(item.getCoordinate())) {
849                        if (table.getCoordinate().indexOf(item.getCoordinate()) == -1) {
850                            table.appendCoordinate(item.getCoordinate());
851                        }
852                    } else if (!SQLUtil.isEmpty(item.getCoordinate())) {
853                        table.setCoordinate(item.getCoordinate());
854                    }
855
856                    if (!SQLUtil.isEmpty(table.getAlias()) && !SQLUtil.isEmpty(item.getAlias())) {
857                        table.setAlias(table.getAlias() + "," + item.getAlias());
858                    } else if (!SQLUtil.isEmpty(item.getAlias())) {
859                        table.setAlias(item.getAlias());
860                    }
861
862                    tableIdMap.put(item.getId(), table.getId());
863
864                    if (item.isView()) {
865                        mergeDataflow.getViews().remove(item);
866                    } else if (item.isDatabaseType()) {
867                        mergeDataflow.getDatabases().remove(item);
868                    } else if (item.isSchemaType()) {
869                        mergeDataflow.getSchemas().remove(item);
870                    } else if (item.isStage()) {
871                        mergeDataflow.getStages().remove(item);
872                    } else if (item.isDataSource()) {
873                        mergeDataflow.getDatasources().remove(item);
874                    } else if (item.isStream()) {
875                        mergeDataflow.getStreams().remove(item);
876                    } else if (item.isFile()) {
877                        mergeDataflow.getPaths().remove(item);
878                    } else if (item.isVariable()) {
879                        mergeDataflow.getVariables().remove(item);
880                    } else if (item.isTable()) {
881                        mergeDataflow.getTables().remove(item);
882                    } else if (item.isResultSet()) {
883                        mergeDataflow.getResultsets().remove(item);
884                    }
885                }
886
887                if (table.isView()) {
888                    mergeDataflow.getViews().add(table);
889                } else if (table.isDatabaseType()) {
890                    mergeDataflow.getDatabases().add(table);
891                } else if (table.isSchemaType()) {
892                    mergeDataflow.getSchemas().add(table);
893                } else if (table.isStage()) {
894                    mergeDataflow.getStages().add(table);
895                } else if (table.isDataSource()) {
896                    mergeDataflow.getDatasources().add(table);
897                } else if (table.isStream()) {
898                    mergeDataflow.getStreams().add(table);
899                } else if (table.isFile()) {
900                    mergeDataflow.getPaths().add(table);
901                } else if (table.isVariable()) {
902                    mergeDataflow.getVariables().add(table);
903                } else if (table.isResultSet()) {
904                    mergeDataflow.getResultsets().add(table);
905                } else {
906                    mergeDataflow.getTables().add(table);
907                }
908            } else {
909                table = tableList.get(0);
910            }
911
912            Set<String> columns = tableColumnMap.get(tableName);
913            Iterator<String> columnIter = columns.iterator();
914            List<column> mergeColumns = new ArrayList<column>();
915            while (columnIter.hasNext()) {
916                String columnName = columnIter.next();
917                List<column> columnList = columnMap.get(columnName);
918                List<column> functions = new ArrayList<column>();
919                for (column t : columnList) {
920                    if (Boolean.TRUE.toString().equals(t.getIsFunction())) {
921                        functions.add(t);
922                    }
923                }
924                if (functions != null && !functions.isEmpty()) {
925                    for (column function : functions) {
926                        mergeColumns.add(function);
927                        columnIdMap.put(function.getId(), function.getId());
928                        columnMergeIdMap.put(function.getId(), function);
929                    }
930
931                    columnList.removeAll(functions);
932                }
933                if (!columnList.isEmpty()) {
934                    column firstColumn = columnList.iterator().next();
935                    if (columnList.size() > 1) {
936                        column mergeColumn = new column();
937                        mergeColumn.setId(String.valueOf(++startId));
938                        mergeColumn.setName(firstColumn.getName());
939                        mergeColumn.setDisplayName(firstColumn.getDisplayName());
940                        mergeColumn.setSource(firstColumn.getSource());
941                        mergeColumn.setQualifiedTable(firstColumn.getQualifiedTable());
942                        mergeColumns.add(mergeColumn);
943                        for (column item : columnList) {
944                            mergeColumn.appendCoordinate(item.getCoordinate());
945                            columnIdMap.put(item.getId(), mergeColumn.getId());
946                            //add by grq 2023.02.06 issue=I6DB5S
947                            if(item.getDataType() != null){
948                                mergeColumn.setDataType(item.getDataType());
949                            }
950                            if(item.isForeignKey() != null){
951                                mergeColumn.setForeignKey(item.isForeignKey());
952                            }
953                            if(item.isUnqiueKey() != null){
954                                mergeColumn.setUnqiueKey(item.isUnqiueKey());
955                            }
956                            if(item.isIndexKey() != null){
957                                mergeColumn.setIndexKey(item.isIndexKey());
958                            }
959                            if(item.isPrimaryKey() != null){
960                                mergeColumn.setPrimaryKey(item.isPrimaryKey());
961                            }
962                            //end by grq
963                        }
964                        columnMergeIdMap.put(mergeColumn.getId(), mergeColumn);
965                    } else {
966                        mergeColumns.add(firstColumn);
967                        columnIdMap.put(firstColumn.getId(), firstColumn.getId());
968                        columnMergeIdMap.put(firstColumn.getId(), firstColumn);
969                    }
970                }
971            }
972            table.setColumns(mergeColumns);
973        }
974
975        if (mergeDataflow.getRelationships() != null) {
976            Map<String, relationship> mergeRelations = new LinkedHashMap<String, relationship>();
977            for (int i = 0; i < mergeDataflow.getRelationships().size(); i++) {
978                relationship relation = mergeDataflow.getRelationships().get(i);
979                if(RelationshipType.call.name().equals(relation.getType())){
980                    targetColumn target = relation.getCaller();
981                    if (target == null) {
982                        continue;
983                    }
984                    if (target != null && tableIdMap.containsKey(target.getId())) {
985                        target.setId(tableIdMap.get(target.getId()));
986                    }
987
988                    List<sourceColumn> sources = relation.getCallees();
989                    Set<sourceColumn> sourceSet = new LinkedHashSet<sourceColumn>();
990                    if (sources != null) {
991                        for (sourceColumn source : sources) {
992                            if (tableIdMap.containsKey(source.getId())) {
993                                source.setId(tableIdMap.get(source.getId()));
994                            }
995                        }
996                        sourceSet.addAll(sources);
997                        relation.setCallees(new ArrayList<sourceColumn>(sourceSet));
998                    }
999
1000                    String jsonString = JSON.toJSONString(relation, true);
1001                    String key = SHA256.getMd5(jsonString);
1002                    if (!mergeRelations.containsKey(key)) {
1003                        mergeRelations.put(key, relation);
1004                    }
1005                }
1006                else {
1007                    targetColumn target = relation.getTarget();
1008                    if (target == null) {
1009                        continue;
1010                    }
1011                    if (target != null && tableIdMap.containsKey(target.getParent_id())) {
1012                        target.setParent_id(tableIdMap.get(target.getParent_id()));
1013                    }
1014
1015                    if (columnIdMap.containsKey(target.getId())) {
1016                        target.setId(columnIdMap.get(target.getId()));
1017                        target.setCoordinate(columnMergeIdMap.get(target.getId()).getCoordinate());
1018                    }
1019
1020                    List<sourceColumn> sources = relation.getSources();
1021                    Set<sourceColumn> sourceSet = new LinkedHashSet<sourceColumn>();
1022                    if (sources != null) {
1023                        for (sourceColumn source : sources) {
1024                            if (tableIdMap.containsKey(source.getParent_id())) {
1025                                source.setParent_id(tableIdMap.get(source.getParent_id()));
1026                            }
1027                            if (tableIdMap.containsKey(source.getSource_id())) {
1028                                source.setSource_id(tableIdMap.get(source.getSource_id()));
1029                            }
1030                            if (columnIdMap.containsKey(source.getId())) {
1031                                source.setId(columnIdMap.get(source.getId()));
1032                                source.setCoordinate(columnMergeIdMap.get(source.getId()).getCoordinate());
1033                            }
1034                        }
1035
1036                        sourceSet.addAll(sources);
1037                        relation.setSources(new ArrayList<sourceColumn>(sourceSet));
1038                    }
1039
1040                    String jsonString = JSON.toJSONString(relation, true);
1041                    String key = SHA256.getMd5(jsonString);
1042                    if (!mergeRelations.containsKey(key)) {
1043                        mergeRelations.put(key, relation);
1044                    }
1045                }
1046            }
1047
1048            mergeDataflow.setRelationships(new ArrayList<relationship>(mergeRelations.values()));
1049        }
1050
1051        tableMap.clear();
1052        tableTypeMap.clear();
1053        tableIdMap.clear();
1054        columnMap.clear();
1055        tableColumnMap.clear();
1056        columnIdMap.clear();
1057        columnMergeIdMap.clear();
1058        tables.clear();
1059
1060        return mergeDataflow;
1061    }
1062
1063    @Override
1064    public synchronized dataflow getDataFlow() {
1065        if (dataflow != null) {
1066            return dataflow;
1067        } else if (dataflowString != null) {
1068            return XML2Model.loadXML(dataflow.class, dataflowString);
1069        }
1070        return null;
1071    }
1072
1073    @Override
1074    public Map<String, String> getHashSQLMap() {
1075        return hashSQLMap;
1076    }
1077
1078    public static void main(String[] args) throws Exception {
1079//        Option option = new Option();
1080//        option.setVendor(EDbVendor.dbvoracle);
1081//        option.setOutput(false);
1082//        ParallelDataFlowAnalyzer analyzer = new ParallelDataFlowAnalyzer(new File[]{new File("C:\\Users\\KK\\Desktop\\MRADM_PKG_ALL.sql")}, option);
1083//        analyzer.generateDataFlow();
1084//        dataflow dataflow = analyzer.getDataFlow();
1085        dataflow dataflow = XML2Model.loadXML(gudusoft.gsqlparser.dlineage.dataflow.model.xml.dataflow.class,new File("D:\\dataflow.xml.zip"));
1086        //XML2Model.saveXML(dataflow, new File("D:\\dataflow.xml.zip"));
1087        dataflow = DataflowUtility.convertToTableLevelDataflow(dataflow);
1088        dataflow = DataflowUtility.convertTableLevelToFunctionCallDataflow(dataflow, true, EDbVendor.dbvoracle);
1089        System.out.println(XML2Model.saveXML(dataflow));
1090    }
1091}