001package gudusoft.gsqlparser.dlineage;
002
003import gudusoft.gsqlparser.EDbVendor;
004import gudusoft.gsqlparser.TBaseType;
005import gudusoft.gsqlparser.TGSqlParser;
006import gudusoft.gsqlparser.dlineage.dataflow.listener.DataFlowHandleListener;
007import gudusoft.gsqlparser.dlineage.dataflow.metadata.MetadataReader;
008import gudusoft.gsqlparser.dlineage.dataflow.model.*;
009import gudusoft.gsqlparser.dlineage.dataflow.model.json.Coordinate;
010import gudusoft.gsqlparser.dlineage.dataflow.model.xml.*;
011import gudusoft.gsqlparser.dlineage.dataflow.sqlenv.SQLEnvParser;
012import gudusoft.gsqlparser.dlineage.util.*;
013import gudusoft.gsqlparser.sqlenv.TSQLEnv;
014import gudusoft.gsqlparser.sqlenv.parser.TJSONSQLEnvParser;
015import gudusoft.gsqlparser.util.IndexedLinkedHashMap;
016import gudusoft.gsqlparser.util.Logger;
017import gudusoft.gsqlparser.util.LoggerFactory;
018import gudusoft.gsqlparser.util.SQLUtil;
019import gudusoft.gsqlparser.util.json.JSON;
020
021import java.io.File;
022import java.io.IOException;
023import java.util.*;
024import java.util.concurrent.*;
025import java.util.concurrent.atomic.AtomicInteger;
026import java.util.stream.Collectors;
027
028public class ParallelDataFlowAnalyzer implements IDataFlowAnalyzer {
029    private static final Logger logger = LoggerFactory.getLogger(ParallelDataFlowAnalyzer.class);
030    private SqlInfo[] sqlInfos;
031    private Option option = new Option();
032    private TSQLEnv sqlenv = null;
033    private dataflow dataflow;
034    private String dataflowString;
035    private List<ErrorInfo> errorInfos = new CopyOnWriteArrayList<ErrorInfo>();
036    private IndexedLinkedHashMap<String, List<SqlInfo>> sqlInfoMap = new IndexedLinkedHashMap<String, List<SqlInfo>>();
037    private final Map<String, String> hashSQLMap = new HashMap();
038
039    public ParallelDataFlowAnalyzer(SqlInfo[] sqlInfos, EDbVendor dbVendor, boolean simpleOutput) {
040        this.sqlInfos = sqlInfos;
041        option.setVendor(dbVendor);
042        option.setSimpleOutput(simpleOutput);
043        ModelBindingManager.setGlobalOption(option);
044    }
045
046    public ParallelDataFlowAnalyzer(String[] sqlContents, EDbVendor dbVendor, boolean simpleOutput, String defaultServer, String defaultDatabase, String defaltSchema) {
047        SqlInfo[] sqlInfos = new SqlInfo[sqlContents.length];
048        for (int i = 0; i < sqlContents.length; i++) {
049            SqlInfo info = new SqlInfo();
050            info.setSql(sqlContents[i]);
051            info.setOriginIndex(0);
052            sqlInfos[i] = info;
053        }
054        option.setVendor(dbVendor);
055        option.setSimpleOutput(simpleOutput);
056                option.setDefaultServer(defaultServer);
057                option.setDefaultDatabase(defaultDatabase);
058                option.setDefaultSchema(defaltSchema);
059        this.sqlInfos = convertSQL(dbVendor, JSON.toJSONString(sqlInfos)).toArray(new SqlInfo[0]);
060    }
061    
062    public ParallelDataFlowAnalyzer(String[] sqlContents, EDbVendor dbVendor, boolean simpleOutput) {
063        SqlInfo[] sqlInfos = new SqlInfo[sqlContents.length];
064        for (int i = 0; i < sqlContents.length; i++) {
065            SqlInfo info = new SqlInfo();
066            info.setSql(sqlContents[i]);
067            info.setOriginIndex(0);
068            sqlInfos[i] = info;
069        }
070        option.setVendor(dbVendor);
071        option.setSimpleOutput(simpleOutput);
072        this.sqlInfos = convertSQL(dbVendor, JSON.toJSONString(sqlInfos)).toArray(new SqlInfo[0]);
073    }
074
    /**
     * Builds an analyzer over pre-built SqlInfo units with a fully custom Option.
     *
     * @param sqlInfos SQL units to analyze
     * @param option   analyzer options, also published globally for model binding
     */
    public ParallelDataFlowAnalyzer(SqlInfo[] sqlInfos, Option option) {
        this.sqlInfos = sqlInfos;
        this.option = option;
        // Publish the option globally so model binding sees the same settings.
        ModelBindingManager.setGlobalOption(option);
    }
080
081    public ParallelDataFlowAnalyzer(File[] sqlFiles, Option option) {
082        SqlInfo[] sqlInfos = new SqlInfo[sqlFiles.length];
083        for (int i = 0; i < sqlFiles.length; i++) {
084            SqlInfo info = new SqlInfo();
085            info.setSql(SQLUtil.getFileContent(sqlFiles[i]));
086            info.setFileName(sqlFiles[i].getName());
087            info.setFilePath(sqlFiles[i].getAbsolutePath());
088            info.setOriginIndex(0);
089            sqlInfos[i] = info;
090        }
091        this.sqlInfos = sqlInfos;
092        this.option = option;
093        ModelBindingManager.setGlobalOption(option);
094    }
095    
096    public ParallelDataFlowAnalyzer(File[] sqlFiles, Option option, int splitSizeMB, File splitDir) {
097        List<SqlInfo> sqlInfoList = new ArrayList<SqlInfo>();
098        for (int i = 0; i < sqlFiles.length; i++) {
099            try {
100                // Ensure split directory exists
101                if (!splitDir.exists() && !splitDir.mkdirs()) {
102                    throw new IOException("Failed to create split directory: " + splitDir.getAbsolutePath());
103                }
104                
105                // Split the file if it's large
106                List<File> splitFiles = gudusoft.gsqlparser.util.FileSplitter.splitFile(sqlFiles[i], splitDir, splitSizeMB, option.getVendor());
107                
108                if (splitFiles.isEmpty()) {
109                    // If no split files were created, process the original file
110                    SqlInfo info = new SqlInfo();
111                    info.setSql(SQLUtil.getFileContent(sqlFiles[i]));
112                    info.setFileName(sqlFiles[i].getName());
113                    info.setFilePath(sqlFiles[i].getAbsolutePath());
114                    info.setOriginIndex(0);
115                    sqlInfoList.add(info);
116                } else {
117                    // Process each split file
118                    for (File splitFile : splitFiles) {
119                        SqlInfo info = new SqlInfo();
120                        info.setSql(SQLUtil.getFileContent(splitFile));
121                        info.setFileName(splitFile.getName());
122                        info.setFilePath(splitFile.getAbsolutePath());
123                        info.setOriginIndex(0);
124                        sqlInfoList.add(info);
125                    }
126                }
127            } catch (Exception e) {
128                sqlInfoList.clear();
129                logger.error("Error splitting file: " + sqlFiles[i].getAbsolutePath(), e);
130                // Fallback to original file if splitting fails
131                SqlInfo info = new SqlInfo();
132                info.setSql(SQLUtil.getFileContent(sqlFiles[i]));
133                info.setFileName(sqlFiles[i].getName());
134                info.setFilePath(sqlFiles[i].getAbsolutePath());
135                info.setOriginIndex(0);
136                sqlInfoList.add(info);
137            }
138        }
139        this.sqlInfos = sqlInfoList.toArray(new SqlInfo[0]);
140        this.option = option;
141        ModelBindingManager.setGlobalOption(option);
142    }
143    
    /**
     * Convenience overload of {@link #ParallelDataFlowAnalyzer(File[], Option, int, File)}
     * taking the split directory as a path string.
     */
    public ParallelDataFlowAnalyzer(File[] sqlFiles, Option option, int splitSizeMB, String splitDirPath) {
        this(sqlFiles, option, splitSizeMB, new File(splitDirPath));
    }
147    
148    protected List<SqlInfo> convertSQL(EDbVendor vendor, String json) {
149                List<SqlInfo> sqlInfos = new ArrayList<SqlInfo>();
150                List sqlContents = (List) JSON.parseObject(json);
151                for (int j = 0; j < sqlContents.size(); j++) {
152                        Map sqlContent = (Map) sqlContents.get(j);
153                        String sql = (String) sqlContent.get("sql");
154                        String fileName = (String) sqlContent.get("fileName");
155                        String filePath = (String) sqlContent.get("filePath");
156                        if (sql != null && sql.trim().startsWith("{")) {
157                                if (sql.indexOf("createdBy") != -1) {
158                                        if (this.sqlenv == null) {
159                                                TSQLEnv[] sqlenvs = new TJSONSQLEnvParser(option.getDefaultServer(), option.getDefaultDatabase(), option.getDefaultSchema()).parseSQLEnv(vendor, sql);
160                                                if (sqlenvs != null) {
161                                                        this.sqlenv = sqlenvs[0];
162                                                }
163                                        }
164                        if (sql.toLowerCase().indexOf("sqldep") != -1 || sql.toLowerCase().indexOf("grabit") != -1 ) {
165                                Map queryObject = (Map) JSON.parseObject(sql);
166                                        List querys = (List) queryObject.get("queries");
167                                        if (querys != null) {
168                                                for (int i = 0; i < querys.size(); i++) {
169                                                        Map object = (Map) querys.get(i);
170                                                        SqlInfo info = new SqlInfo();
171                                                        info.setSql(JSON.toJSONString(object));
172                                                        info.setFileName(fileName);
173                                                        info.setFilePath(filePath);
174                                                        info.setOriginIndex(i);
175                                                        sqlInfos.add(info);
176                                                }
177                                                queryObject.remove("queries");
178                                                SqlInfo info = new SqlInfo();
179                                                info.setSql(JSON.toJSONString(queryObject));
180                                                info.setFileName(fileName);
181                                                info.setFilePath(filePath);
182                                                info.setOriginIndex(querys.size());
183                                                sqlInfos.add(info);
184                                        } else {
185                                                SqlInfo info = new SqlInfo();
186                                                info.setSql(JSON.toJSONString(queryObject));
187                                                info.setFileName(fileName);
188                                                info.setFilePath(filePath);
189                                                info.setOriginIndex(0);
190                                                sqlInfos.add(info);
191                                        }
192                        }
193                        else if (sql.toLowerCase().indexOf("sqlflow") != -1) {
194                                                Map sqlflow = (Map) JSON.parseObject(sql);
195                                                List<Map> servers = (List<Map>) sqlflow.get("servers");
196                                                if (servers != null) {
197                                                        for (Map queryObject : servers) {
198                                                                String name = (String) queryObject.get("name");
199                                                                String dbVendor = (String) queryObject.get("dbVendor");
200                                                                List querys = (List) queryObject.get("queries");
201                                                                if (querys != null) {
202                                                                        for (int i = 0; i < querys.size(); i++) {
203                                                                                Map object = (Map) querys.get(i);
204                                                                                SqlInfo info = new SqlInfo();
205                                                                                info.setSql(JSON.toJSONString(object));
206                                                                                info.setFileName(fileName);
207                                                                                info.setFilePath(filePath);
208                                                                                info.setOriginIndex(i);
209                                                                                info.setDbVendor(dbVendor);
210                                                                                info.setServer(name);
211                                                                                sqlInfos.add(info);
212                                                                        }
213                                                                        queryObject.remove("queries");
214                                                                        SqlInfo info = new SqlInfo();
215                                                                        info.setSql(JSON.toJSONString(queryObject));
216                                                                        info.setFileName(fileName);
217                                                                        info.setFilePath(filePath);
218                                                                        info.setOriginIndex(querys.size());
219                                                                        info.setDbVendor(dbVendor);
220                                                                        info.setServer(filePath);
221                                                                        sqlInfos.add(info);
222                                                                } else {
223                                                                        SqlInfo info = new SqlInfo();
224                                                                        info.setSql(JSON.toJSONString(queryObject));
225                                                                        info.setFileName(fileName);
226                                                                        info.setFilePath(filePath);
227                                                                        info.setOriginIndex(0);
228                                                                        sqlInfos.add(info);
229                                                                }
230                                                        }
231                                                }
232                                        }
233                    }
234                        } else if (sql != null) {
235                                SqlInfo info = new SqlInfo();
236                                info.setSql(sql);
237                                info.setFileName(fileName);
238                                info.setFilePath(filePath);
239                                info.setOriginIndex(0);
240                                sqlInfos.add(info);
241                        }
242                }
243                return sqlInfos;
244        }
245
    /** Returns the {@code ignoreRecordSet} flag from the analyzer options. */
    @Override
    public boolean isIgnoreRecordSet() {
        return option.isIgnoreRecordSet();
    }

    /** Sets the {@code ignoreRecordSet} flag on the analyzer options. */
    @Override
    public void setIgnoreRecordSet(boolean ignoreRecordSet) {
        option.setIgnoreRecordSet(ignoreRecordSet);
    }

    /** Returns the {@code simpleShowTopSelectResultSet} flag from the analyzer options. */
    @Override
    public boolean isSimpleShowTopSelectResultSet() {
        return option.isSimpleShowTopSelectResultSet();
    }

    /** Sets the {@code simpleShowTopSelectResultSet} flag on the analyzer options. */
    @Override
    public void setSimpleShowTopSelectResultSet(boolean simpleShowTopSelectResultSet) {
        option.setSimpleShowTopSelectResultSet(simpleShowTopSelectResultSet);
    }

    /** Returns the {@code simpleShowFunction} flag from the analyzer options. */
    @Override
    public boolean isSimpleShowFunction() {
        return option.isSimpleShowFunction();
    }

    /** Sets the {@code simpleShowFunction} flag on the analyzer options. */
    @Override
    public void setSimpleShowFunction(boolean simpleShowFunction) {
        option.setSimpleShowFunction(simpleShowFunction);
    }

    /** Returns the {@code showJoin} flag from the analyzer options. */
    @Override
    public boolean isShowJoin() {
        return option.isShowJoin();
    }

    /** Sets the {@code showJoin} flag on the analyzer options. */
    @Override
    public void setShowJoin(boolean showJoin) {
        option.setShowJoin(showJoin);
    }

    /** Returns the {@code showImplicitSchema} flag from the analyzer options. */
    @Override
    public boolean isShowImplicitSchema() {
        return option.isShowImplicitSchema();
    }

    /** Sets the {@code showImplicitSchema} flag on the analyzer options. */
    @Override
    public void setShowImplicitSchema(boolean showImplicitSchema) {
        option.setShowImplicitSchema(showImplicitSchema);
    }

    /** Returns the {@code showConstantTable} flag from the analyzer options. */
    @Override
    public boolean isShowConstantTable() {
        return option.isShowConstantTable();
    }

    /** Sets the {@code showConstantTable} flag on the analyzer options. */
    @Override
    public void setShowConstantTable(boolean showConstantTable) {
        option.setShowConstantTable(showConstantTable);
    }

    /** Returns the {@code showCountTableColumn} flag from the analyzer options. */
    @Override
    public boolean isShowCountTableColumn() {
        return option.isShowCountTableColumn();
    }

    /** Sets the {@code showCountTableColumn} flag on the analyzer options. */
    @Override
    public void setShowCountTableColumn(boolean showCountTableColumn) {
        option.setShowCountTableColumn(showCountTableColumn);
    }
315
    /** Returns the {@code transform} flag from the analyzer options. */
    @Override
    public boolean isTransform() {
        return option.isTransform();
    }

    /**
     * Sets the {@code transform} flag. Invariant: while {@code transformCoordinate}
     * is enabled, {@code transform} is forced back to {@code true}, so transform
     * cannot be switched off independently of coordinate transformation.
     */
    @Override
    public void setTransform(boolean transform) {
        option.setTransform(transform);
        if (option.isTransformCoordinate()) {
            option.setTransform(true);
        }
    }

    /** Returns the {@code transformCoordinate} flag from the analyzer options. */
    @Override
    public boolean isTransformCoordinate() {
        return option.isTransformCoordinate();
    }

    /**
     * Sets the {@code transformCoordinate} flag. Enabling it also forces
     * {@code transform} on, mirroring the invariant enforced in setTransform.
     */
    @Override
    public void setTransformCoordinate(boolean transformCoordinate) {
        option.setTransformCoordinate(transformCoordinate);
        if (transformCoordinate) {
            option.setTransform(true);
        }
    }
341
    /** Returns the {@code linkOrphanColumnToFirstTable} flag from the analyzer options. */
    @Override
    public boolean isLinkOrphanColumnToFirstTable() {
        return option.isLinkOrphanColumnToFirstTable();
    }

    /** Sets the {@code linkOrphanColumnToFirstTable} flag on the analyzer options. */
    @Override
    public void setLinkOrphanColumnToFirstTable(boolean linkOrphanColumnToFirstTable) {
        option.setLinkOrphanColumnToFirstTable(linkOrphanColumnToFirstTable);
    }

    /** Returns the {@code ignoreCoordinate} flag from the analyzer options. */
    @Override
    public boolean isIgnoreCoordinate() {
        return option.isIgnoreCoordinate();
    }

    /** Sets the {@code ignoreCoordinate} flag on the analyzer options. */
    @Override
    public void setIgnoreCoordinate(boolean ignoreCoordinate) {
        option.setIgnoreCoordinate(ignoreCoordinate);
    }

    /** Registers a listener for dataflow-handling callbacks on the analyzer options. */
    @Override
    public void setHandleListener(DataFlowHandleListener listener) {
        option.setHandleListener(listener);
    }

    /**
     * Supplies an explicit SQL environment. When set, generateDataFlow() skips
     * its automatic environment discovery.
     */
    @Override
    public void setSqlEnv(TSQLEnv sqlenv) {
        this.sqlenv = sqlenv;
    }

    /** Replaces the analyzer options wholesale. */
    @Override
    public void setOption(Option option) {
        this.option = option;
    }

    /** Returns the current analyzer options. */
    @Override
    public Option getOption() {
        return option;
    }

    /**
     * Returns the errors collected by the last generateDataFlow() run.
     * The list is live and thread-safe (CopyOnWriteArrayList).
     */
    @Override
    public List<ErrorInfo> getErrorMessages() {
        return errorInfos;
    }

    /**
     * Serializes the current SqlInfo grouping to JSON. Only meaningful after
     * generateDataFlow() has populated the map.
     */
    @Override
    public synchronized String generateSqlInfos() {
        return JSON.toJSONString(sqlInfoMap);
    }

    /** Analyzes all SQL units without extra info; see {@link #generateDataFlow(boolean)}. */
    @Override
    public synchronized String generateDataFlow() {
        return generateDataFlow(false);
    }

    /** Returns the SqlInfo grouping keyed by content hash (or input index for empty units). */
    @Override
    public Map<String, List<SqlInfo>> getSqlInfos() {
        return sqlInfoMap;
    }
401
402    @Override
403    /**
404     * @deprecated please use SqlInfoHelper.getSelectedDbObjectInfo
405     */
406    public DbObjectPosition getSelectedDbObjectInfo(Coordinate start, Coordinate end) {
407        if (start == null || end == null) {
408            throw new IllegalArgumentException("Coordinate can't be null.");
409        }
410
411        String hashCode = start.getHashCode();
412
413        if (hashCode == null) {
414            throw new IllegalArgumentException("Coordinate hashcode can't be null.");
415        }
416
417        int dbObjectStartLine = (int) start.getX() - 1;
418        int dbObjectStarColumn = (int) start.getY() - 1;
419        int dbObjectEndLine = (int) end.getX() - 1;
420        int dbObjectEndColumn = (int) end.getY() - 1;
421        List<SqlInfo> sqlInfoList;
422        if (hashCode.matches("\\d+")) {
423            sqlInfoList = sqlInfoMap.getValueAtIndex(Integer.valueOf(hashCode));
424        } else {
425            sqlInfoList = sqlInfoMap.get(hashCode);
426        }
427        for (int j = 0; j < sqlInfoList.size(); j++) {
428            SqlInfo sqlInfo = sqlInfoList.get(j);
429            int startLine = sqlInfo.getLineStart();
430            int endLine = sqlInfo.getLineEnd();
431            if (dbObjectStartLine >= startLine && dbObjectStartLine <= endLine) {
432                DbObjectPosition position = new DbObjectPosition();
433                position.setFile(sqlInfo.getFileName());
434                position.setFilePath(sqlInfo.getFilePath());
435                position.setSql(sqlInfo.getSql());
436                position.setIndex(sqlInfo.getOriginIndex());
437                List<Pair<Integer, Integer>> positions = position.getPositions();
438                positions.add(new Pair<Integer, Integer>(
439                        dbObjectStartLine - startLine + sqlInfo.getOriginLineStart() + 1, dbObjectStarColumn + 1));
440                positions.add(new Pair<Integer, Integer>(dbObjectEndLine - startLine + sqlInfo.getOriginLineStart() + 1,
441                        dbObjectEndColumn + 1));
442                return position;
443            }
444        }
445        return null;
446    }
447
    /**
     * Analyzes all SQL units in parallel and returns the merged dataflow,
     * using the memory-saving (temp-file) merge mode by default.
     *
     * @param withExtraInfo whether the per-unit analyzers include extra info
     */
    @Override
    public synchronized String generateDataFlow(final boolean withExtraInfo) {
        return generateDataFlow(withExtraInfo, true);
    }
452    
453    public synchronized String generateDataFlow(final boolean withExtraInfo, boolean useSaveMemoryMode) {
454        sqlInfoMap.clear();
455        errorInfos.clear();
456        Map<String, Pair3<StringBuilder, AtomicInteger, String>> databaseMap = new LinkedHashMap<String, Pair3<StringBuilder, AtomicInteger, String>>();
457        for (int i = 0; i < sqlInfos.length; i++) {
458            SqlInfo sqlInfo = sqlInfos[i];
459                        if (sqlInfo != null && sqlInfo.getSql() == null && sqlInfo.getFilePath() != null) {
460                                sqlInfo.setSql(SQLUtil.getFileContent(sqlInfo.getFilePath()));
461                        }
462            if (sqlInfo != null && sqlInfo.getSql() == null && sqlInfo.getFileName() != null) {
463                sqlInfo.setSql(SQLUtil.getFileContent(sqlInfo.getFileName()));
464            }
465            if (sqlInfo == null || sqlInfo.getSql() == null) {
466                sqlInfoMap.put(String.valueOf(i), new ArrayList<SqlInfo>());
467                continue;
468            }
469            String sql = sqlInfo.getSql();
470            if (sql != null && sql.trim().startsWith("{")) {
471                if (MetadataReader.isGrabit(sql) || MetadataReader.isSqlflow(sql)) {
472                    String hash = SHA256.getMd5(sql);
473                    String fileHash = SHA256.getMd5(hash);
474                    if (!sqlInfoMap.containsKey(fileHash)) {
475                        sqlInfoMap.put(fileHash, new ArrayList<SqlInfo>());
476                        sqlInfoMap.get(fileHash).add(sqlInfo);
477                    }
478                } else {
479                    Map queryObject = (Map) JSON.parseObject(sql);
480                    appendSqlInfo(databaseMap, i, sqlInfo, queryObject);
481                }
482            } else {
483                String content = sql;
484                String hash = SHA256.getMd5(content);
485                String fileHash = SHA256.getMd5(hash);
486                if (!sqlInfoMap.containsKey(fileHash)) {
487                    sqlInfoMap.put(fileHash, new ArrayList<SqlInfo>());
488
489                    String database = TSQLEnv.DEFAULT_DB_NAME;
490                    String schema = TSQLEnv.DEFAULT_SCHEMA_NAME;
491                    if(sqlenv!=null) {
492                        database = sqlenv.getDefaultCatalogName();
493                        if(database == null) {
494                            database = TSQLEnv.DEFAULT_DB_NAME;
495                        }
496                        schema = sqlenv.getDefaultSchemaName();
497                        if(schema == null) {
498                            schema = TSQLEnv.DEFAULT_SCHEMA_NAME;
499                        }
500                    }
501                    boolean supportCatalog = TSQLEnv.supportCatalog(option.getVendor());
502                    boolean supportSchema = TSQLEnv.supportSchema(option.getVendor());
503                    StringBuilder builder = new StringBuilder();
504                    if (supportCatalog) {
505                        builder.append(database);
506                    }
507                    if (supportSchema) {
508                        if (builder.length() > 0) {
509                            builder.append(".");
510                        }
511                        builder.append(schema);
512                    }
513                    String group = builder.toString();
514                    SqlInfo sqlInfoItem = new SqlInfo();
515                    sqlInfoItem.setFileName(sqlInfo.getFileName());
516                    sqlInfoItem.setFilePath(sqlInfo.getFilePath());
517                    sqlInfoItem.setSql(sqlInfo.getSql());
518                    sqlInfoItem.setOriginIndex(0);
519                    sqlInfoItem.setOriginLineStart(0);
520                    sqlInfoItem.setOriginLineEnd(sqlInfo.getSql().split("\n").length - 1);
521                    sqlInfoItem.setIndex(0);
522                    sqlInfoItem.setLineStart(0);
523                    sqlInfoItem.setLineEnd(sqlInfo.getSql().split("\n").length - 1);
524                    sqlInfoItem.setHash(fileHash);
525                    sqlInfoItem.setGroup(group);
526
527                    sqlInfoMap.get(fileHash).add(sqlInfoItem);
528                }
529            }
530        }
531
532        final TSQLEnv[] env = new TSQLEnv[]{sqlenv};
533        if (sqlenv == null) {
534                TSQLEnv[] envs = new SQLEnvParser(option.getDefaultServer(), option.getDefaultDatabase(), option.getDefaultSchema()).parseSQLEnv(option.getVendor(), sqlInfos);
535                        if (envs != null && envs.length > 0) {
536                                env[0] = envs[0];
537                        }
538        }
539        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
540                .newFixedThreadPool(option.getParallel() < sqlInfos.length
541                        ? option.getParallel()
542                        : sqlInfos.length);
543        
544        List<File> tempFiles = Collections.synchronizedList(new ArrayList<File>());
545        final Map<SqlInfo, dataflow> dataflowMap = useSaveMemoryMode ? null : new ConcurrentHashMap<>();
546        
547        try {
548            final CountDownLatch latch = new CountDownLatch(sqlInfos.length);
549
550            logger.info("start parallel analyze " + sqlInfos.length + " sqlinfos");
551
552            for (int i = 0; i < sqlInfos.length; i++) {
553                final SqlInfo[] sqlInfoCopy = new SqlInfo[sqlInfos.length];
554                final SqlInfo item = sqlInfos[i];
555                sqlInfoCopy[i] = item;
556                final Option optionCopy = (Option) option.clone();
557                optionCopy.setStartId(5000000L * i);
558                optionCopy.setOutput(false);
559                final int index = i;
560                Runnable task = new Runnable() {
561                    @Override
562                    public void run() {
563                        try {
564                            logger.info("start analyze sqlinfo[" + index + "]" + (sqlInfos[index].getFilePath() != null ? ", file name = " + new File(sqlInfos[index].getFilePath()).getName() : ""));
565                            DataFlowAnalyzer analyzer = new DataFlowAnalyzer(sqlInfoCopy, optionCopy);
566                            analyzer.setSqlEnv(env[0]);
567                            analyzer.generateDataFlow(withExtraInfo);
568                            dataflow dataflow = analyzer.getDataFlow();
569                            logger.info("analyze sqlinfo[" + index + "] done, relation count: " + dataflow.getRelationships().size()+", error count: "+ analyzer.getErrorMessages().size());
570                            if (analyzer.getErrorMessages().size() > 10000) {
571                                dataflow.setErrors(new ArrayList<>(dataflow.getErrors().subList(0, 10000)));
572                                errorInfos.addAll(analyzer.getErrorMessages().subList(0, 10000));
573                                logger.warn("Too many errors in dataflow ("+dataflow.getErrors().size()+"), truncating to first 10000 errors" + (sqlInfos[index].getFileName() != null ? ", file name = " + sqlInfos[index].getFileName() : ""));
574                            }
575                            else{
576                                errorInfos.addAll(analyzer.getErrorMessages());
577                            }
578                            
579                            if (useSaveMemoryMode) {
580                                File tempFile = File.createTempFile("dataflow_" + index + "_" + System.currentTimeMillis() + "_", ".xml.zip");
581                                XML2Model.saveXML(dataflow, tempFile);
582                                tempFiles.add(tempFile);
583                            } else {
584                                dataflowMap.put(item, dataflow);
585                            }
586                            
587                            hashSQLMap.putAll(analyzer.getHashSQLMap());
588                            analyzer.dispose();
589                        }
590                        catch (Exception e) {
591                            logger.error("analyze sqlinfo[" + index + "] failed.", e);
592                        }
593                        finally {
594                            latch.countDown();
595                        }
596                    }
597                };
598                executor.submit(task);
599            }
600            latch.await();
601        } catch (Exception e) {
602            logger.error("execute task failed.", e);
603        }
604        executor.shutdown();
605
606        ModelBindingManager modelManager = new ModelBindingManager();
607        modelManager.setGlobalVendor(option.getVendor());
608        modelManager.setGlobalOption(option);
609        ModelBindingManager.set(modelManager);
610        
611        if (useSaveMemoryMode) {
612            // 使用节约内存模式,迭代合并
613            logger.info("start merge dataflow, dataflow count: " + tempFiles.size()+", useSaveMemoryMode: "+ useSaveMemoryMode+", gsp version: "+ TBaseType.versionid);
614            this.dataflow = iterativeMergeDataFlows(tempFiles, 5000000L * sqlInfos.length);
615            if (this.dataflow != null && this.dataflow.getRelationships() != null) {
616                logger.info("merge dataflow done, dataflow count: " + tempFiles.size() + ", relation count: " + this.dataflow.getRelationships().size());
617            }
618            // 清理临时文件
619            for (File tempFile : tempFiles) {
620                tempFile.delete();
621            }
622        } else {
623            // 保持原有逻辑
624            logger.info("start merge dataflow, dataflow count: " + dataflowMap.size()+", useSaveMemoryMode: "+ useSaveMemoryMode+", gsp version: "+ TBaseType.versionid);
625            this.dataflow = mergeDataFlows(dataflowMap, 5000000L * sqlInfos.length);
626            if (this.dataflow != null && this.dataflow.getRelationships() != null) {
627                logger.info("merge dataflow done, dataflow count: " + tempFiles.size() + ", relation count: " + this.dataflow.getRelationships().size());
628            }
629            dataflowMap.clear();
630        }
631        
632        if (this.dataflow != null) {
633            logger.info("merge done, relation count: " + this.dataflow.getRelationships().size());
634        }
635        
636        if (dataflow != null && option.isOutput()) {
637            if (option.isTextFormat()) {
638                dataflowString = DataFlowAnalyzer.getTextOutput(dataflow);
639            } else {
640                try {
641                    dataflowString = XML2Model.saveXML(dataflow);
642                }catch (Exception e){
643                    logger.error("save dataflow as xml failed.", e);
644                    dataflowString = null;
645                }
646            }
647        }
648        ModelBindingManager.remove();
649        return dataflowString;
650    }
651    
652    private dataflow iterativeMergeDataFlows(List<File> tempFiles, long startId) {
653        if (tempFiles == null || tempFiles.isEmpty()) {
654            logger.warn("iterativeMergeDataFlows: tempFiles is null or empty");
655            return null;
656        }
657
658        try {
659            Collections.sort(tempFiles, (f1, f2) -> Long.compare(f1.length(), f2.length()));
660            logger.info("iterativeMergeDataFlows: sorted tempFiles, first: " + tempFiles.get(0).length() + ", last: " + tempFiles.get(tempFiles.size() - 1).length());
661        } catch (Exception e) {
662            logger.error("iterativeMergeDataFlows failed.", e);
663        }
664
665        long startTime = System.currentTimeMillis();
666        int totalIterations = tempFiles.size() - 1;
667        logger.info("iterativeMergeDataFlows: start merging, total dataflows: " + tempFiles.size() + ", total iterations: " + totalIterations);
668        
669        long loadStartTime = System.currentTimeMillis();
670        dataflow mergedDataflow = XML2Model.loadXML(dataflow.class, tempFiles.get(0));
671        long loadEndTime = System.currentTimeMillis();
672        int initialRelationCount = mergedDataflow.getRelationships() != null ? mergedDataflow.getRelationships().size() : 0;
673        logger.info("iterativeMergeDataFlows: loaded initial dataflow, relation count: " + initialRelationCount + ", time: " + (loadEndTime - loadStartTime) + "ms");
674        
675        for (int i = 1; i < tempFiles.size(); i++) {
676            long iterationStartTime = System.currentTimeMillis();
677            int currentIteration = i;
678            int remainingIterations = totalIterations - currentIteration + 1;
679            
680            logger.info("iterativeMergeDataFlows: iteration " + currentIteration + "/" + totalIterations + ", remaining: " + remainingIterations);
681            
682            long loadCurrentStartTime = System.currentTimeMillis();
683            dataflow currentDataflow = XML2Model.loadXML(dataflow.class, tempFiles.get(i));
684            long loadCurrentEndTime = System.currentTimeMillis();
685            int currentRelationCount = currentDataflow.getRelationships() != null ? currentDataflow.getRelationships().size() : 0;
686            logger.info("iterativeMergeDataFlows: loaded dataflow[" + i + "], relation count: " + currentRelationCount + ", time: " + (loadCurrentEndTime - loadCurrentStartTime) + "ms");
687            
688            List<dataflow> tempList = new ArrayList<>();
689            tempList.add(mergedDataflow);
690            tempList.add(currentDataflow);
691            
692            long mergeStartTime = System.currentTimeMillis();
693            mergedDataflow = mergeDataFlows(tempList, startId + 5000000L * i);
694            long mergeEndTime = System.currentTimeMillis();
695            int mergedRelationCount = mergedDataflow.getRelationships() != null ? mergedDataflow.getRelationships().size() : 0;
696            
697            long iterationEndTime = System.currentTimeMillis();
698            long iterationTime = iterationEndTime - iterationStartTime;
699            long mergeTime = mergeEndTime - mergeStartTime;
700            logger.info("iterativeMergeDataFlows: iteration " + currentIteration + " completed, merged relation count: " + mergedRelationCount + ", merge time: " + mergeTime + "ms, total iteration time: " + iterationTime + "ms");
701            
702            currentDataflow = null;
703            tempList.clear();
704        }
705        
706        long endTime = System.currentTimeMillis();
707        long totalTime = endTime - startTime;
708        int finalRelationCount = mergedDataflow.getRelationships() != null ? mergedDataflow.getRelationships().size() : 0;
709        logger.info("iterativeMergeDataFlows: all iterations completed, final relation count: " + finalRelationCount + ", total time: " + totalTime + "ms (" + (totalTime / 1000.0) + "s)");
710        
711        return mergedDataflow;
712    }
713
714    private void appendSqlInfo(Map<String, Pair3<StringBuilder, AtomicInteger, String>> databaseMap, int index,
715                               SqlInfo sqlInfo, Map queryObject) {
716        EDbVendor vendor = option.getVendor();
717        if(!SQLUtil.isEmpty(sqlInfo.getDbVendor())){
718            vendor = EDbVendor.valueOf(sqlInfo.getDbVendor());
719        }
720
721        boolean supportCatalog = TSQLEnv.supportCatalog(vendor);
722        boolean supportSchema = TSQLEnv.supportSchema(vendor);
723
724        String content = (String) queryObject.get("sourceCode");
725        if (SQLUtil.isEmpty(content)) {
726            return;
727        }
728
729        StringBuilder builder = new StringBuilder();
730        if (supportCatalog) {
731            String database = (String) queryObject.get("database");
732            if (database.indexOf(".") != -1) {
733                String delimitedChar = TSQLEnv.delimitedChar(vendor);
734                database = delimitedChar + SQLUtil.trimColumnStringQuote(database) + delimitedChar;
735            }
736            builder.append(database);
737        }
738        if (supportSchema) {
739            String schema = (String) queryObject.get("schema");
740            if (schema.indexOf(".") != -1) {
741                String delimitedChar = TSQLEnv.delimitedChar(vendor);
742                schema = delimitedChar + SQLUtil.trimColumnStringQuote(schema) + delimitedChar;
743            }
744            if (builder.length() > 0) {
745                builder.append(".");
746            }
747            builder.append(schema);
748        }
749        String group = builder.toString();
750        String sqlHash = SHA256.getMd5(content);
751        String hash = SHA256.getMd5(sqlHash);
752        if (!databaseMap.containsKey(sqlHash)) {
753            databaseMap.put(sqlHash,
754                    new Pair3<StringBuilder, AtomicInteger, String>(new StringBuilder(), new AtomicInteger(), group));
755        }
756        TGSqlParser parser = new TGSqlParser(option.getVendor());
757        String delimiterChar = String.valueOf(parser.getDelimiterChar());
758        StringBuilder buffer = new StringBuilder(content);
759        if (content.trim().endsWith(delimiterChar) || content.trim().endsWith(";")) {
760            buffer.append("\n");
761        } else if(vendor == EDbVendor.dbvredshift
762                || vendor == EDbVendor.dbvgaussdb
763                || vendor == EDbVendor.dbvedb
764                || vendor == EDbVendor.dbvpostgresql
765                || vendor == EDbVendor.dbvmysql
766                || vendor == EDbVendor.dbvteradata){
767            buffer.append("\n\n-- " + TBaseType.sqlflow_stmt_delimiter_str + "\n\n");
768        } else{
769            SQLUtil.endTrim(buffer);
770            buffer.append(";").append("\n");
771        }
772
773        int lineStart = databaseMap.get(sqlHash).first.toString().split("\n", -1).length - 1;
774        if (databaseMap.get(sqlHash).first.toString().length() == 0) {
775            lineStart = 0;
776        }
777        databaseMap.get(sqlHash).first.append(buffer.toString());
778        SqlInfo sqlInfoItem = new SqlInfo();
779        sqlInfoItem.setFileName(sqlInfo.getFileName());
780        sqlInfoItem.setFilePath(sqlInfo.getFilePath());
781        sqlInfoItem.setSql(buffer.toString());
782        sqlInfoItem.setOriginIndex(index);
783        sqlInfoItem.setOriginLineStart(0);
784        sqlInfoItem.setOriginLineEnd(buffer.toString().split("\n", -1).length - 1);
785        sqlInfoItem.setIndex(databaseMap.get(sqlHash).second.getAndIncrement());
786        sqlInfoItem.setLineStart(lineStart);
787        sqlInfoItem.setLineEnd(databaseMap.get(sqlHash).first.toString().split("\n", -1).length - 1);
788        sqlInfoItem.setGroup(group);
789        sqlInfoItem.setHash(hash);
790
791        if (!sqlInfoMap.containsKey(hash)) {
792            sqlInfoMap.put(hash, new ArrayList<SqlInfo>());
793        }
794        sqlInfoMap.get(hash).add(sqlInfoItem);
795    }
796
    /**
     * Releases the thread-local {@link ModelBindingManager} bound to the current thread.
     */
    @Override
    public void dispose() {
        ModelBindingManager.remove();
    }
801
    /**
     * Convenience overload: merges the values of the per-statement dataflow map;
     * the {@link SqlInfo} keys are ignored.
     */
    private dataflow mergeDataFlows(Map<SqlInfo, dataflow> dataflowMap, long startId) {
        return mergeDataFlows(dataflowMap.values(), startId);
    }
805
    /**
     * Merges several dataflows into one.
     *
     * <p>Temporarily installs {@code vendor} as the global vendor for the duration of the
     * merge and restores it in the finally block (presumably read by name-normalization
     * helpers during the merge — TODO confirm). The id base grows with the number of
     * inputs so that newly allocated ids stay unique.
     *
     * @param dataflows dataflows to merge; {@code null} entries are skipped
     * @param vendor    SQL dialect installed as the global vendor during the merge
     * @return the merged dataflow
     */
    public static dataflow mergeDataFlows(Collection<dataflow> dataflows, EDbVendor vendor) {
        ModelBindingManager.setGlobalVendor(vendor);
        try {
            return mergeDataFlows(dataflows, 5000000L * dataflows.size());
        } finally {
            ModelBindingManager.removeGlobalVendor();
        }
    }
814
815    private static dataflow mergeDataFlows(Collection<dataflow> dataflows, long startId) {
816        dataflow mergeDataflow = new dataflow();
817
818        List<table> tableCopy = new ArrayList<table>();
819        List<table> viewCopy = new ArrayList<table>();
820        List<table> databaseCopy = new ArrayList<table>();
821        List<table> schemaCopy = new ArrayList<table>();
822        List<table> stageCopy = new ArrayList<table>();
823        List<table> dataSourceCopy = new ArrayList<table>();
824        List<table> streamCopy = new ArrayList<table>();
825        List<table> fileCopy = new ArrayList<table>();
826        List<table> variableCopy = new ArrayList<table>();
827        List<table> resultSetCopy = new ArrayList<table>();
828
829        List<process> processCopy = new ArrayList<process>();
830        List<relationship> relationshipCopy = new ArrayList<relationship>();
831        List<procedure> procedureCopy = new ArrayList<procedure>();
832        List<oraclePackage> packageCopy = new ArrayList<oraclePackage>();
833        List<error> errorCopy = new ArrayList<error>();
834
835        List<table> tables = new ArrayList<table>();
836        for (dataflow dataflow : dataflows) {
837            if (dataflow == null) {
838                continue;
839            }
840            
841            if (dataflow.getTables() != null) {
842                tableCopy.addAll(dataflow.getTables());
843            }
844            mergeDataflow.setTables(tableCopy);
845            if (dataflow.getViews() != null) {
846                viewCopy.addAll(dataflow.getViews());
847            }
848            mergeDataflow.setViews(viewCopy);
849            if (dataflow.getDatabases() != null) {
850                databaseCopy.addAll(dataflow.getDatabases());
851            }
852            mergeDataflow.setDatabases(databaseCopy);
853            if (dataflow.getSchemas() != null) {
854                schemaCopy.addAll(dataflow.getSchemas());
855            }
856            mergeDataflow.setSchemas(schemaCopy);
857            if (dataflow.getStages() != null) {
858                stageCopy.addAll(dataflow.getStages());
859            }
860            mergeDataflow.setStages(stageCopy);
861            if (dataflow.getDatasources() != null) {
862                dataSourceCopy.addAll(dataflow.getDatasources());
863            }
864            mergeDataflow.setDatasources(dataSourceCopy);
865            if (dataflow.getStreams() != null) {
866                streamCopy.addAll(dataflow.getStreams());
867            }
868            mergeDataflow.setStreams(streamCopy);
869            if (dataflow.getPaths() != null) {
870                fileCopy.addAll(dataflow.getPaths());
871            }
872            mergeDataflow.setPaths(fileCopy);
873            if (dataflow.getVariables() != null) {
874                variableCopy.addAll(dataflow.getVariables());
875            }
876            mergeDataflow.setVariables(variableCopy);
877            if (dataflow.getResultsets() != null) {
878                resultSetCopy.addAll(dataflow.getResultsets());
879            }
880            mergeDataflow.setResultsets(resultSetCopy);
881            if (dataflow.getProcesses() != null) {
882                processCopy.addAll(dataflow.getProcesses());
883            }
884            mergeDataflow.setProcesses(processCopy);
885            if (dataflow.getRelationships() != null) {
886                relationshipCopy.addAll(dataflow.getRelationships());
887            }
888            mergeDataflow.setRelationships(relationshipCopy);
889            if (dataflow.getProcedures() != null) {
890                procedureCopy.addAll(dataflow.getProcedures());
891            }
892            mergeDataflow.setProcedures(procedureCopy);
893            if (dataflow.getPackages() != null) {
894                packageCopy.addAll(dataflow.getPackages());
895            }
896            mergeDataflow.setPackages(packageCopy);
897            if (dataflow.getErrors() != null) {
898                errorCopy.addAll(dataflow.getErrors());
899            }
900            if (errorCopy.size() > 10000) {
901                errorCopy = errorCopy.subList(0, 10000);
902            }
903            mergeDataflow.setErrors(errorCopy);
904
905            tables.addAll(dataflow.getTables());
906            tables.addAll(dataflow.getViews());
907            tables.addAll(dataflow.getDatabases());
908            tables.addAll(dataflow.getSchemas());
909            tables.addAll(dataflow.getStages());
910            tables.addAll(dataflow.getDatasources());
911            tables.addAll(dataflow.getStreams());
912            tables.addAll(dataflow.getPaths());
913            tables.addAll(dataflow.getResultsets());
914            tables.addAll(dataflow.getVariables());
915        }
916
917        Map<String, List<table>> tableMap = new HashMap<String, List<table>>();
918        Map<String, String> tableTypeMap = new HashMap<String, String>();
919        Map<String, String> tableIdMap = new HashMap<String, String>();
920
921        Map<String, List<column>> columnMap = new HashMap<String, List<column>>();
922        Map<String, Set<String>> tableColumnMap = new HashMap<String, Set<String>>();
923        Map<String, String> columnIdMap = new HashMap<String, String>();
924        Map<String, column> columnMergeIdMap = new HashMap<String, column>();
925
926        List<procedure> procedures = new ArrayList<>(mergeDataflow.getProcedures());
927        if(mergeDataflow.getPackages()!=null){
928            for(oraclePackage pkg : mergeDataflow.getPackages()){
929                procedures.addAll(pkg.getProcedures());
930            }
931        }
932
933        Set<String> procedureIdSet = procedures.stream().map(t->t.getId()).collect(Collectors.toSet());
934
935        for (table table : tables) {
936            String qualifiedTableName = DlineageUtil.getQualifiedTableName(table);
937            String tableFullName = DlineageUtil.getIdentifierNormalTableName(qualifiedTableName);
938
939            if (!tableMap.containsKey(tableFullName)) {
940                tableMap.put(tableFullName, new ArrayList<table>());
941            }
942
943            tableMap.get(tableFullName).add(table);
944
945            if (!tableTypeMap.containsKey(tableFullName)) {
946                tableTypeMap.put(tableFullName, table.getType());
947            } else if ("view".equals(table.getSubType())) {
948                tableTypeMap.put(tableFullName, table.getType());
949            } else if ("database".equals(table.getSubType())) {
950                tableTypeMap.put(tableFullName, table.getType());
951            } else if ("schema".equals(table.getSubType())) {
952                tableTypeMap.put(tableFullName, table.getType());
953            } else if ("stage".equals(table.getSubType())) {
954                tableTypeMap.put(tableFullName, table.getType());
955            } else if ("datasource".equals(table.getSubType())) {
956                tableTypeMap.put(tableFullName, table.getType());
957            } else if ("stream".equals(table.getSubType())) {
958                tableTypeMap.put(tableFullName, table.getType());
959            } else if ("file".equals(table.getSubType())) {
960                tableTypeMap.put(tableFullName, table.getType());
961            } else if ("table".equals(tableTypeMap.get(tableFullName))) {
962                tableTypeMap.put(tableFullName, table.getType());
963            } else if ("variable".equals(tableTypeMap.get(tableFullName))) {
964                tableTypeMap.put(tableFullName, table.getType());
965            }
966
967            if (table.getColumns() != null) {
968                if (!tableColumnMap.containsKey(tableFullName)) {
969                    tableColumnMap.put(tableFullName, new LinkedHashSet<String>());
970                }
971                for (column column : table.getColumns()) {
972                    String columnFullName = tableFullName + "."
973                            + DlineageUtil.getIdentifierNormalColumnName(column.getName());
974
975                    if (!columnMap.containsKey(columnFullName)) {
976                        columnMap.put(columnFullName, new ArrayList<column>());
977                        tableColumnMap.get(tableFullName).add(columnFullName);
978                    }
979
980                    columnMap.get(columnFullName).add(column);
981                }
982            }
983        }
984
985        Iterator<String> tableNameIter = tableMap.keySet().iterator();
986        while (tableNameIter.hasNext()) {
987            String tableName = tableNameIter.next();
988            List<table> tableList = tableMap.get(tableName);
989            table table;
990            if (tableList.size() > 1) {
991                table standardTable = tableList.get(0);
992                //Function允许重名,不做合并处理
993                if (standardTable.isFunction()) {
994                    continue;
995                }
996
997                String type = tableTypeMap.get(tableName);
998                table = new table();
999                table.setId(String.valueOf(++startId));
1000                table.setServer(standardTable.getServer());
1001                table.setDatabase(standardTable.getDatabase());
1002                table.setSchema(standardTable.getSchema());
1003                table.setName(standardTable.getName());
1004                table.setDisplayName(standardTable.getDisplayName());
1005                table.setParent(standardTable.getParent());
1006                table.setColumns(new ArrayList<column>());
1007                Set<String> processIds = new LinkedHashSet<String>();
1008                for (int k = 0; k < tableList.size(); k++) {
1009                    if (tableList.get(k).getProcessIds() != null) {
1010                        processIds.addAll(tableList.get(k).getProcessIds());
1011                    }
1012                }
1013                if (!processIds.isEmpty()) {
1014                    table.setProcessIds(new ArrayList<String>(processIds));
1015                }
1016                table.setType(type);
1017                for (table item : tableList) {
1018                    if (!SQLUtil.isEmpty(table.getCoordinate()) && !SQLUtil.isEmpty(item.getCoordinate())) {
1019                        if (table.getCoordinate().indexOf(item.getCoordinate()) == -1) {
1020                            table.appendCoordinate(item.getCoordinate());
1021                        }
1022                    } else if (!SQLUtil.isEmpty(item.getCoordinate())) {
1023                        table.setCoordinate(item.getCoordinate());
1024                    }
1025
1026                    if (!SQLUtil.isEmpty(table.getAlias()) && !SQLUtil.isEmpty(item.getAlias())) {
1027                        table.setAlias(table.getAlias() + "," + item.getAlias());
1028                    } else if (!SQLUtil.isEmpty(item.getAlias())) {
1029                        table.setAlias(item.getAlias());
1030                    }
1031
1032                    tableIdMap.put(item.getId(), table.getId());
1033
1034                    if (item.isView()) {
1035                        mergeDataflow.getViews().remove(item);
1036                    } else if (item.isDatabaseType()) {
1037                        mergeDataflow.getDatabases().remove(item);
1038                    } else if (item.isSchemaType()) {
1039                        mergeDataflow.getSchemas().remove(item);
1040                    } else if (item.isStage()) {
1041                        mergeDataflow.getStages().remove(item);
1042                    } else if (item.isDataSource()) {
1043                        mergeDataflow.getDatasources().remove(item);
1044                    } else if (item.isStream()) {
1045                        mergeDataflow.getStreams().remove(item);
1046                    } else if (item.isFile()) {
1047                        mergeDataflow.getPaths().remove(item);
1048                    } else if (item.isVariable()) {
1049                        mergeDataflow.getVariables().remove(item);
1050                    } else if (item.isTable()) {
1051                        mergeDataflow.getTables().remove(item);
1052                    } else if (item.isResultSet()) {
1053                        mergeDataflow.getResultsets().remove(item);
1054                    }
1055                }
1056
1057                if (table.isView()) {
1058                    mergeDataflow.getViews().add(table);
1059                } else if (table.isDatabaseType()) {
1060                    mergeDataflow.getDatabases().add(table);
1061                } else if (table.isSchemaType()) {
1062                    mergeDataflow.getSchemas().add(table);
1063                } else if (table.isStage()) {
1064                    mergeDataflow.getStages().add(table);
1065                } else if (table.isDataSource()) {
1066                    mergeDataflow.getDatasources().add(table);
1067                } else if (table.isStream()) {
1068                    mergeDataflow.getStreams().add(table);
1069                } else if (table.isFile()) {
1070                    mergeDataflow.getPaths().add(table);
1071                } else if (table.isVariable()) {
1072                    mergeDataflow.getVariables().add(table);
1073                } else if (table.isResultSet()) {
1074                    mergeDataflow.getResultsets().add(table);
1075                } else {
1076                    mergeDataflow.getTables().add(table);
1077                }
1078            } else {
1079                table = tableList.get(0);
1080            }
1081
1082            Set<String> columns = tableColumnMap.get(tableName);
1083            Iterator<String> columnIter = columns.iterator();
1084            List<column> mergeColumns = new ArrayList<column>();
1085            while (columnIter.hasNext()) {
1086                String columnName = columnIter.next();
1087                List<column> columnList = columnMap.get(columnName);
1088                List<column> functions = new ArrayList<column>();
1089                for (column t : columnList) {
1090                    if (Boolean.TRUE.toString().equals(t.getIsFunction())) {
1091                        functions.add(t);
1092                    }
1093                }
1094                if (functions != null && !functions.isEmpty()) {
1095                    for (column function : functions) {
1096                        mergeColumns.add(function);
1097                        columnIdMap.put(function.getId(), function.getId());
1098                        columnMergeIdMap.put(function.getId(), function);
1099                    }
1100
1101                    columnList.removeAll(functions);
1102                }
1103                if (!columnList.isEmpty()) {
1104                    column firstColumn = columnList.iterator().next();
1105                    if (columnList.size() > 1) {
1106                        column mergeColumn = new column();
1107                        mergeColumn.setId(String.valueOf(++startId));
1108                        mergeColumn.setName(firstColumn.getName());
1109                        mergeColumn.setDisplayName(firstColumn.getDisplayName());
1110                        mergeColumn.setSource(firstColumn.getSource());
1111                        mergeColumn.setQualifiedTable(firstColumn.getQualifiedTable());
1112                        mergeColumns.add(mergeColumn);
1113                        for (column item : columnList) {
1114                            mergeColumn.appendCoordinate(item.getCoordinate());
1115                            columnIdMap.put(item.getId(), mergeColumn.getId());
1116                            //add by grq 2023.02.06 issue=I6DB5S
1117                            if(item.getDataType() != null){
1118                                mergeColumn.setDataType(item.getDataType());
1119                            }
1120                            if(item.isForeignKey() != null){
1121                                mergeColumn.setForeignKey(item.isForeignKey());
1122                            }
1123                            if(item.isUnqiueKey() != null){
1124                                mergeColumn.setUnqiueKey(item.isUnqiueKey());
1125                            }
1126                            if(item.isIndexKey() != null){
1127                                mergeColumn.setIndexKey(item.isIndexKey());
1128                            }
1129                            if(item.isPrimaryKey() != null){
1130                                mergeColumn.setPrimaryKey(item.isPrimaryKey());
1131                            }
1132                            //end by grq
1133                        }
1134                        columnMergeIdMap.put(mergeColumn.getId(), mergeColumn);
1135                    } else {
1136                        mergeColumns.add(firstColumn);
1137                        columnIdMap.put(firstColumn.getId(), firstColumn.getId());
1138                        columnMergeIdMap.put(firstColumn.getId(), firstColumn);
1139                    }
1140                }
1141            }
1142            table.setColumns(mergeColumns);
1143        }
1144
1145        if (mergeDataflow.getRelationships() != null) {
1146            Map<String, relationship> mergeRelations = new LinkedHashMap<String, relationship>();
1147            for (int i = 0; i < mergeDataflow.getRelationships().size(); i++) {
1148                relationship relation = mergeDataflow.getRelationships().get(i);
1149                if(RelationshipType.call.name().equals(relation.getType())){
1150                    targetColumn target = relation.getCaller();
1151                    if (target == null) {
1152                        continue;
1153                    }
1154                    if (target != null && tableIdMap.containsKey(target.getId())) {
1155                        target.setId(tableIdMap.get(target.getId()));
1156                    }
1157
1158                    List<sourceColumn> sources = relation.getCallees();
1159                    Set<sourceColumn> sourceSet = new LinkedHashSet<sourceColumn>();
1160                    if (sources != null) {
1161                        for (sourceColumn source : sources) {
1162                            if (tableIdMap.containsKey(source.getId())) {
1163                                source.setId(tableIdMap.get(source.getId()));
1164                            }
1165                        }
1166                        sourceSet.addAll(sources);
1167                        relation.setCallees(new ArrayList<sourceColumn>(sourceSet));
1168                    }
1169
1170                    String jsonString = JSON.toJSONString(relation, true);
1171                    String key = SHA256.getMd5(jsonString);
1172                    if (!mergeRelations.containsKey(key)) {
1173                        mergeRelations.put(key, relation);
1174                    }
1175                }
1176                else {
1177                    targetColumn target = relation.getTarget();
1178                    if (target == null) {
1179                        continue;
1180                    }
1181                    if (target != null && tableIdMap.containsKey(target.getParent_id())) {
1182                        target.setParent_id(tableIdMap.get(target.getParent_id()));
1183                    }
1184
1185                    if (columnIdMap.containsKey(target.getId())) {
1186                        target.setId(columnIdMap.get(target.getId()));
1187                        target.setCoordinate(columnMergeIdMap.get(target.getId()).getCoordinate());
1188                    }
1189
1190                    List<sourceColumn> sources = relation.getSources();
1191                    Set<sourceColumn> sourceSet = new LinkedHashSet<sourceColumn>();
1192                    if (sources != null) {
1193                        for (sourceColumn source : sources) {
1194                            if (tableIdMap.containsKey(source.getParent_id())) {
1195                                source.setParent_id(tableIdMap.get(source.getParent_id()));
1196                            }
1197                            if (tableIdMap.containsKey(source.getSource_id())) {
1198                                source.setSource_id(tableIdMap.get(source.getSource_id()));
1199                            }
1200                            if (columnIdMap.containsKey(source.getId())) {
1201                                source.setId(columnIdMap.get(source.getId()));
1202                                source.setCoordinate(columnMergeIdMap.get(source.getId()).getCoordinate());
1203                            }
1204                        }
1205
1206                        sourceSet.addAll(sources);
1207                        relation.setSources(new ArrayList<sourceColumn>(sourceSet));
1208                    }
1209
1210                    String jsonString = JSON.toJSONString(relation, true);
1211                    String key = SHA256.getMd5(jsonString);
1212                    if (!mergeRelations.containsKey(key)) {
1213                        mergeRelations.put(key, relation);
1214                    }
1215                }
1216            }
1217
1218            mergeDataflow.setRelationships(new ArrayList<relationship>(mergeRelations.values()));
1219        }
1220
1221        tableMap.clear();
1222        tableTypeMap.clear();
1223        tableIdMap.clear();
1224        columnMap.clear();
1225        tableColumnMap.clear();
1226        columnIdMap.clear();
1227        columnMergeIdMap.clear();
1228        tables.clear();
1229
1230        return mergeDataflow;
1231    }
1232
1233    @Override
1234    public synchronized dataflow getDataFlow() {
1235        if (dataflow != null) {
1236            return dataflow;
1237        } else if (dataflowString != null) {
1238            return XML2Model.loadXML(dataflow.class, dataflowString);
1239        }
1240        return null;
1241    }
1242
    /**
     * Returns the map from SQL hash to SQL text.
     * NOTE(review): the internal mutable map is exposed directly, so callers can modify
     * it — confirm whether an unmodifiable view would be safer for existing callers.
     */
    @Override
    public Map<String, String> getHashSQLMap() {
        return hashSQLMap;
    }
1247
1248    public static void main(String[] args) throws Exception {
1249        Option option = new Option();
1250        option.setVendor(EDbVendor.dbvmssql);
1251        option.setOutput(false);
1252//        option.setSimpleOutput(true);
1253        File parentDir = new File("C:\\Users\\KK\\Desktop\\sql");
1254        ParallelDataFlowAnalyzer analyzer = new ParallelDataFlowAnalyzer(new File[]{new File("C:\\Users\\KK\\Desktop\\metadata_with_query\\客户原始sql.json")}, option, 5, new File("C:\\Users\\KK\\Desktop\\metadata_with_query"));
1255        analyzer.generateDataFlow(false, true);
1256        dataflow dataflow = analyzer.getDataFlow();
1257//        System.out.println(XML2Model.saveXML(dataflow));
1258//        dataflow dataflow = XML2Model.loadXML(gudusoft.gsqlparser.dlineage.dataflow.model.xml.dataflow.class,new File("D:\\dataflow.xml.zip"));
1259        XML2Model.saveXML(dataflow, new File("D:\\dataflow.xml.zip"));
1260//        dataflow = DataflowUtility.convertToTableLevelDataflow(dataflow);
1261//        dataflow = DataflowUtility.convertTableLevelToFunctionCallDataflow(dataflow, true, EDbVendor.dbvoracle);
1262//        System.out.println(XML2Model.saveXML(dataflow));
1263    }
1264}