001package gudusoft.gsqlparser.dlineage.util; 002 003import java.util.Comparator; 004import java.util.Set; 005import java.util.TreeSet; 006import java.util.concurrent.CountDownLatch; 007import java.util.concurrent.Executors; 008import java.util.concurrent.ThreadPoolExecutor; 009 010import gudusoft.gsqlparser.EDbVendor; 011import gudusoft.gsqlparser.dlineage.DataFlowAnalyzer; 012import gudusoft.gsqlparser.dlineage.dataflow.model.Option; 013import gudusoft.gsqlparser.dlineage.dataflow.model.SqlInfo; 014import gudusoft.gsqlparser.dlineage.dataflow.model.xml.dataflow; 015import gudusoft.gsqlparser.dlineage.dataflow.sqlenv.SQLEnvParser; 016import gudusoft.gsqlparser.dlineage.util.TableFlowUtility.LeftMostSourceTable; 017import gudusoft.gsqlparser.sqlenv.TSQLEnv; 018import gudusoft.gsqlparser.util.Logger; 019import gudusoft.gsqlparser.util.LoggerFactory; 020 021public class LeftMostTableUtility { 022 private static final Logger logger = LoggerFactory.getLogger(LeftMostTableUtility.class); 023 public static String generateLeftMostTableCsv(Option option, SqlInfo[] sqlInfos, TSQLEnv sqlenv, final EDbVendor vendor) 024 throws InterruptedException { 025 return generateLeftMostTableCsv(option, sqlInfos, sqlenv, vendor, ","); 026 } 027 028 public static String generateLeftMostTableCsv(Option option, SqlInfo[] sqlInfos, TSQLEnv sqlenv, final EDbVendor vendor, 029 final String delimiter) throws InterruptedException { 030 if (sqlenv == null) { 031 TSQLEnv[] envs = new SQLEnvParser(option.getDefaultServer(), option.getDefaultDatabase(), option.getDefaultSchema()).parseSQLEnv(vendor, sqlInfos); 032 if (envs != null && envs.length > 0) { 033 sqlenv = envs[0]; 034 } 035 } 036 final TSQLEnv dataflowSqlEnv = sqlenv; 037 final TreeSet<String> lines = new TreeSet<String>(new Comparator<String>() { 038 @Override 039 public int compare(String o1, String o2) { 040 return o1.toLowerCase().compareTo(o2.toLowerCase()); 041 } 042 }); 043 final TreeSet<String> lowers = new TreeSet<String>(); 044 final CountDownLatch latch = new CountDownLatch(sqlInfos.length); 045 ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors 046 .newFixedThreadPool(Runtime.getRuntime().availableProcessors() < sqlInfos.length 047 ? Runtime.getRuntime().availableProcessors() 048 : sqlInfos.length); 049 for (SqlInfo item : sqlInfos) { 050 final SqlInfo sqlInfo = item; 051 Runnable task = new Runnable() { 052 @Override 053 public void run() { 054 try { 055 DataFlowAnalyzer analyzer = new DataFlowAnalyzer(new SqlInfo[] { sqlInfo }, vendor, true); 056 analyzer.setSqlEnv(dataflowSqlEnv); 057 analyzer.generateDataFlow(); 058 dataflow dataflow = TableFlowUtility.generateLeftMostTableFlow(analyzer, analyzer.getDataFlow(), 059 ".*", null, true); 060 LeftMostSourceTable leftMostSourceTable = TableFlowUtility.exportToLeftMostTableModel(dataflow); 061 Set<String> result = TableFlowUtility.exportModelToLines(leftMostSourceTable, delimiter); 062 synchronized (lowers) { 063 for (String line : result) { 064 String lower = line.toLowerCase(); 065 if (!lowers.contains(lower)) { 066 lowers.add(lower); 067 lines.add(line); 068 } 069 } 070 } 071 } finally { 072 latch.countDown(); 073 } 074 } 075 }; 076 executor.submit(task); 077 } 078 latch.await(); 079 executor.shutdown(); 080 StringBuilder buffer = new StringBuilder(); 081 buffer.append("SOURCE_TABLE" + delimiter + "SOURCE_COLUMN" + delimiter + "TARGET_TABLE" + delimiter 082 + "TARGET_COLUMN").append(System.getProperty("line.separator")); 083 try { 084 for (String line : lines) { 085 buffer.append(line).append(System.getProperty("line.separator")); 086 } 087 } catch (Exception e) { 088 logger.error("Generate column level csv failed.", e); 089 } 090 return buffer.toString(); 091 } 092}