001package gudusoft.gsqlparser.dlineage;
002
003import gudusoft.gsqlparser.EDbVendor;
004import gudusoft.gsqlparser.TBaseType;
005import gudusoft.gsqlparser.TGSqlParser;
006import gudusoft.gsqlparser.dlineage.dataflow.listener.DataFlowHandleListener;
007import gudusoft.gsqlparser.dlineage.dataflow.metadata.MetadataReader;
008import gudusoft.gsqlparser.dlineage.dataflow.model.*;
009import gudusoft.gsqlparser.dlineage.dataflow.model.json.Coordinate;
010import gudusoft.gsqlparser.dlineage.dataflow.model.xml.*;
011import gudusoft.gsqlparser.dlineage.dataflow.sqlenv.SQLEnvParser;
012import gudusoft.gsqlparser.dlineage.util.*;
013import gudusoft.gsqlparser.sqlenv.TSQLEnv;
014import gudusoft.gsqlparser.sqlenv.parser.TJSONSQLEnvParser;
015import gudusoft.gsqlparser.util.IndexedLinkedHashMap;
016import gudusoft.gsqlparser.util.Logger;
017import gudusoft.gsqlparser.util.LoggerFactory;
018import gudusoft.gsqlparser.util.SQLUtil;
019import gudusoft.gsqlparser.util.json.JSON;
020
021import java.io.File;
022import java.io.IOException;
023import java.util.*;
024import java.util.concurrent.*;
025import java.util.concurrent.atomic.AtomicInteger;
026import java.util.stream.Collectors;
027
028public class ParallelDataFlowAnalyzer implements IDataFlowAnalyzer {
029    private static final Logger logger = LoggerFactory.getLogger(ParallelDataFlowAnalyzer.class);
030    private SqlInfo[] sqlInfos;
031    private Option option = new Option();
032    private TSQLEnv sqlenv = null;
033    private dataflow dataflow;
034    private String dataflowString;
035    private List<ErrorInfo> errorInfos = new CopyOnWriteArrayList<ErrorInfo>();
036    private IndexedLinkedHashMap<String, List<SqlInfo>> sqlInfoMap = new IndexedLinkedHashMap<String, List<SqlInfo>>();
037    private final Map<String, String> hashSQLMap = new HashMap();
038
    /**
     * Creates an analyzer over already-built {@link SqlInfo} entries.
     *
     * @param sqlInfos     the SQL inputs, stored as given (no copy is made)
     * @param dbVendor     database vendor stored on the analyzer's option
     * @param simpleOutput simple-output flag stored on the analyzer's option
     */
    public ParallelDataFlowAnalyzer(SqlInfo[] sqlInfos, EDbVendor dbVendor, boolean simpleOutput) {
        this.sqlInfos = sqlInfos;
        option.setVendor(dbVendor);
        option.setSimpleOutput(simpleOutput);
        // Publishes this analyzer's option as the global option of ModelBindingManager.
        ModelBindingManager.setGlobalOption(option);
    }
045
046    public ParallelDataFlowAnalyzer(String[] sqlContents, EDbVendor dbVendor, boolean simpleOutput, String defaultServer, String defaultDatabase, String defaltSchema) {
047        SqlInfo[] sqlInfos = new SqlInfo[sqlContents.length];
048        for (int i = 0; i < sqlContents.length; i++) {
049            SqlInfo info = new SqlInfo();
050            info.setSql(sqlContents[i]);
051            info.setOriginIndex(0);
052            sqlInfos[i] = info;
053        }
054        option.setVendor(dbVendor);
055        option.setSimpleOutput(simpleOutput);
056                option.setDefaultServer(defaultServer);
057                option.setDefaultDatabase(defaultDatabase);
058                option.setDefaultSchema(defaltSchema);
059        this.sqlInfos = convertSQL(dbVendor, JSON.toJSONString(sqlInfos)).toArray(new SqlInfo[0]);
060    }
061    
062    public ParallelDataFlowAnalyzer(String[] sqlContents, EDbVendor dbVendor, boolean simpleOutput) {
063        SqlInfo[] sqlInfos = new SqlInfo[sqlContents.length];
064        for (int i = 0; i < sqlContents.length; i++) {
065            SqlInfo info = new SqlInfo();
066            info.setSql(sqlContents[i]);
067            info.setOriginIndex(0);
068            sqlInfos[i] = info;
069        }
070        option.setVendor(dbVendor);
071        option.setSimpleOutput(simpleOutput);
072        this.sqlInfos = convertSQL(dbVendor, JSON.toJSONString(sqlInfos)).toArray(new SqlInfo[0]);
073    }
074
    /**
     * Creates an analyzer over already-built {@link SqlInfo} entries using a caller-supplied option.
     *
     * @param sqlInfos the SQL inputs, stored as given (no copy is made)
     * @param option   the option object to use; it replaces the default option of this instance
     */
    public ParallelDataFlowAnalyzer(SqlInfo[] sqlInfos, Option option) {
        this.sqlInfos = sqlInfos;
        this.option = option;
        // Publishes the supplied option as the global option of ModelBindingManager.
        ModelBindingManager.setGlobalOption(option);
    }
080
081    public ParallelDataFlowAnalyzer(File[] sqlFiles, Option option) {
082        SqlInfo[] sqlInfos = new SqlInfo[sqlFiles.length];
083        for (int i = 0; i < sqlFiles.length; i++) {
084            SqlInfo info = new SqlInfo();
085            info.setSql(SQLUtil.getFileContent(sqlFiles[i]));
086            info.setFileName(sqlFiles[i].getName());
087            info.setFilePath(sqlFiles[i].getAbsolutePath());
088            info.setOriginIndex(0);
089            sqlInfos[i] = info;
090        }
091        this.sqlInfos = sqlInfos;
092        this.option = option;
093        ModelBindingManager.setGlobalOption(option);
094    }
095    
096    public ParallelDataFlowAnalyzer(File[] sqlFiles, Option option, int splitSizeMB, File splitDir) {
097        List<SqlInfo> sqlInfoList = new ArrayList<SqlInfo>();
098        for (int i = 0; i < sqlFiles.length; i++) {
099            try {
100                // Ensure split directory exists
101                if (!splitDir.exists() && !splitDir.mkdirs()) {
102                    throw new IOException("Failed to create split directory: " + splitDir.getAbsolutePath());
103                }
104                
105                // Split the file if it's large
106                List<File> splitFiles = gudusoft.gsqlparser.util.FileSplitter.splitFile(sqlFiles[i], splitDir, splitSizeMB, option.getVendor());
107                
108                if (splitFiles.isEmpty()) {
109                    // If no split files were created, process the original file
110                    SqlInfo info = new SqlInfo();
111                    info.setSql(SQLUtil.getFileContent(sqlFiles[i]));
112                    info.setFileName(sqlFiles[i].getName());
113                    info.setFilePath(sqlFiles[i].getAbsolutePath());
114                    info.setOriginIndex(0);
115                    sqlInfoList.add(info);
116                } else {
117                    // Process each split file
118                    for (File splitFile : splitFiles) {
119                        SqlInfo info = new SqlInfo();
120                        info.setSql(SQLUtil.getFileContent(splitFile));
121                        info.setFileName(splitFile.getName());
122                        info.setFilePath(splitFile.getAbsolutePath());
123                        info.setOriginIndex(0);
124                        sqlInfoList.add(info);
125                    }
126                }
127            } catch (Exception e) {
128                sqlInfoList.clear();
129                logger.error("Error splitting file: " + sqlFiles[i].getAbsolutePath(), e);
130                // Fallback to original file if splitting fails
131                SqlInfo info = new SqlInfo();
132                info.setSql(SQLUtil.getFileContent(sqlFiles[i]));
133                info.setFileName(sqlFiles[i].getName());
134                info.setFilePath(sqlFiles[i].getAbsolutePath());
135                info.setOriginIndex(0);
136                sqlInfoList.add(info);
137            }
138        }
139        this.sqlInfos = sqlInfoList.toArray(new SqlInfo[0]);
140        this.option = option;
141        ModelBindingManager.setGlobalOption(option);
142    }
143    
    /**
     * Convenience overload that accepts the split directory as a path string and delegates to the
     * constructor taking a {@link File} split directory.
     *
     * @param sqlFiles     the files to load
     * @param option       the option object to use
     * @param splitSizeMB  split size passed through to the delegate constructor
     * @param splitDirPath path of the directory that receives the split parts
     */
    public ParallelDataFlowAnalyzer(File[] sqlFiles, Option option, int splitSizeMB, String splitDirPath) {
        this(sqlFiles, option, splitSizeMB, new File(splitDirPath));
    }
147    
148    protected List<SqlInfo> convertSQL(EDbVendor vendor, String json) {
149                List<SqlInfo> sqlInfos = new ArrayList<SqlInfo>();
150                List sqlContents = (List) JSON.parseObject(json);
151                for (int j = 0; j < sqlContents.size(); j++) {
152                        Map sqlContent = (Map) sqlContents.get(j);
153                        String sql = (String) sqlContent.get("sql");
154                        String fileName = (String) sqlContent.get("fileName");
155                        String filePath = (String) sqlContent.get("filePath");
156                        if (sql != null && sql.trim().startsWith("{")) {
157                                if (sql.indexOf("createdBy") != -1) {
158                                        if (this.sqlenv == null) {
159                                                TSQLEnv[] sqlenvs = new TJSONSQLEnvParser(option.getDefaultServer(), option.getDefaultDatabase(), option.getDefaultSchema()).parseSQLEnv(vendor, sql);
160                                                if (sqlenvs != null) {
161                                                        this.sqlenv = sqlenvs[0];
162                                                }
163                                        }
164                        if (sql.toLowerCase().indexOf("sqldep") != -1 || sql.toLowerCase().indexOf("grabit") != -1 ) {
165                                Map queryObject = (Map) JSON.parseObject(sql);
166                                        List querys = (List) queryObject.get("queries");
167                                        if (querys != null) {
168                                                for (int i = 0; i < querys.size(); i++) {
169                                                        Map object = (Map) querys.get(i);
170                                                        SqlInfo info = new SqlInfo();
171                                                        info.setSql(JSON.toJSONString(object));
172                                                        info.setFileName(fileName);
173                                                        info.setFilePath(filePath);
174                                                        info.setOriginIndex(i);
175                                                        sqlInfos.add(info);
176                                                }
177                                                queryObject.remove("queries");
178                                                SqlInfo info = new SqlInfo();
179                                                info.setSql(JSON.toJSONString(queryObject));
180                                                info.setFileName(fileName);
181                                                info.setFilePath(filePath);
182                                                info.setOriginIndex(querys.size());
183                                                sqlInfos.add(info);
184                                        } else {
185                                                SqlInfo info = new SqlInfo();
186                                                info.setSql(JSON.toJSONString(queryObject));
187                                                info.setFileName(fileName);
188                                                info.setFilePath(filePath);
189                                                info.setOriginIndex(0);
190                                                sqlInfos.add(info);
191                                        }
192                        }
193                        else if (sql.toLowerCase().indexOf("sqlflow") != -1) {
194                                                Map sqlflow = (Map) JSON.parseObject(sql);
195                                                List<Map> servers = (List<Map>) sqlflow.get("servers");
196                                                if (servers != null) {
197                                                        for (Map queryObject : servers) {
198                                                                String name = (String) queryObject.get("name");
199                                                                String dbVendor = (String) queryObject.get("dbVendor");
200                                                                List querys = (List) queryObject.get("queries");
201                                                                if (querys != null) {
202                                                                        for (int i = 0; i < querys.size(); i++) {
203                                                                                Map object = (Map) querys.get(i);
204                                                                                SqlInfo info = new SqlInfo();
205                                                                                info.setSql(JSON.toJSONString(object));
206                                                                                info.setFileName(fileName);
207                                                                                info.setFilePath(filePath);
208                                                                                info.setOriginIndex(i);
209                                                                                info.setDbVendor(dbVendor);
210                                                                                info.setServer(name);
211                                                                                sqlInfos.add(info);
212                                                                        }
213                                                                        queryObject.remove("queries");
214                                                                        SqlInfo info = new SqlInfo();
215                                                                        info.setSql(JSON.toJSONString(queryObject));
216                                                                        info.setFileName(fileName);
217                                                                        info.setFilePath(filePath);
218                                                                        info.setOriginIndex(querys.size());
219                                                                        info.setDbVendor(dbVendor);
220                                                                        info.setServer(filePath);
221                                                                        sqlInfos.add(info);
222                                                                } else {
223                                                                        SqlInfo info = new SqlInfo();
224                                                                        info.setSql(JSON.toJSONString(queryObject));
225                                                                        info.setFileName(fileName);
226                                                                        info.setFilePath(filePath);
227                                                                        info.setOriginIndex(0);
228                                                                        sqlInfos.add(info);
229                                                                }
230                                                        }
231                                                }
232                                        }
233                    }
234                        } else if (sql != null) {
235                                SqlInfo info = new SqlInfo();
236                                info.setSql(sql);
237                                info.setFileName(fileName);
238                                info.setFilePath(filePath);
239                                info.setOriginIndex(0);
240                                sqlInfos.add(info);
241                        }
242                }
243                return sqlInfos;
244        }
245
    /** Returns the ignoreRecordSet flag of the current option. */
    @Override
    public boolean isIgnoreRecordSet() {
        return option.isIgnoreRecordSet();
    }
250
    /** Stores the ignoreRecordSet flag on the current option. */
    @Override
    public void setIgnoreRecordSet(boolean ignoreRecordSet) {
        option.setIgnoreRecordSet(ignoreRecordSet);
    }
255
    /** Returns the simpleShowTopSelectResultSet flag of the current option. */
    @Override
    public boolean isSimpleShowTopSelectResultSet() {
        return option.isSimpleShowTopSelectResultSet();
    }
260
    /** Stores the simpleShowTopSelectResultSet flag on the current option. */
    @Override
    public void setSimpleShowTopSelectResultSet(boolean simpleShowTopSelectResultSet) {
        option.setSimpleShowTopSelectResultSet(simpleShowTopSelectResultSet);
    }
265
    /** Returns the simpleShowFunction flag of the current option. */
    @Override
    public boolean isSimpleShowFunction() {
        return option.isSimpleShowFunction();
    }
270
    /** Stores the simpleShowFunction flag on the current option. */
    @Override
    public void setSimpleShowFunction(boolean simpleShowFunction) {
        option.setSimpleShowFunction(simpleShowFunction);
    }
275
    /** Returns the showJoin flag of the current option. */
    @Override
    public boolean isShowJoin() {
        return option.isShowJoin();
    }
280
    /** Stores the showJoin flag on the current option. */
    @Override
    public void setShowJoin(boolean showJoin) {
        option.setShowJoin(showJoin);
    }
285
    /** Returns the showImplicitSchema flag of the current option. */
    @Override
    public boolean isShowImplicitSchema() {
        return option.isShowImplicitSchema();
    }
290
    /** Stores the showImplicitSchema flag on the current option. */
    @Override
    public void setShowImplicitSchema(boolean showImplicitSchema) {
        option.setShowImplicitSchema(showImplicitSchema);
    }
295
    /** Returns the showConstantTable flag of the current option. */
    @Override
    public boolean isShowConstantTable() {
        return option.isShowConstantTable();
    }
300
    /** Stores the showConstantTable flag on the current option. */
    @Override
    public void setShowConstantTable(boolean showConstantTable) {
        option.setShowConstantTable(showConstantTable);
    }
305
    /** Returns the showCountTableColumn flag of the current option. */
    @Override
    public boolean isShowCountTableColumn() {
        return option.isShowCountTableColumn();
    }
310
    /** Stores the showCountTableColumn flag on the current option. */
    @Override
    public void setShowCountTableColumn(boolean showCountTableColumn) {
        option.setShowCountTableColumn(showCountTableColumn);
    }
315
    /** Returns the transform flag of the current option. */
    @Override
    public boolean isTransform() {
        return option.isTransform();
    }
320
321    @Override
322    public void setTransform(boolean transform) {
323        option.setTransform(transform);
324        if (option.isTransformCoordinate()) {
325            option.setTransform(true);
326        }
327    }
328
    /** Returns the transformCoordinate flag of the current option. */
    @Override
    public boolean isTransformCoordinate() {
        return option.isTransformCoordinate();
    }
333
334    @Override
335    public void setTransformCoordinate(boolean transformCoordinate) {
336        option.setTransformCoordinate(transformCoordinate);
337        if (transformCoordinate) {
338            option.setTransform(true);
339        }
340    }
341
    /** Returns the linkOrphanColumnToFirstTable flag of the current option. */
    @Override
    public boolean isLinkOrphanColumnToFirstTable() {
        return option.isLinkOrphanColumnToFirstTable();
    }
346
    /** Stores the linkOrphanColumnToFirstTable flag on the current option. */
    @Override
    public void setLinkOrphanColumnToFirstTable(boolean linkOrphanColumnToFirstTable) {
        option.setLinkOrphanColumnToFirstTable(linkOrphanColumnToFirstTable);
    }
351
    /** Returns the ignoreCoordinate flag of the current option. */
    @Override
    public boolean isIgnoreCoordinate() {
        return option.isIgnoreCoordinate();
    }
356
    /** Stores the ignoreCoordinate flag on the current option. */
    @Override
    public void setIgnoreCoordinate(boolean ignoreCoordinate) {
        option.setIgnoreCoordinate(ignoreCoordinate);
    }
361
    /** Stores the given handle listener on the current option. */
    @Override
    public void setHandleListener(DataFlowHandleListener listener) {
        option.setHandleListener(listener);
    }
366
    /**
     * Sets the SQL environment used by this analyzer. generateDataFlow passes it to every worker
     * analyzer; when it is null, generateDataFlow tries to parse an environment from the inputs instead.
     */
    @Override
    public void setSqlEnv(TSQLEnv sqlenv) {
        this.sqlenv = sqlenv;
    }
371
    /**
     * Replaces the option object used by this analyzer.
     * NOTE(review): unlike the constructors taking an Option, this does not call
     * ModelBindingManager.setGlobalOption — confirm whether that is intended.
     */
    @Override
    public void setOption(Option option) {
        this.option = option;
    }
376
    /** Returns the option object currently used by this analyzer (the live instance, not a copy). */
    @Override
    public Option getOption() {
        return option;
    }
381
    /**
     * Returns the error list of this analyzer. This is the live internal list: generateDataFlow
     * clears it at the start of a run and worker tasks append to it.
     */
    @Override
    public List<ErrorInfo> getErrorMessages() {
        return errorInfos;
    }
386
    /** Serializes the current SqlInfo map to a JSON string via JSON.toJSONString. */
    @Override
    public synchronized String generateSqlInfos() {
        return JSON.toJSONString(sqlInfoMap);
    }
391
    /** Runs the analysis without extra info; equivalent to {@code generateDataFlow(false)}. */
    @Override
    public synchronized String generateDataFlow() {
        return generateDataFlow(false);
    }
396
    /**
     * Returns the SqlInfo map of this analyzer. This is the live internal map, which
     * generateDataFlow clears and repopulates on every run.
     */
    @Override
    public Map<String, List<SqlInfo>> getSqlInfos() {
        return sqlInfoMap;
    }
401
402    @Override
403    /**
404     * @deprecated please use SqlInfoHelper.getSelectedDbObjectInfo
405     */
406    public DbObjectPosition getSelectedDbObjectInfo(Coordinate start, Coordinate end) {
407        if (start == null || end == null) {
408            throw new IllegalArgumentException("Coordinate can't be null.");
409        }
410
411        String hashCode = start.getHashCode();
412
413        if (hashCode == null) {
414            throw new IllegalArgumentException("Coordinate hashcode can't be null.");
415        }
416
417        int dbObjectStartLine = (int) start.getX() - 1;
418        int dbObjectStarColumn = (int) start.getY() - 1;
419        int dbObjectEndLine = (int) end.getX() - 1;
420        int dbObjectEndColumn = (int) end.getY() - 1;
421        List<SqlInfo> sqlInfoList;
422        if (hashCode.matches("\\d+")) {
423            sqlInfoList = sqlInfoMap.getValueAtIndex(Integer.valueOf(hashCode));
424        } else {
425            sqlInfoList = sqlInfoMap.get(hashCode);
426        }
427        for (int j = 0; j < sqlInfoList.size(); j++) {
428            SqlInfo sqlInfo = sqlInfoList.get(j);
429            int startLine = sqlInfo.getLineStart();
430            int endLine = sqlInfo.getLineEnd();
431            if (dbObjectStartLine >= startLine && dbObjectStartLine <= endLine) {
432                DbObjectPosition position = new DbObjectPosition();
433                position.setFile(sqlInfo.getFileName());
434                position.setFilePath(sqlInfo.getFilePath());
435                position.setSql(sqlInfo.getSql());
436                position.setIndex(sqlInfo.getOriginIndex());
437                List<Pair<Integer, Integer>> positions = position.getPositions();
438                positions.add(new Pair<Integer, Integer>(
439                        dbObjectStartLine - startLine + sqlInfo.getOriginLineStart() + 1, dbObjectStarColumn + 1));
440                positions.add(new Pair<Integer, Integer>(dbObjectEndLine - startLine + sqlInfo.getOriginLineStart() + 1,
441                        dbObjectEndColumn + 1));
442                return position;
443            }
444        }
445        return null;
446    }
447
    /**
     * Runs the analysis with the memory-saving merge mode enabled; equivalent to
     * {@code generateDataFlow(withExtraInfo, true)}.
     *
     * @param withExtraInfo passed through to each worker analyzer's generateDataFlow
     * @return the value returned by {@code generateDataFlow(withExtraInfo, true)}
     */
    @Override
    public synchronized String generateDataFlow(final boolean withExtraInfo) {
        return generateDataFlow(withExtraInfo, true);
    }
452    
453    public synchronized String generateDataFlow(final boolean withExtraInfo, boolean useSaveMemoryMode) {
454        sqlInfoMap.clear();
455        errorInfos.clear();
456        Map<String, Pair3<StringBuilder, AtomicInteger, String>> databaseMap = new LinkedHashMap<String, Pair3<StringBuilder, AtomicInteger, String>>();
457        for (int i = 0; i < sqlInfos.length; i++) {
458            SqlInfo sqlInfo = sqlInfos[i];
459                        if (sqlInfo != null && sqlInfo.getSql() == null && sqlInfo.getFilePath() != null) {
460                                sqlInfo.setSql(SQLUtil.getFileContent(sqlInfo.getFilePath()));
461                        }
462            if (sqlInfo != null && sqlInfo.getSql() == null && sqlInfo.getFileName() != null) {
463                sqlInfo.setSql(SQLUtil.getFileContent(sqlInfo.getFileName()));
464            }
465            if (sqlInfo == null || sqlInfo.getSql() == null) {
466                sqlInfoMap.put(String.valueOf(i), new ArrayList<SqlInfo>());
467                continue;
468            }
469            String sql = sqlInfo.getSql();
470            if (sql != null && sql.trim().startsWith("{")) {
471                if (MetadataReader.isGrabit(sql) || MetadataReader.isSqlflow(sql)) {
472                    String hash = SHA256.getMd5(sql);
473                    String fileHash = SHA256.getMd5(hash);
474                    if (!sqlInfoMap.containsKey(fileHash)) {
475                        sqlInfoMap.put(fileHash, new ArrayList<SqlInfo>());
476                        sqlInfoMap.get(fileHash).add(sqlInfo);
477                    }
478                } else {
479                    Map queryObject = (Map) JSON.parseObject(sql);
480                    appendSqlInfo(databaseMap, i, sqlInfo, queryObject);
481                }
482            } else {
483                String content = sql;
484                String hash = SHA256.getMd5(content);
485                String fileHash = SHA256.getMd5(hash);
486                if (!sqlInfoMap.containsKey(fileHash)) {
487                    sqlInfoMap.put(fileHash, new ArrayList<SqlInfo>());
488
489                    String database = TSQLEnv.DEFAULT_DB_NAME;
490                    String schema = TSQLEnv.DEFAULT_SCHEMA_NAME;
491                    if(sqlenv!=null) {
492                        database = sqlenv.getDefaultCatalogName();
493                        if(database == null) {
494                            database = TSQLEnv.DEFAULT_DB_NAME;
495                        }
496                        schema = sqlenv.getDefaultSchemaName();
497                        if(schema == null) {
498                            schema = TSQLEnv.DEFAULT_SCHEMA_NAME;
499                        }
500                    }
501                    boolean supportCatalog = TSQLEnv.supportCatalog(option.getVendor());
502                    boolean supportSchema = TSQLEnv.supportSchema(option.getVendor());
503                    StringBuilder builder = new StringBuilder();
504                    if (supportCatalog) {
505                        builder.append(database);
506                    }
507                    if (supportSchema) {
508                        if (builder.length() > 0) {
509                            builder.append(".");
510                        }
511                        builder.append(schema);
512                    }
513                    String group = builder.toString();
514                    SqlInfo sqlInfoItem = new SqlInfo();
515                    sqlInfoItem.setFileName(sqlInfo.getFileName());
516                    sqlInfoItem.setFilePath(sqlInfo.getFilePath());
517                    sqlInfoItem.setSql(sqlInfo.getSql());
518                    sqlInfoItem.setOriginIndex(0);
519                    sqlInfoItem.setOriginLineStart(0);
520                    sqlInfoItem.setOriginLineEnd(sqlInfo.getSql().split("\n").length - 1);
521                    sqlInfoItem.setIndex(0);
522                    sqlInfoItem.setLineStart(0);
523                    sqlInfoItem.setLineEnd(sqlInfo.getSql().split("\n").length - 1);
524                    sqlInfoItem.setHash(fileHash);
525                    sqlInfoItem.setGroup(group);
526
527                    sqlInfoMap.get(fileHash).add(sqlInfoItem);
528                }
529            }
530        }
531
532        final TSQLEnv[] env = new TSQLEnv[]{sqlenv};
533        if (sqlenv == null) {
534                TSQLEnv[] envs = new SQLEnvParser(option.getDefaultServer(), option.getDefaultDatabase(), option.getDefaultSchema()).parseSQLEnv(option.getVendor(), sqlInfos);
535                        if (envs != null && envs.length > 0) {
536                                env[0] = envs[0];
537                        }
538        }
539        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
540                .newFixedThreadPool(option.getParallel() < sqlInfos.length
541                        ? option.getParallel()
542                        : sqlInfos.length);
543        
544        List<File> tempFiles = Collections.synchronizedList(new ArrayList<File>());
545        final Map<SqlInfo, dataflow> dataflowMap = useSaveMemoryMode ? null : new ConcurrentHashMap<>();
546        
547        try {
548            final CountDownLatch latch = new CountDownLatch(sqlInfos.length);
549
550            logger.info("start parallel analyze " + sqlInfos.length + " sqlinfos");
551
552            for (int i = 0; i < sqlInfos.length; i++) {
553                final SqlInfo[] sqlInfoCopy = new SqlInfo[sqlInfos.length];
554                final SqlInfo item = sqlInfos[i];
555                sqlInfoCopy[i] = item;
556                final Option optionCopy = (Option) option.clone();
557                optionCopy.setStartId(5000000L * i);
558                optionCopy.setOutput(false);
559                final int index = i;
560                Runnable task = new Runnable() {
561                    @Override
562                    public void run() {
563                        try {
564                            logger.info("start analyze sqlinfo[" + index + "]" + (sqlInfos[index].getFilePath() != null ? ", file name = " + new File(sqlInfos[index].getFilePath()).getName() : ""));
565                            DataFlowAnalyzer analyzer = new DataFlowAnalyzer(sqlInfoCopy, optionCopy);
566                            analyzer.setSqlEnv(env[0]);
567                            analyzer.generateDataFlow(withExtraInfo);
568                            dataflow dataflow = analyzer.getDataFlow();
569                            if (dataflow == null) {
570                                logger.warn("analyze sqlinfo[" + index + "] done, but dataflow is null" + (sqlInfos[index].getFilePath() != null ? ", file name = " + new File(sqlInfos[index].getFilePath()).getName() : ""));
571                                errorInfos.addAll(analyzer.getErrorMessages());
572                                analyzer.dispose();
573                                return;
574                            }
575                            logger.info("analyze sqlinfo[" + index + "] done, relation count: " + dataflow.getRelationships().size()+", error count: "+ analyzer.getErrorMessages().size());
576                            if (analyzer.getErrorMessages().size() > 10000) {
577                                dataflow.setErrors(new ArrayList<>(dataflow.getErrors().subList(0, 10000)));
578                                errorInfos.addAll(analyzer.getErrorMessages().subList(0, 10000));
579                                logger.warn("Too many errors in dataflow ("+dataflow.getErrors().size()+"), truncating to first 10000 errors" + (sqlInfos[index].getFileName() != null ? ", file name = " + sqlInfos[index].getFileName() : ""));
580                            }
581                            else{
582                                errorInfos.addAll(analyzer.getErrorMessages());
583                            }
584                            
585                            if (useSaveMemoryMode) {
586                                File tempFile = File.createTempFile("dataflow_" + index + "_" + System.currentTimeMillis() + "_", ".xml.zip");
587                                XML2Model.saveXML(dataflow, tempFile);
588                                tempFiles.add(tempFile);
589                            } else {
590                                dataflowMap.put(item, dataflow);
591                            }
592                            
593                            hashSQLMap.putAll(analyzer.getHashSQLMap());
594                            analyzer.dispose();
595                        }
596                        catch (Exception e) {
597                            logger.error("analyze sqlinfo[" + index + "] failed.", e);
598                        }
599                        finally {
600                            latch.countDown();
601                        }
602                    }
603                };
604                executor.submit(task);
605            }
606            latch.await();
607        } catch (Exception e) {
608            logger.error("execute task failed.", e);
609        }
610        executor.shutdown();
611
612        ModelBindingManager modelManager = new ModelBindingManager();
613        modelManager.setGlobalVendor(option.getVendor());
614        modelManager.setGlobalOption(option);
615        ModelBindingManager.set(modelManager);
616        
617        if (useSaveMemoryMode) {
618            // 使用节约内存模式,迭代合并
619            logger.info("start merge dataflow, dataflow count: " + tempFiles.size()+", useSaveMemoryMode: "+ useSaveMemoryMode+", gsp version: "+ TBaseType.versionid);
620            this.dataflow = iterativeMergeDataFlows(tempFiles, 5000000L * sqlInfos.length);
621            if (this.dataflow != null && this.dataflow.getRelationships() != null) {
622                logger.info("merge dataflow done, dataflow count: " + tempFiles.size() + ", relation count: " + this.dataflow.getRelationships().size());
623            }
624            for (File tempFile : tempFiles) {
625                tempFile.delete();
626            }
627        } else {
628            // 保持原有逻辑
629            logger.info("start merge dataflow, dataflow count: " + dataflowMap.size()+", useSaveMemoryMode: "+ useSaveMemoryMode+", gsp version: "+ TBaseType.versionid);
630            this.dataflow = mergeDataFlows(dataflowMap, 5000000L * sqlInfos.length);
631            if (this.dataflow != null && this.dataflow.getRelationships() != null) {
632                logger.info("merge dataflow done, dataflow count: " + tempFiles.size() + ", relation count: " + this.dataflow.getRelationships().size());
633            }
634            dataflowMap.clear();
635        }
636        
637        if (this.dataflow != null) {
638            logger.info("merge done, relation count: " + this.dataflow.getRelationships().size());
639        }
640        
641        if (dataflow != null && option.isOutput()) {
642            if (option.isTextFormat()) {
643                dataflowString = DataFlowAnalyzer.getTextOutput(dataflow);
644            } else {
645                try {
646                    dataflowString = XML2Model.saveXML(dataflow);
647                }catch (Exception e){
648                    logger.error("save dataflow as xml failed.", e);
649                    dataflowString = null;
650                }
651            }
652        }
653        ModelBindingManager.remove();
654        return dataflowString;
655    }
656    
657    private dataflow iterativeMergeDataFlows(List<File> tempFiles, long startId) {
658        return DataflowUtility.iterativeMergeDataflowsFromFilesByStartId(tempFiles, startId);
659    }
660
661    private void appendSqlInfo(Map<String, Pair3<StringBuilder, AtomicInteger, String>> databaseMap, int index,
662                               SqlInfo sqlInfo, Map queryObject) {
663        EDbVendor vendor = option.getVendor();
664        if(!SQLUtil.isEmpty(sqlInfo.getDbVendor())){
665            vendor = EDbVendor.valueOf(sqlInfo.getDbVendor());
666        }
667
668        boolean supportCatalog = TSQLEnv.supportCatalog(vendor);
669        boolean supportSchema = TSQLEnv.supportSchema(vendor);
670
671        String groupName = (String) queryObject.get("groupName");
672        if (DlineageUtil.isProcedureExcluded(groupName)) {
673            return;
674        }
675
676        String content = (String) queryObject.get("sourceCode");
677        if (SQLUtil.isEmpty(content)) {
678            return;
679        }
680
681        StringBuilder builder = new StringBuilder();
682        if (supportCatalog) {
683            String database = (String) queryObject.get("database");
684            if (database.indexOf(".") != -1) {
685                String delimitedChar = TSQLEnv.delimitedChar(vendor);
686                database = delimitedChar + SQLUtil.trimColumnStringQuote(database) + delimitedChar;
687            }
688            builder.append(database);
689        }
690        if (supportSchema) {
691            String schema = (String) queryObject.get("schema");
692            if (schema.indexOf(".") != -1) {
693                String delimitedChar = TSQLEnv.delimitedChar(vendor);
694                schema = delimitedChar + SQLUtil.trimColumnStringQuote(schema) + delimitedChar;
695            }
696            if (builder.length() > 0) {
697                builder.append(".");
698            }
699            builder.append(schema);
700        }
701        String group = builder.toString();
702        String sqlHash = SHA256.getMd5(content);
703        String hash = SHA256.getMd5(sqlHash);
704        if (!databaseMap.containsKey(sqlHash)) {
705            databaseMap.put(sqlHash,
706                    new Pair3<StringBuilder, AtomicInteger, String>(new StringBuilder(), new AtomicInteger(), group));
707        }
708        TGSqlParser parser = new TGSqlParser(option.getVendor());
709        String delimiterChar = String.valueOf(parser.getDelimiterChar());
710        StringBuilder buffer = new StringBuilder(content);
711        if (content.trim().endsWith(delimiterChar) || content.trim().endsWith(";")) {
712            buffer.append("\n");
713        } else if(vendor == EDbVendor.dbvredshift
714                || vendor == EDbVendor.dbvgaussdb
715                || vendor == EDbVendor.dbvedb
716                || vendor == EDbVendor.dbvpostgresql
717                || vendor == EDbVendor.dbvmysql
718                || vendor == EDbVendor.dbvteradata){
719            buffer.append("\n\n-- " + TBaseType.sqlflow_stmt_delimiter_str + "\n\n");
720        } else{
721            SQLUtil.endTrim(buffer);
722            buffer.append(";").append("\n");
723        }
724
725        int lineStart = databaseMap.get(sqlHash).first.toString().split("\n", -1).length - 1;
726        if (databaseMap.get(sqlHash).first.toString().length() == 0) {
727            lineStart = 0;
728        }
729        databaseMap.get(sqlHash).first.append(buffer.toString());
730        SqlInfo sqlInfoItem = new SqlInfo();
731        sqlInfoItem.setFileName(sqlInfo.getFileName());
732        sqlInfoItem.setFilePath(sqlInfo.getFilePath());
733        sqlInfoItem.setSql(buffer.toString());
734        sqlInfoItem.setOriginIndex(index);
735        sqlInfoItem.setOriginLineStart(0);
736        sqlInfoItem.setOriginLineEnd(buffer.toString().split("\n", -1).length - 1);
737        sqlInfoItem.setIndex(databaseMap.get(sqlHash).second.getAndIncrement());
738        sqlInfoItem.setLineStart(lineStart);
739        sqlInfoItem.setLineEnd(databaseMap.get(sqlHash).first.toString().split("\n", -1).length - 1);
740        sqlInfoItem.setGroup(group);
741        sqlInfoItem.setHash(hash);
742
743        if (!sqlInfoMap.containsKey(hash)) {
744            sqlInfoMap.put(hash, new ArrayList<SqlInfo>());
745        }
746        sqlInfoMap.get(hash).add(sqlInfoItem);
747    }
748
749    @Override
750    public void dispose() {
751        ModelBindingManager.remove();
752    }
753
754    private dataflow mergeDataFlows(Map<SqlInfo, dataflow> dataflowMap, long startId) {
755        return DataflowUtility.mergeDataflowsByStartId(dataflowMap.values(), startId);
756    }
757
758    @Override
759    public synchronized dataflow getDataFlow() {
760        if (dataflow != null) {
761            return dataflow;
762        } else if (dataflowString != null) {
763            return XML2Model.loadXML(dataflow.class, dataflowString);
764        }
765        return null;
766    }
767
768    @Override
769    public Map<String, String> getHashSQLMap() {
770        return hashSQLMap;
771    }
772
773    public static void main(String[] args) throws Exception {
774        Option option = new Option();
775        option.setVendor(EDbVendor.dbvmssql);
776        option.setOutput(false);
777//        option.setSimpleOutput(true);
778        File parentDir = new File("C:\\Users\\KK\\Desktop\\sql");
779        ParallelDataFlowAnalyzer analyzer = new ParallelDataFlowAnalyzer(new File[]{new File("C:\\Users\\KK\\Desktop\\metadata_with_query\\客户原始sql.json")}, option, 5, new File("C:\\Users\\KK\\Desktop\\metadata_with_query"));
780        analyzer.generateDataFlow(false, true);
781        dataflow dataflow = analyzer.getDataFlow();
782//        System.out.println(XML2Model.saveXML(dataflow));
783//        dataflow dataflow = XML2Model.loadXML(gudusoft.gsqlparser.dlineage.dataflow.model.xml.dataflow.class,new File("D:\\dataflow.xml.zip"));
784        XML2Model.saveXML(dataflow, new File("D:\\dataflow.xml.zip"));
785//        dataflow = DataflowUtility.convertToTableLevelDataflow(dataflow);
786//        dataflow = DataflowUtility.convertTableLevelToFunctionCallDataflow(dataflow, true, EDbVendor.dbvoracle);
787//        System.out.println(XML2Model.saveXML(dataflow));
788    }
789
790}