001package gudusoft.gsqlparser.dlineage.dataflow.sqlenv; 002 003import gudusoft.gsqlparser.EDbVendor; 004import gudusoft.gsqlparser.dlineage.dataflow.model.SqlInfo; 005import gudusoft.gsqlparser.dlineage.metadata.Coordinate; 006import gudusoft.gsqlparser.sqlenv.*; 007import gudusoft.gsqlparser.sqlenv.parser.TJSONSQLEnvParser; 008import gudusoft.gsqlparser.sqlenv.parser.TSQLEnvParser; 009import gudusoft.gsqlparser.util.Logger; 010import gudusoft.gsqlparser.util.LoggerFactory; 011import gudusoft.gsqlparser.util.SQLUtil; 012 013import java.io.File; 014import java.util.*; 015import java.util.concurrent.CopyOnWriteArrayList; 016import java.util.concurrent.CountDownLatch; 017import java.util.concurrent.Executors; 018import java.util.concurrent.ThreadPoolExecutor; 019 020public class SQLEnvParser implements TSQLEnvParser { 021 022 private static final Logger logger = LoggerFactory.getLogger(Coordinate.class); 023 024 private TSQLEnv metadataSQLEnv; 025 026 private String defaultServer; 027 private String defaultDatabase; 028 private String defaultSchema; 029 private List<SqlInfo> metadataInfos = new CopyOnWriteArrayList<SqlInfo>(); 030 031 public SQLEnvParser(TSQLEnv metadataSQLEnv, String defaultServer, String defaultDatabase, String defaultSchema) { 032 this.metadataSQLEnv = metadataSQLEnv; 033 this.defaultServer = defaultServer; 034 this.defaultDatabase = defaultDatabase; 035 this.defaultSchema = defaultSchema; 036 } 037 038 public SQLEnvParser(String defaultServer, String defaultDatabase, String defaultSchema) { 039 this.metadataSQLEnv = null; 040 this.defaultServer = defaultServer; 041 this.defaultDatabase = defaultDatabase; 042 this.defaultSchema = defaultSchema; 043 } 044 045 public TSQLEnv[] parseSQLEnv(final EDbVendor vendor, SqlInfo[] sqlInfos) { 046 if (sqlInfos == null || sqlInfos.length == 0) { 047 return null; 048 } 049 final CountDownLatch latch = new CountDownLatch(sqlInfos.length); 050 final List<TSQLEnv> sqlenvs = new ArrayList<TSQLEnv>(); 051 int thread = Runtime.getRuntime().availableProcessors() / 4 + 1; 052 if (thread < 4) thread = 4; 053 ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors 054 .newFixedThreadPool(thread < sqlInfos.length 055 ? thread 056 : sqlInfos.length); 057 for (int i = 0; i < sqlInfos.length; i++) { 058 final SqlInfo sqlInfo = sqlInfos[i]; 059 Runnable task = new Runnable() { 060 @Override 061 public void run() { 062 try { 063 if (sqlInfo == null) { 064 return; 065 } 066 String sql = sqlInfo.getSql(); 067 if (SQLUtil.isEmpty(sql) && sqlInfo.getFileName() != null 068 && new File(sqlInfo.getFileName()).exists()) { 069 sql = SQLUtil.getFileContent(sqlInfo.getFileName()); 070 } 071 if (SQLUtil.isEmpty(sql) && sqlInfo.getFilePath() != null 072 && new File(sqlInfo.getFilePath()).exists()) { 073 sql = SQLUtil.getFileContent(sqlInfo.getFilePath()); 074 } 075 TJSONSQLEnvParser jsonSQLEnvParser = new TJSONSQLEnvParser(defaultServer, defaultDatabase, defaultSchema); 076 TSQLEnv[] sqlenv = jsonSQLEnvParser.parseSQLEnv(vendor, sql); 077 if (sqlenv == null) { 078 sqlenv = parseSQLEnv(vendor, sql); 079 } 080 else { 081 metadataInfos.add(sqlInfo); 082 } 083 synchronized (sqlenvs) { 084 if (sqlenv != null) { 085 sqlenvs.addAll(Arrays.asList(sqlenv)); 086 } 087 } 088 } catch (Exception e) { 089 logger.error(e.getMessage(), e); 090 } finally { 091 latch.countDown(); 092 } 093 } 094 }; 095 executor.submit(task); 096 } 097 try { 098 latch.await(); 099 } catch (Exception e) { 100 logger.error("await latch occurs an exception.", e); 101 } 102 executor.shutdown(); 103 Map<String, List<TSQLEnv>> sqlenvMap = new HashMap<String, List<TSQLEnv>>(); 104 for (TSQLEnv sqlenv : sqlenvs) { 105 String serverName = sqlenv.getDefaultServerName(); 106 if(serverName == null) { 107 serverName= TSQLEnv.DEFAULT_SERVER_NAME; 108 } 109 if(!sqlenvMap.containsKey(serverName)) { 110 sqlenvMap.put(serverName, new ArrayList<TSQLEnv>()); 111 } 112 sqlenvMap.get(serverName).add(sqlenv); 113 } 114 115 List<TSQLEnv> mergeSQLEnvs = new ArrayList<TSQLEnv>(); 116 for(String key: sqlenvMap.keySet()) { 117 List<TSQLEnv> sqlenvItem = sqlenvMap.get(key); 118 mergeSQLEnvs.add(mergeSQLEnv(sqlenvItem)); 119 } 120 return mergeSQLEnvs.toArray(new TSQLEnv[0]); 121 } 122 123 public static TSQLEnv mergeSQLEnv(List<TSQLEnv> sqlenvs) { 124 if (sqlenvs.size() == 0) 125 return null; 126 TSQLEnv sqlenv = sqlenvs.get(0); 127 for (int i = 1; i < sqlenvs.size(); i++) { 128 TSQLEnv temp = sqlenvs.get(i); 129 if(SQLUtil.isEmpty(sqlenv.getDefaultServerName()) || sqlenv.getDefaultServerName().equals(TSQLEnv.DEFAULT_SERVER_NAME)) { 130 if(!SQLUtil.isEmpty(temp.getDefaultServerName()) && !temp.getDefaultServerName().equals(TSQLEnv.DEFAULT_SERVER_NAME)) { 131 sqlenv.setDefaultServerName(temp.getDefaultServerName()); 132 } 133 } 134 if (temp.getCatalogList() != null) { 135 for (int j = 0; j < temp.getCatalogList().size(); j++) { 136 mergeCatalog(sqlenv, temp.getCatalogList().get(j)); 137 } 138 } 139 } 140// if (sqlenv.getCatalogList() != null && sqlenv.getCatalogList().size() == 1) { 141// sqlenv.setDefaultCatalogName(sqlenv.getCatalogList().get(0).getName()); 142// if (sqlenv.getCatalogList().get(0).getSchemaList() != null 143// && sqlenv.getCatalogList().get(0).getSchemaList().size() == 1) { 144// sqlenv.setDefaultSchemaName(sqlenv.getCatalogList().get(0).getSchemaList().get(0).getName()); 145// } 146// } 147 return sqlenv; 148 } 149 150 private static void mergeCatalog(TSQLEnv sqlenv, TSQLCatalog catalog) { 151 TSQLCatalog mergeCatalog = sqlenv.getSQLCatalog(catalog.getName(), true); 152 List<TSQLSchema> schemaList = catalog.getSchemaList(); 153 if (schemaList != null) { 154 for (int i = 0; i < schemaList.size(); i++) { 155 mergeSchema(mergeCatalog, schemaList.get(i)); 156 } 157 } 158 } 159 160 private static void mergeSchema(TSQLCatalog catalog, TSQLSchema schema) { 161 TSQLSchema mergeSchema = catalog.getSchema(schema.getName(), true); 162 List<TSQLSchemaObject> schemaObjectList = schema.getSchemaObjectList(); 163 if (schemaObjectList != null) { 164 for (int i = 0; i < schemaObjectList.size(); i++) { 165 mergeSchemaObject(mergeSchema, schemaObjectList.get(i)); 166 } 167 } 168 } 169 170 private static void mergeSchemaObject(TSQLSchema schema, TSQLSchemaObject schemaObject) { 171 if (schemaObject instanceof TSQLTable) { 172 TSQLTable sqlTable = (TSQLTable) schemaObject; 173 TSQLTable mergeTable = schema.getSqlEnv().searchTable(schemaObject.getQualifiedName()); 174 if (mergeTable == null) { 175 mergeTable = schema.createTable(schemaObject.getName(), schemaObject.getPriority()); 176 mergeTable.setView(sqlTable.isView()); 177 } else if (schemaObject.getPriority() > mergeTable.getPriority()) { 178 mergeTable.setPriority(schemaObject.getPriority()); 179 } 180 List<TSQLColumn> columnList = sqlTable.getColumnList(); 181 if (columnList != null) { 182 for (int i = 0; i < columnList.size(); i++) { 183 mergeTable.addColumn(columnList.get(i).getName()); 184 } 185 } 186 } else if (schemaObject instanceof TSQLRoutine) { 187 TSQLRoutine sqlRoutine = (TSQLRoutine) schemaObject; 188 TSQLSchemaObject mergeRoutine = schema.getSqlEnv().searchSchemaObject(schemaObject.getQualifiedName(), sqlRoutine.getDataObjectType()); 189 if (mergeRoutine == null) { 190 mergeRoutine = schema.createSchemaObject(schemaObject.getName(), schemaObject.getDataObjectType()); 191 } 192 if (sqlRoutine.getDefinition() != null) { 193 ((TSQLRoutine) mergeRoutine).setDefinition(sqlRoutine.getDefinition()); 194 } 195 } 196 } 197 198 public List<SqlInfo> getMetadataInfos() { 199 return metadataInfos; 200 } 201 202 public void setMetadataInfos(List<SqlInfo> metadataInfos) { 203 this.metadataInfos = metadataInfos; 204 } 205 206 @Override 207 public TSQLEnv[] parseSQLEnv(EDbVendor vendor, String sql) { 208 TDDLSQLEnv ddlSQLEnv = new TDDLSQLEnv(defaultServer, defaultDatabase, defaultSchema, metadataSQLEnv, vendor, sql); 209 ddlSQLEnv.initSQLEnv(); 210 if (ddlSQLEnv.isInit()) { 211 return new TSQLEnv[] { ddlSQLEnv }; 212 } else { 213 return null; 214 } 215 } 216}