001package gudusoft.gsqlparser.dlineage;
002
003import gudusoft.gsqlparser.EDbVendor;
004import gudusoft.gsqlparser.TBaseType;
005import gudusoft.gsqlparser.TGSqlParser;
006import gudusoft.gsqlparser.dlineage.dataflow.listener.DataFlowHandleListener;
007import gudusoft.gsqlparser.dlineage.dataflow.metadata.MetadataReader;
008import gudusoft.gsqlparser.dlineage.dataflow.model.*;
009import gudusoft.gsqlparser.dlineage.dataflow.model.json.Coordinate;
010import gudusoft.gsqlparser.dlineage.dataflow.model.xml.*;
011import gudusoft.gsqlparser.dlineage.dataflow.sqlenv.SQLEnvParser;
012import gudusoft.gsqlparser.dlineage.util.*;
013import gudusoft.gsqlparser.sqlenv.TSQLEnv;
014import gudusoft.gsqlparser.sqlenv.parser.TJSONSQLEnvParser;
015import gudusoft.gsqlparser.util.IndexedLinkedHashMap;
016import gudusoft.gsqlparser.util.Logger;
017import gudusoft.gsqlparser.util.LoggerFactory;
018import gudusoft.gsqlparser.util.SQLUtil;
019import gudusoft.gsqlparser.util.json.JSON;
020
021import java.io.File;
022import java.io.IOException;
023import java.util.*;
024import java.util.concurrent.*;
025import java.util.concurrent.atomic.AtomicInteger;
026import java.util.stream.Collectors;
027
028public class ParallelDataFlowAnalyzer implements IDataFlowAnalyzer {
029    private static final Logger logger = LoggerFactory.getLogger(ParallelDataFlowAnalyzer.class);
030    private SqlInfo[] sqlInfos;
031    private Option option = new Option();
032    private TSQLEnv sqlenv = null;
033    private dataflow dataflow;
034    private String dataflowString;
035    private List<ErrorInfo> errorInfos = new CopyOnWriteArrayList<ErrorInfo>();
036    private IndexedLinkedHashMap<String, List<SqlInfo>> sqlInfoMap = new IndexedLinkedHashMap<String, List<SqlInfo>>();
037    private final Map<String, String> hashSQLMap = new HashMap();
038
039    public ParallelDataFlowAnalyzer(SqlInfo[] sqlInfos, EDbVendor dbVendor, boolean simpleOutput) {
040        this.sqlInfos = sqlInfos;
041        option.setVendor(dbVendor);
042        option.setSimpleOutput(simpleOutput);
043        ModelBindingManager.setGlobalOption(option);
044    }
045
046    public ParallelDataFlowAnalyzer(String[] sqlContents, EDbVendor dbVendor, boolean simpleOutput, String defaultServer, String defaultDatabase, String defaltSchema) {
047        SqlInfo[] sqlInfos = new SqlInfo[sqlContents.length];
048        for (int i = 0; i < sqlContents.length; i++) {
049            SqlInfo info = new SqlInfo();
050            info.setSql(sqlContents[i]);
051            info.setOriginIndex(0);
052            sqlInfos[i] = info;
053        }
054        option.setVendor(dbVendor);
055        option.setSimpleOutput(simpleOutput);
056                option.setDefaultServer(defaultServer);
057                option.setDefaultDatabase(defaultDatabase);
058                option.setDefaultSchema(defaltSchema);
059        this.sqlInfos = convertSQL(dbVendor, JSON.toJSONString(sqlInfos)).toArray(new SqlInfo[0]);
060    }
061    
062    public ParallelDataFlowAnalyzer(String[] sqlContents, EDbVendor dbVendor, boolean simpleOutput) {
063        SqlInfo[] sqlInfos = new SqlInfo[sqlContents.length];
064        for (int i = 0; i < sqlContents.length; i++) {
065            SqlInfo info = new SqlInfo();
066            info.setSql(sqlContents[i]);
067            info.setOriginIndex(0);
068            sqlInfos[i] = info;
069        }
070        option.setVendor(dbVendor);
071        option.setSimpleOutput(simpleOutput);
072        this.sqlInfos = convertSQL(dbVendor, JSON.toJSONString(sqlInfos)).toArray(new SqlInfo[0]);
073    }
074
075    public ParallelDataFlowAnalyzer(SqlInfo[] sqlInfos, Option option) {
076        this.sqlInfos = sqlInfos;
077        this.option = option;
078        ModelBindingManager.setGlobalOption(option);
079    }
080
081    public ParallelDataFlowAnalyzer(File[] sqlFiles, Option option) {
082        SqlInfo[] sqlInfos = new SqlInfo[sqlFiles.length];
083        for (int i = 0; i < sqlFiles.length; i++) {
084            SqlInfo info = new SqlInfo();
085            info.setSql(SQLUtil.getFileContent(sqlFiles[i]));
086            info.setFileName(sqlFiles[i].getName());
087            info.setFilePath(sqlFiles[i].getAbsolutePath());
088            info.setOriginIndex(0);
089            sqlInfos[i] = info;
090        }
091        this.sqlInfos = sqlInfos;
092        this.option = option;
093        ModelBindingManager.setGlobalOption(option);
094    }
095    
096    public ParallelDataFlowAnalyzer(File[] sqlFiles, Option option, int splitSizeMB, File splitDir) {
097        List<SqlInfo> sqlInfoList = new ArrayList<SqlInfo>();
098        for (int i = 0; i < sqlFiles.length; i++) {
099            try {
100                // Ensure split directory exists
101                if (!splitDir.exists() && !splitDir.mkdirs()) {
102                    throw new IOException("Failed to create split directory: " + splitDir.getAbsolutePath());
103                }
104                
105                // Split the file if it's large
106                List<File> splitFiles = gudusoft.gsqlparser.util.FileSplitter.splitFile(sqlFiles[i], splitDir, splitSizeMB, option.getVendor());
107                
108                if (splitFiles.isEmpty()) {
109                    // If no split files were created, process the original file
110                    SqlInfo info = new SqlInfo();
111                    info.setSql(SQLUtil.getFileContent(sqlFiles[i]));
112                    info.setFileName(sqlFiles[i].getName());
113                    info.setFilePath(sqlFiles[i].getAbsolutePath());
114                    info.setOriginIndex(0);
115                    sqlInfoList.add(info);
116                } else {
117                    // Process each split file
118                    for (File splitFile : splitFiles) {
119                        SqlInfo info = new SqlInfo();
120                        info.setSql(SQLUtil.getFileContent(splitFile));
121                        info.setFileName(splitFile.getName());
122                        info.setFilePath(splitFile.getAbsolutePath());
123                        info.setOriginIndex(0);
124                        sqlInfoList.add(info);
125                    }
126                }
127            } catch (Exception e) {
128                sqlInfoList.clear();
129                logger.error("Error splitting file: " + sqlFiles[i].getAbsolutePath(), e);
130                // Fallback to original file if splitting fails
131                SqlInfo info = new SqlInfo();
132                info.setSql(SQLUtil.getFileContent(sqlFiles[i]));
133                info.setFileName(sqlFiles[i].getName());
134                info.setFilePath(sqlFiles[i].getAbsolutePath());
135                info.setOriginIndex(0);
136                sqlInfoList.add(info);
137            }
138        }
139        this.sqlInfos = sqlInfoList.toArray(new SqlInfo[0]);
140        this.option = option;
141        ModelBindingManager.setGlobalOption(option);
142    }
143    
    /**
     * Convenience overload of {@link #ParallelDataFlowAnalyzer(File[], Option, int, File)}
     * that accepts the split directory as a path string.
     */
    public ParallelDataFlowAnalyzer(File[] sqlFiles, Option option, int splitSizeMB, String splitDirPath) {
        this(sqlFiles, option, splitSizeMB, new File(splitDirPath));
    }
147    
    /**
     * Normalizes a JSON-serialized array of {@code {sql, fileName, filePath}}
     * entries into concrete {@link SqlInfo} units.
     *
     * <p>Entries whose SQL text starts with '{' are treated as JSON metadata
     * payloads: "sqldep"/"grabit" exports and "sqlflow" exports have their
     * embedded "queries" arrays expanded into one unit per query (plus one unit
     * for the remaining metadata). The first payload containing "createdBy" is
     * also used to build the SQL environment, once.
     *
     * @param vendor database vendor used when parsing metadata payloads
     * @param json   JSON array of sql/fileName/filePath maps
     * @return the expanded list of SQL units
     */
    protected List<SqlInfo> convertSQL(EDbVendor vendor, String json) {
                List<SqlInfo> sqlInfos = new ArrayList<SqlInfo>();
                List sqlContents = (List) JSON.parseObject(json);
                for (int j = 0; j < sqlContents.size(); j++) {
                        Map sqlContent = (Map) sqlContents.get(j);
                        String sql = (String) sqlContent.get("sql");
                        String fileName = (String) sqlContent.get("fileName");
                        String filePath = (String) sqlContent.get("filePath");
                        // A leading '{' marks a JSON metadata payload rather than plain SQL.
                        if (sql != null && sql.trim().startsWith("{")) {
                                // NOTE(review): payloads without "createdBy" are silently dropped
                                // (no unit is added for them) — confirm this is intentional.
                                if (sql.indexOf("createdBy") != -1) {
                                        // Build the SQL environment once, from the first metadata payload.
                                        if (this.sqlenv == null) {
                                                TSQLEnv[] sqlenvs = new TJSONSQLEnvParser(option.getDefaultServer(), option.getDefaultDatabase(), option.getDefaultSchema()).parseSQLEnv(vendor, sql);
                                                if (sqlenvs != null) {
                                                        this.sqlenv = sqlenvs[0];
                                                }
                                        }
                        // sqldep/grabit export: expand top-level "queries" into individual units.
                        if (sql.toLowerCase().indexOf("sqldep") != -1 || sql.toLowerCase().indexOf("grabit") != -1 ) {
                                Map queryObject = (Map) JSON.parseObject(sql);
                                        List querys = (List) queryObject.get("queries");
                                        if (querys != null) {
                                                for (int i = 0; i < querys.size(); i++) {
                                                        Map object = (Map) querys.get(i);
                                                        SqlInfo info = new SqlInfo();
                                                        info.setSql(JSON.toJSONString(object));
                                                        info.setFileName(fileName);
                                                        info.setFilePath(filePath);
                                                        info.setOriginIndex(i);
                                                        sqlInfos.add(info);
                                                }
                                                // Keep the remaining metadata (minus queries) as a trailing unit.
                                                queryObject.remove("queries");
                                                SqlInfo info = new SqlInfo();
                                                info.setSql(JSON.toJSONString(queryObject));
                                                info.setFileName(fileName);
                                                info.setFilePath(filePath);
                                                info.setOriginIndex(querys.size());
                                                sqlInfos.add(info);
                                        } else {
                                                SqlInfo info = new SqlInfo();
                                                info.setSql(JSON.toJSONString(queryObject));
                                                info.setFileName(fileName);
                                                info.setFilePath(filePath);
                                                info.setOriginIndex(0);
                                                sqlInfos.add(info);
                                        }
                        }
                        // sqlflow export: expand each server's "queries" into individual units,
                        // tagging them with the server name and vendor.
                        else if (sql.toLowerCase().indexOf("sqlflow") != -1) {
                                                Map sqlflow = (Map) JSON.parseObject(sql);
                                                List<Map> servers = (List<Map>) sqlflow.get("servers");
                                                if (servers != null) {
                                                        for (Map queryObject : servers) {
                                                                String name = (String) queryObject.get("name");
                                                                String dbVendor = (String) queryObject.get("dbVendor");
                                                                List querys = (List) queryObject.get("queries");
                                                                if (querys != null) {
                                                                        for (int i = 0; i < querys.size(); i++) {
                                                                                Map object = (Map) querys.get(i);
                                                                                SqlInfo info = new SqlInfo();
                                                                                info.setSql(JSON.toJSONString(object));
                                                                                info.setFileName(fileName);
                                                                                info.setFilePath(filePath);
                                                                                info.setOriginIndex(i);
                                                                                info.setDbVendor(dbVendor);
                                                                                info.setServer(name);
                                                                                sqlInfos.add(info);
                                                                        }
                                                                        queryObject.remove("queries");
                                                                        SqlInfo info = new SqlInfo();
                                                                        info.setSql(JSON.toJSONString(queryObject));
                                                                        info.setFileName(fileName);
                                                                        info.setFilePath(filePath);
                                                                        info.setOriginIndex(querys.size());
                                                                        info.setDbVendor(dbVendor);
                                                                        // NOTE(review): the trailing unit uses filePath as the server,
                                                                        // while the per-query units above use "name" — confirm this
                                                                        // asymmetry is intentional and not a copy/paste slip.
                                                                        info.setServer(filePath);
                                                                        sqlInfos.add(info);
                                                                } else {
                                                                        SqlInfo info = new SqlInfo();
                                                                        info.setSql(JSON.toJSONString(queryObject));
                                                                        info.setFileName(fileName);
                                                                        info.setFilePath(filePath);
                                                                        info.setOriginIndex(0);
                                                                        sqlInfos.add(info);
                                                                }
                                                        }
                                                }
                                        }
                    }
                        } else if (sql != null) {
                                // Plain SQL text: pass it through as a single unit.
                                SqlInfo info = new SqlInfo();
                                info.setSql(sql);
                                info.setFileName(fileName);
                                info.setFilePath(filePath);
                                info.setOriginIndex(0);
                                sqlInfos.add(info);
                        }
                }
                return sqlInfos;
        }
245
    /** @return whether intermediate record-set nodes are omitted. Delegates to {@link Option}. */
    @Override
    public boolean isIgnoreRecordSet() {
        return option.isIgnoreRecordSet();
    }

    /** Sets whether intermediate record-set nodes are omitted. */
    @Override
    public void setIgnoreRecordSet(boolean ignoreRecordSet) {
        option.setIgnoreRecordSet(ignoreRecordSet);
    }

    /** @return whether only the top-level select result set is shown in simple form. */
    @Override
    public boolean isSimpleShowTopSelectResultSet() {
        return option.isSimpleShowTopSelectResultSet();
    }

    /** Sets whether only the top-level select result set is shown in simple form. */
    @Override
    public void setSimpleShowTopSelectResultSet(boolean simpleShowTopSelectResultSet) {
        option.setSimpleShowTopSelectResultSet(simpleShowTopSelectResultSet);
    }

    /** @return whether functions are rendered in simplified form. */
    @Override
    public boolean isSimpleShowFunction() {
        return option.isSimpleShowFunction();
    }

    /** Sets whether functions are rendered in simplified form. */
    @Override
    public void setSimpleShowFunction(boolean simpleShowFunction) {
        option.setSimpleShowFunction(simpleShowFunction);
    }

    /** @return whether join relations are included in the output. */
    @Override
    public boolean isShowJoin() {
        return option.isShowJoin();
    }

    /** Sets whether join relations are included in the output. */
    @Override
    public void setShowJoin(boolean showJoin) {
        option.setShowJoin(showJoin);
    }

    /** @return whether implicit schema names are shown. */
    @Override
    public boolean isShowImplicitSchema() {
        return option.isShowImplicitSchema();
    }

    /** Sets whether implicit schema names are shown. */
    @Override
    public void setShowImplicitSchema(boolean showImplicitSchema) {
        option.setShowImplicitSchema(showImplicitSchema);
    }

    /** @return whether constant tables are shown. */
    @Override
    public boolean isShowConstantTable() {
        return option.isShowConstantTable();
    }

    /** Sets whether constant tables are shown. */
    @Override
    public void setShowConstantTable(boolean showConstantTable) {
        option.setShowConstantTable(showConstantTable);
    }

    /** @return whether table column counts are shown. */
    @Override
    public boolean isShowCountTableColumn() {
        return option.isShowCountTableColumn();
    }

    /** Sets whether table column counts are shown. */
    @Override
    public void setShowCountTableColumn(boolean showCountTableColumn) {
        option.setShowCountTableColumn(showCountTableColumn);
    }

    /** @return whether transform information is generated. */
    @Override
    public boolean isTransform() {
        return option.isTransform();
    }

    /** Sets whether transform information is generated. */
    @Override
    public void setTransform(boolean transform) {
        option.setTransform(transform);
        // transformCoordinate implies transform: while coordinate transformation
        // is active, transform stays enabled even if the caller passed false
        // (mirrors the coupling in setTransformCoordinate below).
        if (option.isTransformCoordinate()) {
            option.setTransform(true);
        }
    }

    /** @return whether coordinate transformation is enabled. */
    @Override
    public boolean isTransformCoordinate() {
        return option.isTransformCoordinate();
    }

    /** Enables coordinate transformation; enabling it also forces transform on. */
    @Override
    public void setTransformCoordinate(boolean transformCoordinate) {
        option.setTransformCoordinate(transformCoordinate);
        if (transformCoordinate) {
            option.setTransform(true);
        }
    }

    /** @return whether orphan columns are linked to the first table. */
    @Override
    public boolean isLinkOrphanColumnToFirstTable() {
        return option.isLinkOrphanColumnToFirstTable();
    }

    /** Sets whether orphan columns are linked to the first table. */
    @Override
    public void setLinkOrphanColumnToFirstTable(boolean linkOrphanColumnToFirstTable) {
        option.setLinkOrphanColumnToFirstTable(linkOrphanColumnToFirstTable);
    }

    /** @return whether source coordinates are omitted from the output. */
    @Override
    public boolean isIgnoreCoordinate() {
        return option.isIgnoreCoordinate();
    }

    /** Sets whether source coordinates are omitted from the output. */
    @Override
    public void setIgnoreCoordinate(boolean ignoreCoordinate) {
        option.setIgnoreCoordinate(ignoreCoordinate);
    }
361
    /** Registers a listener notified during data-flow handling. */
    @Override
    public void setHandleListener(DataFlowHandleListener listener) {
        option.setHandleListener(listener);
    }

    /** Sets the SQL environment used to resolve object names during analysis. */
    @Override
    public void setSqlEnv(TSQLEnv sqlenv) {
        this.sqlenv = sqlenv;
    }

    /**
     * Replaces the analysis options.
     * NOTE(review): unlike the constructors, this does not call
     * ModelBindingManager.setGlobalOption — confirm whether callers are
     * expected to register the option globally themselves.
     */
    @Override
    public void setOption(Option option) {
        this.option = option;
    }

    /** @return the current analysis options. */
    @Override
    public Option getOption() {
        return option;
    }

    /** @return errors collected during the last analysis run (thread-safe list). */
    @Override
    public List<ErrorInfo> getErrorMessages() {
        return errorInfos;
    }

    /** @return the SQL-unit map of the last run, serialized as JSON. */
    @Override
    public synchronized String generateSqlInfos() {
        return JSON.toJSONString(sqlInfoMap);
    }

    /** Runs the analysis without extra info; see {@link #generateDataFlow(boolean)}. */
    @Override
    public synchronized String generateDataFlow() {
        return generateDataFlow(false);
    }

    /** @return the SQL units of the last run, keyed by content hash (or index for empty units). */
    @Override
    public Map<String, List<SqlInfo>> getSqlInfos() {
        return sqlInfoMap;
    }
401
402    @Override
403    /**
404     * @deprecated please use SqlInfoHelper.getSelectedDbObjectInfo
405     */
406    public DbObjectPosition getSelectedDbObjectInfo(Coordinate start, Coordinate end) {
407        if (start == null || end == null) {
408            throw new IllegalArgumentException("Coordinate can't be null.");
409        }
410
411        String hashCode = start.getHashCode();
412
413        if (hashCode == null) {
414            throw new IllegalArgumentException("Coordinate hashcode can't be null.");
415        }
416
417        int dbObjectStartLine = (int) start.getX() - 1;
418        int dbObjectStarColumn = (int) start.getY() - 1;
419        int dbObjectEndLine = (int) end.getX() - 1;
420        int dbObjectEndColumn = (int) end.getY() - 1;
421        List<SqlInfo> sqlInfoList;
422        if (hashCode.matches("\\d+")) {
423            sqlInfoList = sqlInfoMap.getValueAtIndex(Integer.valueOf(hashCode));
424        } else {
425            sqlInfoList = sqlInfoMap.get(hashCode);
426        }
427        for (int j = 0; j < sqlInfoList.size(); j++) {
428            SqlInfo sqlInfo = sqlInfoList.get(j);
429            int startLine = sqlInfo.getLineStart();
430            int endLine = sqlInfo.getLineEnd();
431            if (dbObjectStartLine >= startLine && dbObjectStartLine <= endLine) {
432                DbObjectPosition position = new DbObjectPosition();
433                position.setFile(sqlInfo.getFileName());
434                position.setFilePath(sqlInfo.getFilePath());
435                position.setSql(sqlInfo.getSql());
436                position.setIndex(sqlInfo.getOriginIndex());
437                List<Pair<Integer, Integer>> positions = position.getPositions();
438                positions.add(new Pair<Integer, Integer>(
439                        dbObjectStartLine - startLine + sqlInfo.getOriginLineStart() + 1, dbObjectStarColumn + 1));
440                positions.add(new Pair<Integer, Integer>(dbObjectEndLine - startLine + sqlInfo.getOriginLineStart() + 1,
441                        dbObjectEndColumn + 1));
442                return position;
443            }
444        }
445        return null;
446    }
447
    /**
     * Runs the parallel analysis using the memory-saving merge strategy.
     *
     * @param withExtraInfo whether per-statement extra info is generated
     * @return the serialized merged data flow, or {@code null} on failure
     */
    @Override
    public synchronized String generateDataFlow(final boolean withExtraInfo) {
        return generateDataFlow(withExtraInfo, true);
    }
452    
453    public synchronized String generateDataFlow(final boolean withExtraInfo, boolean useSaveMemoryMode) {
454        sqlInfoMap.clear();
455        errorInfos.clear();
456        Map<String, Pair3<StringBuilder, AtomicInteger, String>> databaseMap = new LinkedHashMap<String, Pair3<StringBuilder, AtomicInteger, String>>();
457        for (int i = 0; i < sqlInfos.length; i++) {
458            SqlInfo sqlInfo = sqlInfos[i];
459                        if (sqlInfo != null && sqlInfo.getSql() == null && sqlInfo.getFilePath() != null) {
460                                sqlInfo.setSql(SQLUtil.getFileContent(sqlInfo.getFilePath()));
461                        }
462            if (sqlInfo != null && sqlInfo.getSql() == null && sqlInfo.getFileName() != null) {
463                sqlInfo.setSql(SQLUtil.getFileContent(sqlInfo.getFileName()));
464            }
465            if (sqlInfo == null || sqlInfo.getSql() == null) {
466                sqlInfoMap.put(String.valueOf(i), new ArrayList<SqlInfo>());
467                continue;
468            }
469            String sql = sqlInfo.getSql();
470            if (sql != null && sql.trim().startsWith("{")) {
471                if (MetadataReader.isGrabit(sql) || MetadataReader.isSqlflow(sql)) {
472                    String hash = SHA256.getMd5(sql);
473                    String fileHash = SHA256.getMd5(hash);
474                    if (!sqlInfoMap.containsKey(fileHash)) {
475                        sqlInfoMap.put(fileHash, new ArrayList<SqlInfo>());
476                        sqlInfoMap.get(fileHash).add(sqlInfo);
477                    }
478                } else {
479                    Map queryObject = (Map) JSON.parseObject(sql);
480                    appendSqlInfo(databaseMap, i, sqlInfo, queryObject);
481                }
482            } else {
483                String content = sql;
484                String hash = SHA256.getMd5(content);
485                String fileHash = SHA256.getMd5(hash);
486                if (!sqlInfoMap.containsKey(fileHash)) {
487                    sqlInfoMap.put(fileHash, new ArrayList<SqlInfo>());
488
489                    String database = TSQLEnv.DEFAULT_DB_NAME;
490                    String schema = TSQLEnv.DEFAULT_SCHEMA_NAME;
491                    if(sqlenv!=null) {
492                        database = sqlenv.getDefaultCatalogName();
493                        if(database == null) {
494                            database = TSQLEnv.DEFAULT_DB_NAME;
495                        }
496                        schema = sqlenv.getDefaultSchemaName();
497                        if(schema == null) {
498                            schema = TSQLEnv.DEFAULT_SCHEMA_NAME;
499                        }
500                    }
501                    boolean supportCatalog = TSQLEnv.supportCatalog(option.getVendor());
502                    boolean supportSchema = TSQLEnv.supportSchema(option.getVendor());
503                    StringBuilder builder = new StringBuilder();
504                    if (supportCatalog) {
505                        builder.append(database);
506                    }
507                    if (supportSchema) {
508                        if (builder.length() > 0) {
509                            builder.append(".");
510                        }
511                        builder.append(schema);
512                    }
513                    String group = builder.toString();
514                    SqlInfo sqlInfoItem = new SqlInfo();
515                    sqlInfoItem.setFileName(sqlInfo.getFileName());
516                    sqlInfoItem.setFilePath(sqlInfo.getFilePath());
517                    sqlInfoItem.setSql(sqlInfo.getSql());
518                    sqlInfoItem.setOriginIndex(0);
519                    sqlInfoItem.setOriginLineStart(0);
520                    sqlInfoItem.setOriginLineEnd(sqlInfo.getSql().split("\n").length - 1);
521                    sqlInfoItem.setIndex(0);
522                    sqlInfoItem.setLineStart(0);
523                    sqlInfoItem.setLineEnd(sqlInfo.getSql().split("\n").length - 1);
524                    sqlInfoItem.setHash(fileHash);
525                    sqlInfoItem.setGroup(group);
526
527                    sqlInfoMap.get(fileHash).add(sqlInfoItem);
528                }
529            }
530        }
531
532        final TSQLEnv[] env = new TSQLEnv[]{sqlenv};
533        if (sqlenv == null) {
534                TSQLEnv[] envs = new SQLEnvParser(option.getDefaultServer(), option.getDefaultDatabase(), option.getDefaultSchema()).parseSQLEnv(option.getVendor(), sqlInfos);
535                        if (envs != null && envs.length > 0) {
536                                env[0] = envs[0];
537                        }
538        }
539        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
540                .newFixedThreadPool(option.getParallel() < sqlInfos.length
541                        ? option.getParallel()
542                        : sqlInfos.length);
543        
544        List<File> tempFiles = Collections.synchronizedList(new ArrayList<File>());
545        final Map<SqlInfo, dataflow> dataflowMap = useSaveMemoryMode ? null : new ConcurrentHashMap<>();
546        
547        try {
548            final CountDownLatch latch = new CountDownLatch(sqlInfos.length);
549
550            logger.info("start parallel analyze " + sqlInfos.length + " sqlinfos");
551
552            for (int i = 0; i < sqlInfos.length; i++) {
553                final SqlInfo[] sqlInfoCopy = new SqlInfo[sqlInfos.length];
554                final SqlInfo item = sqlInfos[i];
555                sqlInfoCopy[i] = item;
556                final Option optionCopy = (Option) option.clone();
557                optionCopy.setStartId(5000000L * i);
558                optionCopy.setOutput(false);
559                final int index = i;
560                Runnable task = new Runnable() {
561                    @Override
562                    public void run() {
563                        try {
564                            logger.info("start analyze sqlinfo[" + index + "]" + (sqlInfos[index].getFilePath() != null ? ", file name = " + new File(sqlInfos[index].getFilePath()).getName() : ""));
565                            DataFlowAnalyzer analyzer = new DataFlowAnalyzer(sqlInfoCopy, optionCopy);
566                            analyzer.setSqlEnv(env[0]);
567                            analyzer.generateDataFlow(withExtraInfo);
568                            dataflow dataflow = analyzer.getDataFlow();
569                            logger.info("analyze sqlinfo[" + index + "] done, relation count: " + dataflow.getRelationships().size()+", error count: "+ analyzer.getErrorMessages().size());
570                            if (analyzer.getErrorMessages().size() > 10000) {
571                                dataflow.setErrors(new ArrayList<>(dataflow.getErrors().subList(0, 10000)));
572                                errorInfos.addAll(analyzer.getErrorMessages().subList(0, 10000));
573                                logger.warn("Too many errors in dataflow ("+dataflow.getErrors().size()+"), truncating to first 10000 errors" + (sqlInfos[index].getFileName() != null ? ", file name = " + sqlInfos[index].getFileName() : ""));
574                            }
575                            else{
576                                errorInfos.addAll(analyzer.getErrorMessages());
577                            }
578                            
579                            if (useSaveMemoryMode) {
580                                File tempFile = File.createTempFile("dataflow_" + index + "_" + System.currentTimeMillis() + "_", ".xml.zip");
581                                XML2Model.saveXML(dataflow, tempFile);
582                                tempFiles.add(tempFile);
583                            } else {
584                                dataflowMap.put(item, dataflow);
585                            }
586                            
587                            hashSQLMap.putAll(analyzer.getHashSQLMap());
588                            analyzer.dispose();
589                        }
590                        catch (Exception e) {
591                            logger.error("analyze sqlinfo[" + index + "] failed.", e);
592                        }
593                        finally {
594                            latch.countDown();
595                        }
596                    }
597                };
598                executor.submit(task);
599            }
600            latch.await();
601        } catch (Exception e) {
602            logger.error("execute task failed.", e);
603        }
604        executor.shutdown();
605
606        ModelBindingManager modelManager = new ModelBindingManager();
607        modelManager.setGlobalVendor(option.getVendor());
608        modelManager.setGlobalOption(option);
609        ModelBindingManager.set(modelManager);
610        
611        if (useSaveMemoryMode) {
612            // 使用节约内存模式,迭代合并
613            logger.info("start merge dataflow, dataflow count: " + tempFiles.size()+", useSaveMemoryMode: "+ useSaveMemoryMode+", gsp version: "+ TBaseType.versionid);
614            this.dataflow = iterativeMergeDataFlows(tempFiles, 5000000L * sqlInfos.length);
615            if (this.dataflow != null && this.dataflow.getRelationships() != null) {
616                logger.info("merge dataflow done, dataflow count: " + tempFiles.size() + ", relation count: " + this.dataflow.getRelationships().size());
617            }
618            // 清理临时文件
619            for (File tempFile : tempFiles) {
620                tempFile.delete();
621            }
622        } else {
623            // 保持原有逻辑
624            logger.info("start merge dataflow, dataflow count: " + dataflowMap.size()+", useSaveMemoryMode: "+ useSaveMemoryMode+", gsp version: "+ TBaseType.versionid);
625            this.dataflow = mergeDataFlows(dataflowMap, 5000000L * sqlInfos.length);
626            if (this.dataflow != null && this.dataflow.getRelationships() != null) {
627                logger.info("merge dataflow done, dataflow count: " + tempFiles.size() + ", relation count: " + this.dataflow.getRelationships().size());
628            }
629            dataflowMap.clear();
630        }
631        
632        if (this.dataflow != null) {
633            logger.info("merge done, relation count: " + this.dataflow.getRelationships().size());
634        }
635        
636        if (dataflow != null && option.isOutput()) {
637            if (option.isTextFormat()) {
638                dataflowString = DataFlowAnalyzer.getTextOutput(dataflow);
639            } else {
640                try {
641                    dataflowString = XML2Model.saveXML(dataflow);
642                }catch (Exception e){
643                    logger.error("save dataflow as xml failed.", e);
644                    dataflowString = null;
645                }
646            }
647        }
648        ModelBindingManager.remove();
649        return dataflowString;
650    }
651    
652    private dataflow iterativeMergeDataFlows(List<File> tempFiles, long startId) {
653        if (tempFiles == null || tempFiles.isEmpty()) {
654            logger.warn("iterativeMergeDataFlows: tempFiles is null or empty");
655            return null;
656        }
657
658        try {
659            Collections.sort(tempFiles, (f1, f2) -> Long.compare(f1.length(), f2.length()));
660            logger.info("iterativeMergeDataFlows: sorted tempFiles, first: " + tempFiles.get(0).length() + ", last: " + tempFiles.get(tempFiles.size() - 1).length());
661        } catch (Exception e) {
662            logger.error("iterativeMergeDataFlows failed.", e);
663        }
664
665        long startTime = System.currentTimeMillis();
666        int totalIterations = tempFiles.size() - 1;
667        logger.info("iterativeMergeDataFlows: start merging, total dataflows: " + tempFiles.size() + ", total iterations: " + totalIterations);
668        
669        long loadStartTime = System.currentTimeMillis();
670        dataflow mergedDataflow = XML2Model.loadXML(dataflow.class, tempFiles.get(0));
671        long loadEndTime = System.currentTimeMillis();
672        int initialRelationCount = mergedDataflow.getRelationships() != null ? mergedDataflow.getRelationships().size() : 0;
673        logger.info("iterativeMergeDataFlows: loaded initial dataflow, relation count: " + initialRelationCount + ", time: " + (loadEndTime - loadStartTime) + "ms");
674        
675        for (int i = 1; i < tempFiles.size(); i++) {
676            long iterationStartTime = System.currentTimeMillis();
677            int currentIteration = i;
678            int remainingIterations = totalIterations - currentIteration + 1;
679            
680            logger.info("iterativeMergeDataFlows: iteration " + currentIteration + "/" + totalIterations + ", remaining: " + remainingIterations);
681            
682            long loadCurrentStartTime = System.currentTimeMillis();
683            dataflow currentDataflow = XML2Model.loadXML(dataflow.class, tempFiles.get(i));
684            long loadCurrentEndTime = System.currentTimeMillis();
685            int currentRelationCount = currentDataflow.getRelationships() != null ? currentDataflow.getRelationships().size() : 0;
686            logger.info("iterativeMergeDataFlows: loaded dataflow[" + i + "], relation count: " + currentRelationCount + ", time: " + (loadCurrentEndTime - loadCurrentStartTime) + "ms");
687            
688            List<dataflow> tempList = new ArrayList<>();
689            tempList.add(mergedDataflow);
690            tempList.add(currentDataflow);
691            
692            long mergeStartTime = System.currentTimeMillis();
693            mergedDataflow = mergeDataFlows(tempList, startId + 5000000L * i);
694            long mergeEndTime = System.currentTimeMillis();
695            int mergedRelationCount = mergedDataflow.getRelationships() != null ? mergedDataflow.getRelationships().size() : 0;
696            
697            long iterationEndTime = System.currentTimeMillis();
698            long iterationTime = iterationEndTime - iterationStartTime;
699            long mergeTime = mergeEndTime - mergeStartTime;
700            logger.info("iterativeMergeDataFlows: iteration " + currentIteration + " completed, merged relation count: " + mergedRelationCount + ", merge time: " + mergeTime + "ms, total iteration time: " + iterationTime + "ms");
701            
702            currentDataflow = null;
703            tempList.clear();
704        }
705        
706        long endTime = System.currentTimeMillis();
707        long totalTime = endTime - startTime;
708        int finalRelationCount = mergedDataflow.getRelationships() != null ? mergedDataflow.getRelationships().size() : 0;
709        logger.info("iterativeMergeDataFlows: all iterations completed, final relation count: " + finalRelationCount + ", total time: " + totalTime + "ms (" + (totalTime / 1000.0) + "s)");
710        
711        return mergedDataflow;
712    }
713
714    private void appendSqlInfo(Map<String, Pair3<StringBuilder, AtomicInteger, String>> databaseMap, int index,
715                               SqlInfo sqlInfo, Map queryObject) {
716        EDbVendor vendor = option.getVendor();
717        if(!SQLUtil.isEmpty(sqlInfo.getDbVendor())){
718            vendor = EDbVendor.valueOf(sqlInfo.getDbVendor());
719        }
720
721        boolean supportCatalog = TSQLEnv.supportCatalog(vendor);
722        boolean supportSchema = TSQLEnv.supportSchema(vendor);
723
724        String content = (String) queryObject.get("sourceCode");
725        if (SQLUtil.isEmpty(content)) {
726            return;
727        }
728
729        StringBuilder builder = new StringBuilder();
730        if (supportCatalog) {
731            String database = (String) queryObject.get("database");
732            if (database.indexOf(".") != -1) {
733                String delimitedChar = TSQLEnv.delimitedChar(vendor);
734                database = delimitedChar + SQLUtil.trimColumnStringQuote(database) + delimitedChar;
735            }
736            builder.append(database);
737        }
738        if (supportSchema) {
739            String schema = (String) queryObject.get("schema");
740            if (schema.indexOf(".") != -1) {
741                String delimitedChar = TSQLEnv.delimitedChar(vendor);
742                schema = delimitedChar + SQLUtil.trimColumnStringQuote(schema) + delimitedChar;
743            }
744            if (builder.length() > 0) {
745                builder.append(".");
746            }
747            builder.append(schema);
748        }
749        String group = builder.toString();
750        String sqlHash = SHA256.getMd5(content);
751        String hash = SHA256.getMd5(sqlHash);
752        if (!databaseMap.containsKey(sqlHash)) {
753            databaseMap.put(sqlHash,
754                    new Pair3<StringBuilder, AtomicInteger, String>(new StringBuilder(), new AtomicInteger(), group));
755        }
756        TGSqlParser parser = new TGSqlParser(option.getVendor());
757        String delimiterChar = String.valueOf(parser.getDelimiterChar());
758        StringBuilder buffer = new StringBuilder(content);
759        if (content.trim().endsWith(delimiterChar) || content.trim().endsWith(";")) {
760            buffer.append("\n");
761        } else if(vendor == EDbVendor.dbvredshift
762                || vendor == EDbVendor.dbvgaussdb
763                || vendor == EDbVendor.dbvpostgresql
764                || vendor == EDbVendor.dbvmysql
765                || vendor == EDbVendor.dbvteradata){
766            buffer.append("\n\n-- " + TBaseType.sqlflow_stmt_delimiter_str + "\n\n");
767        } else{
768            SQLUtil.endTrim(buffer);
769            buffer.append(";").append("\n");
770        }
771
772        int lineStart = databaseMap.get(sqlHash).first.toString().split("\n", -1).length - 1;
773        if (databaseMap.get(sqlHash).first.toString().length() == 0) {
774            lineStart = 0;
775        }
776        databaseMap.get(sqlHash).first.append(buffer.toString());
777        SqlInfo sqlInfoItem = new SqlInfo();
778        sqlInfoItem.setFileName(sqlInfo.getFileName());
779        sqlInfoItem.setFilePath(sqlInfo.getFilePath());
780        sqlInfoItem.setSql(buffer.toString());
781        sqlInfoItem.setOriginIndex(index);
782        sqlInfoItem.setOriginLineStart(0);
783        sqlInfoItem.setOriginLineEnd(buffer.toString().split("\n", -1).length - 1);
784        sqlInfoItem.setIndex(databaseMap.get(sqlHash).second.getAndIncrement());
785        sqlInfoItem.setLineStart(lineStart);
786        sqlInfoItem.setLineEnd(databaseMap.get(sqlHash).first.toString().split("\n", -1).length - 1);
787        sqlInfoItem.setGroup(group);
788        sqlInfoItem.setHash(hash);
789
790        if (!sqlInfoMap.containsKey(hash)) {
791            sqlInfoMap.put(hash, new ArrayList<SqlInfo>());
792        }
793        sqlInfoMap.get(hash).add(sqlInfoItem);
794    }
795
    /**
     * Releases the thread-local {@link ModelBindingManager} bound to the current
     * thread by the last analysis run, so model state does not leak across reuses
     * of this thread.
     */
    @Override
    public void dispose() {
        ModelBindingManager.remove();
    }
800
    /**
     * Convenience overload: merges the dataflow values of the given map, ignoring
     * the {@link SqlInfo} keys.
     *
     * @param dataflowMap per-statement dataflow results
     * @param startId     base id for newly created merged nodes
     * @return the merged dataflow
     */
    private dataflow mergeDataFlows(Map<SqlInfo, dataflow> dataflowMap, long startId) {
        return mergeDataFlows(dataflowMap.values(), startId);
    }
804
    /**
     * Public entry point for merging independently produced dataflows. Installs the
     * vendor as the global vendor for the duration of the merge and always restores
     * it afterwards.
     *
     * @param dataflows models to merge
     * @param vendor    database vendor used while merging
     * @return the merged dataflow
     */
    public static dataflow mergeDataFlows(Collection<dataflow> dataflows, EDbVendor vendor) {
        ModelBindingManager.setGlobalVendor(vendor);
        try {
            // 5,000,000 ids reserved per input dataflow keeps newly minted merged-node
            // ids clear of the originals (presumably; mirrors the caller's convention).
            return mergeDataFlows(dataflows, 5000000L * dataflows.size());
        } finally {
            ModelBindingManager.removeGlobalVendor();
        }
    }
813
    /**
     * Core merge: folds several partial dataflow models into one.
     *
     * Algorithm, in order:
     * <ol>
     *   <li>Concatenate every element list (tables, views, databases, ... errors)
     *       of all inputs into the result model.</li>
     *   <li>Group tables by qualified name; for each name that occurs more than
     *       once, create a single replacement node with a fresh id (from
     *       {@code startId}) and record old-id -> new-id mappings. Columns are
     *       merged the same way within each merged table.</li>
     *   <li>Rewrite every relationship endpoint through the id maps, then
     *       de-duplicate relationships by the MD5 of their JSON form.</li>
     * </ol>
     *
     * Not thread-safe; mutates the lists it aggregates while iterating the name
     * groups, so statement order matters throughout.
     *
     * @param dataflows models to merge (null elements are skipped)
     * @param startId   ids for newly created merged nodes are allocated by
     *                  pre-incrementing this value
     * @return the merged dataflow
     */
    private static dataflow mergeDataFlows(Collection<dataflow> dataflows, long startId) {
        dataflow mergeDataflow = new dataflow();

        // Accumulators: concatenation of each element list across all inputs.
        // These same list instances are installed on mergeDataflow below and are
        // later mutated in place when duplicate nodes are collapsed.
        List<table> tableCopy = new ArrayList<table>();
        List<table> viewCopy = new ArrayList<table>();
        List<table> databaseCopy = new ArrayList<table>();
        List<table> schemaCopy = new ArrayList<table>();
        List<table> stageCopy = new ArrayList<table>();
        List<table> dataSourceCopy = new ArrayList<table>();
        List<table> streamCopy = new ArrayList<table>();
        List<table> fileCopy = new ArrayList<table>();
        List<table> variableCopy = new ArrayList<table>();
        List<table> resultSetCopy = new ArrayList<table>();

        List<process> processCopy = new ArrayList<process>();
        List<relationship> relationshipCopy = new ArrayList<relationship>();
        List<procedure> procedureCopy = new ArrayList<procedure>();
        List<oraclePackage> packageCopy = new ArrayList<oraclePackage>();
        List<error> errorCopy = new ArrayList<error>();

        // Flat view of every table-like node (tables, views, schemas, stages, ...)
        // used for the name-based grouping pass below.
        List<table> tables = new ArrayList<table>();
        for (dataflow dataflow : dataflows) {
            if (dataflow == null) {
                continue;
            }
            
            if (dataflow.getTables() != null) {
                tableCopy.addAll(dataflow.getTables());
            }
            mergeDataflow.setTables(tableCopy);
            if (dataflow.getViews() != null) {
                viewCopy.addAll(dataflow.getViews());
            }
            mergeDataflow.setViews(viewCopy);
            if (dataflow.getDatabases() != null) {
                databaseCopy.addAll(dataflow.getDatabases());
            }
            mergeDataflow.setDatabases(databaseCopy);
            if (dataflow.getSchemas() != null) {
                schemaCopy.addAll(dataflow.getSchemas());
            }
            mergeDataflow.setSchemas(schemaCopy);
            if (dataflow.getStages() != null) {
                stageCopy.addAll(dataflow.getStages());
            }
            mergeDataflow.setStages(stageCopy);
            if (dataflow.getDatasources() != null) {
                dataSourceCopy.addAll(dataflow.getDatasources());
            }
            mergeDataflow.setDatasources(dataSourceCopy);
            if (dataflow.getStreams() != null) {
                streamCopy.addAll(dataflow.getStreams());
            }
            mergeDataflow.setStreams(streamCopy);
            if (dataflow.getPaths() != null) {
                fileCopy.addAll(dataflow.getPaths());
            }
            mergeDataflow.setPaths(fileCopy);
            if (dataflow.getVariables() != null) {
                variableCopy.addAll(dataflow.getVariables());
            }
            mergeDataflow.setVariables(variableCopy);
            if (dataflow.getResultsets() != null) {
                resultSetCopy.addAll(dataflow.getResultsets());
            }
            mergeDataflow.setResultsets(resultSetCopy);
            if (dataflow.getProcesses() != null) {
                processCopy.addAll(dataflow.getProcesses());
            }
            mergeDataflow.setProcesses(processCopy);
            if (dataflow.getRelationships() != null) {
                relationshipCopy.addAll(dataflow.getRelationships());
            }
            mergeDataflow.setRelationships(relationshipCopy);
            if (dataflow.getProcedures() != null) {
                procedureCopy.addAll(dataflow.getProcedures());
            }
            mergeDataflow.setProcedures(procedureCopy);
            if (dataflow.getPackages() != null) {
                packageCopy.addAll(dataflow.getPackages());
            }
            mergeDataflow.setPackages(packageCopy);
            if (dataflow.getErrors() != null) {
                errorCopy.addAll(dataflow.getErrors());
            }
            // Cap the error list at 10,000 entries to bound the output size.
            // NOTE(review): subList returns a *view* of the backing list; later
            // addAll calls go through this view, which keeps it consistent, but the
            // original oversized backing list is retained — confirm intended.
            if (errorCopy.size() > 10000) {
                errorCopy = errorCopy.subList(0, 10000);
            }
            mergeDataflow.setErrors(errorCopy);

            // NOTE(review): unlike the guarded copies above, these addAll calls
            // assume every list is non-null; a dataflow with a null list would
            // NPE here — confirm the model always materializes empty lists.
            tables.addAll(dataflow.getTables());
            tables.addAll(dataflow.getViews());
            tables.addAll(dataflow.getDatabases());
            tables.addAll(dataflow.getSchemas());
            tables.addAll(dataflow.getStages());
            tables.addAll(dataflow.getDatasources());
            tables.addAll(dataflow.getStreams());
            tables.addAll(dataflow.getPaths());
            tables.addAll(dataflow.getResultsets());
            tables.addAll(dataflow.getVariables());
        }

        // Grouping structures for the merge pass:
        //   tableMap      qualified name -> every node bearing that name
        //   tableTypeMap  qualified name -> winning node type (see chain below)
        //   tableIdMap    old table id   -> merged table id
        Map<String, List<table>> tableMap = new HashMap<String, List<table>>();
        Map<String, String> tableTypeMap = new HashMap<String, String>();
        Map<String, String> tableIdMap = new HashMap<String, String>();

        //   columnMap        qualified column name -> every column bearing it
        //   tableColumnMap   qualified table name  -> ordered set of its column names
        //   columnIdMap      old column id -> merged column id
        //   columnMergeIdMap merged column id -> merged column object
        Map<String, List<column>> columnMap = new HashMap<String, List<column>>();
        Map<String, Set<String>> tableColumnMap = new HashMap<String, Set<String>>();
        Map<String, String> columnIdMap = new HashMap<String, String>();
        Map<String, column> columnMergeIdMap = new HashMap<String, column>();

        // Collect all procedures, including those nested inside Oracle packages.
        List<procedure> procedures = new ArrayList<>(mergeDataflow.getProcedures());
        if(mergeDataflow.getPackages()!=null){
            for(oraclePackage pkg : mergeDataflow.getPackages()){
                procedures.addAll(pkg.getProcedures());
            }
        }

        Set<String> procedureIdSet = procedures.stream().map(t->t.getId()).collect(Collectors.toSet());

        // Pass 1: bucket every table-like node and its columns by qualified name.
        for (table table : tables) {
            String qualifiedTableName = DlineageUtil.getQualifiedTableName(table);
            String tableFullName = DlineageUtil.getIdentifierNormalTableName(qualifiedTableName);

            if (!tableMap.containsKey(tableFullName)) {
                tableMap.put(tableFullName, new ArrayList<table>());
            }

            tableMap.get(tableFullName).add(table);

            // Type-resolution chain: first occurrence wins by default, but specific
            // subtypes (view/database/schema/stage/datasource/stream/file) override,
            // and a generic "table"/"variable" type is upgraded by later nodes.
            if (!tableTypeMap.containsKey(tableFullName)) {
                tableTypeMap.put(tableFullName, table.getType());
            } else if ("view".equals(table.getSubType())) {
                tableTypeMap.put(tableFullName, table.getType());
            } else if ("database".equals(table.getSubType())) {
                tableTypeMap.put(tableFullName, table.getType());
            } else if ("schema".equals(table.getSubType())) {
                tableTypeMap.put(tableFullName, table.getType());
            } else if ("stage".equals(table.getSubType())) {
                tableTypeMap.put(tableFullName, table.getType());
            } else if ("datasource".equals(table.getSubType())) {
                tableTypeMap.put(tableFullName, table.getType());
            } else if ("stream".equals(table.getSubType())) {
                tableTypeMap.put(tableFullName, table.getType());
            } else if ("file".equals(table.getSubType())) {
                tableTypeMap.put(tableFullName, table.getType());
            } else if ("table".equals(tableTypeMap.get(tableFullName))) {
                tableTypeMap.put(tableFullName, table.getType());
            } else if ("variable".equals(tableTypeMap.get(tableFullName))) {
                tableTypeMap.put(tableFullName, table.getType());
            }

            if (table.getColumns() != null) {
                if (!tableColumnMap.containsKey(tableFullName)) {
                    // LinkedHashSet preserves first-seen column order.
                    tableColumnMap.put(tableFullName, new LinkedHashSet<String>());
                }
                for (column column : table.getColumns()) {
                    String columnFullName = tableFullName + "."
                            + DlineageUtil.getIdentifierNormalColumnName(column.getName());

                    if (!columnMap.containsKey(columnFullName)) {
                        columnMap.put(columnFullName, new ArrayList<column>());
                        tableColumnMap.get(tableFullName).add(columnFullName);
                    }

                    columnMap.get(columnFullName).add(column);
                }
            }
        }

        // Pass 2: collapse each name group with more than one node into a single
        // fresh node, removing the originals from the result lists.
        Iterator<String> tableNameIter = tableMap.keySet().iterator();
        while (tableNameIter.hasNext()) {
            String tableName = tableNameIter.next();
            List<table> tableList = tableMap.get(tableName);
            table table;
            if (tableList.size() > 1) {
                table standardTable = tableList.get(0);
                // Functions may legitimately share a name; they are intentionally
                // not merged (also skips the column merge below for this group).
                if (standardTable.isFunction()) {
                    continue;
                }

                String type = tableTypeMap.get(tableName);
                table = new table();
                table.setId(String.valueOf(++startId));
                table.setServer(standardTable.getServer());
                table.setDatabase(standardTable.getDatabase());
                table.setSchema(standardTable.getSchema());
                table.setName(standardTable.getName());
                table.setDisplayName(standardTable.getDisplayName());
                table.setParent(standardTable.getParent());
                table.setColumns(new ArrayList<column>());
                // Union of process ids from every duplicate, order preserved.
                Set<String> processIds = new LinkedHashSet<String>();
                for (int k = 0; k < tableList.size(); k++) {
                    if (tableList.get(k).getProcessIds() != null) {
                        processIds.addAll(tableList.get(k).getProcessIds());
                    }
                }
                if (!processIds.isEmpty()) {
                    table.setProcessIds(new ArrayList<String>(processIds));
                }
                table.setType(type);
                for (table item : tableList) {
                    // Accumulate distinct coordinates and aliases from all duplicates.
                    if (!SQLUtil.isEmpty(table.getCoordinate()) && !SQLUtil.isEmpty(item.getCoordinate())) {
                        if (table.getCoordinate().indexOf(item.getCoordinate()) == -1) {
                            table.appendCoordinate(item.getCoordinate());
                        }
                    } else if (!SQLUtil.isEmpty(item.getCoordinate())) {
                        table.setCoordinate(item.getCoordinate());
                    }

                    if (!SQLUtil.isEmpty(table.getAlias()) && !SQLUtil.isEmpty(item.getAlias())) {
                        table.setAlias(table.getAlias() + "," + item.getAlias());
                    } else if (!SQLUtil.isEmpty(item.getAlias())) {
                        table.setAlias(item.getAlias());
                    }

                    // Remember the redirect so relationships can be rewritten later.
                    tableIdMap.put(item.getId(), table.getId());

                    // Remove the duplicate from whichever category list holds it.
                    if (item.isView()) {
                        mergeDataflow.getViews().remove(item);
                    } else if (item.isDatabaseType()) {
                        mergeDataflow.getDatabases().remove(item);
                    } else if (item.isSchemaType()) {
                        mergeDataflow.getSchemas().remove(item);
                    } else if (item.isStage()) {
                        mergeDataflow.getStages().remove(item);
                    } else if (item.isDataSource()) {
                        mergeDataflow.getDatasources().remove(item);
                    } else if (item.isStream()) {
                        mergeDataflow.getStreams().remove(item);
                    } else if (item.isFile()) {
                        mergeDataflow.getPaths().remove(item);
                    } else if (item.isVariable()) {
                        mergeDataflow.getVariables().remove(item);
                    } else if (item.isTable()) {
                        mergeDataflow.getTables().remove(item);
                    } else if (item.isResultSet()) {
                        mergeDataflow.getResultsets().remove(item);
                    }
                }

                // Insert the merged node into the category list matching its type.
                if (table.isView()) {
                    mergeDataflow.getViews().add(table);
                } else if (table.isDatabaseType()) {
                    mergeDataflow.getDatabases().add(table);
                } else if (table.isSchemaType()) {
                    mergeDataflow.getSchemas().add(table);
                } else if (table.isStage()) {
                    mergeDataflow.getStages().add(table);
                } else if (table.isDataSource()) {
                    mergeDataflow.getDatasources().add(table);
                } else if (table.isStream()) {
                    mergeDataflow.getStreams().add(table);
                } else if (table.isFile()) {
                    mergeDataflow.getPaths().add(table);
                } else if (table.isVariable()) {
                    mergeDataflow.getVariables().add(table);
                } else if (table.isResultSet()) {
                    mergeDataflow.getResultsets().add(table);
                } else {
                    mergeDataflow.getTables().add(table);
                }
            } else {
                // Single occurrence: keep the node as-is; only its columns are merged.
                table = tableList.get(0);
            }

            // Merge this table's columns by qualified column name.
            Set<String> columns = tableColumnMap.get(tableName);
            Iterator<String> columnIter = columns.iterator();
            List<column> mergeColumns = new ArrayList<column>();
            while (columnIter.hasNext()) {
                String columnName = columnIter.next();
                List<column> columnList = columnMap.get(columnName);
                // Function columns are kept verbatim (never merged), mirroring the
                // function-table rule above.
                List<column> functions = new ArrayList<column>();
                for (column t : columnList) {
                    if (Boolean.TRUE.toString().equals(t.getIsFunction())) {
                        functions.add(t);
                    }
                }
                if (functions != null && !functions.isEmpty()) {
                    for (column function : functions) {
                        mergeColumns.add(function);
                        columnIdMap.put(function.getId(), function.getId());
                        columnMergeIdMap.put(function.getId(), function);
                    }

                    columnList.removeAll(functions);
                }
                if (!columnList.isEmpty()) {
                    column firstColumn = columnList.iterator().next();
                    if (columnList.size() > 1) {
                        // Duplicates: mint a merged column with a fresh id and fold
                        // coordinates plus key/type attributes from every duplicate
                        // (last non-null value wins).
                        column mergeColumn = new column();
                        mergeColumn.setId(String.valueOf(++startId));
                        mergeColumn.setName(firstColumn.getName());
                        mergeColumn.setDisplayName(firstColumn.getDisplayName());
                        mergeColumn.setSource(firstColumn.getSource());
                        mergeColumn.setQualifiedTable(firstColumn.getQualifiedTable());
                        mergeColumns.add(mergeColumn);
                        for (column item : columnList) {
                            mergeColumn.appendCoordinate(item.getCoordinate());
                            columnIdMap.put(item.getId(), mergeColumn.getId());
                            //add by grq 2023.02.06 issue=I6DB5S
                            if(item.getDataType() != null){
                                mergeColumn.setDataType(item.getDataType());
                            }
                            if(item.isForeignKey() != null){
                                mergeColumn.setForeignKey(item.isForeignKey());
                            }
                            if(item.isUnqiueKey() != null){
                                mergeColumn.setUnqiueKey(item.isUnqiueKey());
                            }
                            if(item.isIndexKey() != null){
                                mergeColumn.setIndexKey(item.isIndexKey());
                            }
                            if(item.isPrimaryKey() != null){
                                mergeColumn.setPrimaryKey(item.isPrimaryKey());
                            }
                            //end by grq
                        }
                        columnMergeIdMap.put(mergeColumn.getId(), mergeColumn);
                    } else {
                        // Unique column: identity mapping keeps the rewrite pass uniform.
                        mergeColumns.add(firstColumn);
                        columnIdMap.put(firstColumn.getId(), firstColumn.getId());
                        columnMergeIdMap.put(firstColumn.getId(), firstColumn);
                    }
                }
            }
            table.setColumns(mergeColumns);
        }

        // Pass 3: rewrite relationship endpoints through the id maps and
        // de-duplicate relationships by the MD5 hash of their JSON serialization.
        if (mergeDataflow.getRelationships() != null) {
            Map<String, relationship> mergeRelations = new LinkedHashMap<String, relationship>();
            for (int i = 0; i < mergeDataflow.getRelationships().size(); i++) {
                relationship relation = mergeDataflow.getRelationships().get(i);
                if(RelationshipType.call.name().equals(relation.getType())){
                    // "call" relationships link table-level nodes (caller/callees).
                    targetColumn target = relation.getCaller();
                    if (target == null) {
                        continue;
                    }
                    if (target != null && tableIdMap.containsKey(target.getId())) {
                        target.setId(tableIdMap.get(target.getId()));
                    }

                    List<sourceColumn> sources = relation.getCallees();
                    Set<sourceColumn> sourceSet = new LinkedHashSet<sourceColumn>();
                    if (sources != null) {
                        for (sourceColumn source : sources) {
                            if (tableIdMap.containsKey(source.getId())) {
                                source.setId(tableIdMap.get(source.getId()));
                            }
                        }
                        // Route through a LinkedHashSet to drop duplicate callees
                        // while preserving order.
                        sourceSet.addAll(sources);
                        relation.setCallees(new ArrayList<sourceColumn>(sourceSet));
                    }

                    String jsonString = JSON.toJSONString(relation, true);
                    String key = SHA256.getMd5(jsonString);
                    if (!mergeRelations.containsKey(key)) {
                        mergeRelations.put(key, relation);
                    }
                }
                else {
                    // Column-level relationship: remap parent table ids and column ids.
                    targetColumn target = relation.getTarget();
                    if (target == null) {
                        continue;
                    }
                    if (target != null && tableIdMap.containsKey(target.getParent_id())) {
                        target.setParent_id(tableIdMap.get(target.getParent_id()));
                    }

                    if (columnIdMap.containsKey(target.getId())) {
                        target.setId(columnIdMap.get(target.getId()));
                        // After the setId above, getId() is the merged id, which is
                        // exactly how columnMergeIdMap is keyed.
                        target.setCoordinate(columnMergeIdMap.get(target.getId()).getCoordinate());
                    }

                    List<sourceColumn> sources = relation.getSources();
                    Set<sourceColumn> sourceSet = new LinkedHashSet<sourceColumn>();
                    if (sources != null) {
                        for (sourceColumn source : sources) {
                            if (tableIdMap.containsKey(source.getParent_id())) {
                                source.setParent_id(tableIdMap.get(source.getParent_id()));
                            }
                            if (tableIdMap.containsKey(source.getSource_id())) {
                                source.setSource_id(tableIdMap.get(source.getSource_id()));
                            }
                            if (columnIdMap.containsKey(source.getId())) {
                                source.setId(columnIdMap.get(source.getId()));
                                source.setCoordinate(columnMergeIdMap.get(source.getId()).getCoordinate());
                            }
                        }

                        sourceSet.addAll(sources);
                        relation.setSources(new ArrayList<sourceColumn>(sourceSet));
                    }

                    String jsonString = JSON.toJSONString(relation, true);
                    String key = SHA256.getMd5(jsonString);
                    if (!mergeRelations.containsKey(key)) {
                        mergeRelations.put(key, relation);
                    }
                }
            }

            mergeDataflow.setRelationships(new ArrayList<relationship>(mergeRelations.values()));
        }

        // Help GC: drop the (potentially large) intermediate lookup structures.
        tableMap.clear();
        tableTypeMap.clear();
        tableIdMap.clear();
        columnMap.clear();
        tableColumnMap.clear();
        columnIdMap.clear();
        columnMergeIdMap.clear();
        tables.clear();

        return mergeDataflow;
    }
1231
1232    @Override
1233    public synchronized dataflow getDataFlow() {
1234        if (dataflow != null) {
1235            return dataflow;
1236        } else if (dataflowString != null) {
1237            return XML2Model.loadXML(dataflow.class, dataflowString);
1238        }
1239        return null;
1240    }
1241
    /**
     * Returns the hash-to-SQL lookup table.
     * NOTE(review): exposes the internal mutable map directly; callers can modify
     * it — confirm whether an unmodifiable view was intended.
     */
    @Override
    public Map<String, String> getHashSQLMap() {
        return hashSQLMap;
    }
1246
1247    public static void main(String[] args) throws Exception {
1248        Option option = new Option();
1249        option.setVendor(EDbVendor.dbvmssql);
1250        option.setOutput(false);
1251//        option.setSimpleOutput(true);
1252        File parentDir = new File("C:\\Users\\KK\\Desktop\\sql");
1253        ParallelDataFlowAnalyzer analyzer = new ParallelDataFlowAnalyzer(new File[]{new File("C:\\Users\\KK\\Desktop\\metadata_with_query\\客户原始sql.json")}, option, 5, new File("C:\\Users\\KK\\Desktop\\metadata_with_query"));
1254        analyzer.generateDataFlow(false, true);
1255        dataflow dataflow = analyzer.getDataFlow();
1256//        System.out.println(XML2Model.saveXML(dataflow));
1257//        dataflow dataflow = XML2Model.loadXML(gudusoft.gsqlparser.dlineage.dataflow.model.xml.dataflow.class,new File("D:\\dataflow.xml.zip"));
1258        XML2Model.saveXML(dataflow, new File("D:\\dataflow.xml.zip"));
1259//        dataflow = DataflowUtility.convertToTableLevelDataflow(dataflow);
1260//        dataflow = DataflowUtility.convertTableLevelToFunctionCallDataflow(dataflow, true, EDbVendor.dbvoracle);
1261//        System.out.println(XML2Model.saveXML(dataflow));
1262    }
1263}