001package gudusoft.gsqlparser.dlineage.dataflow.sqlenv; 002 003import gudusoft.gsqlparser.EDbVendor; 004import gudusoft.gsqlparser.dlineage.dataflow.model.SqlInfo; 005import gudusoft.gsqlparser.dlineage.metadata.Coordinate; 006import gudusoft.gsqlparser.sqlenv.*; 007import gudusoft.gsqlparser.sqlenv.parser.TJSONSQLEnvParser; 008import gudusoft.gsqlparser.sqlenv.parser.TSQLEnvParser; 009import gudusoft.gsqlparser.util.Logger; 010import gudusoft.gsqlparser.util.LoggerFactory; 011import gudusoft.gsqlparser.util.SQLUtil; 012 013import java.io.File; 014import java.util.*; 015import java.util.concurrent.CopyOnWriteArrayList; 016import java.util.concurrent.CountDownLatch; 017import java.util.concurrent.Executors; 018import java.util.concurrent.ThreadPoolExecutor; 019 020public class SQLEnvParser implements TSQLEnvParser { 021 022 private static final Logger logger = LoggerFactory.getLogger(Coordinate.class); 023 024 private TSQLEnv metadataSQLEnv; 025 026 private String defaultServer; 027 private String defaultDatabase; 028 private String defaultSchema; 029 private List<SqlInfo> metadataInfos = new CopyOnWriteArrayList<SqlInfo>(); 030 031 public SQLEnvParser(TSQLEnv metadataSQLEnv, String defaultServer, String defaultDatabase, String defaultSchema) { 032 this.metadataSQLEnv = metadataSQLEnv; 033 this.defaultServer = defaultServer; 034 this.defaultDatabase = defaultDatabase; 035 this.defaultSchema = defaultSchema; 036 } 037 038 public SQLEnvParser(String defaultServer, String defaultDatabase, String defaultSchema) { 039 this.metadataSQLEnv = null; 040 this.defaultServer = defaultServer; 041 this.defaultDatabase = defaultDatabase; 042 this.defaultSchema = defaultSchema; 043 } 044 045 public TSQLEnv[] parseSQLEnv(final EDbVendor vendor, SqlInfo[] sqlInfos) { 046 if (sqlInfos == null || sqlInfos.length == 0) { 047 return null; 048 } 049 final CountDownLatch latch = new CountDownLatch(sqlInfos.length); 050 final List<TSQLEnv> sqlenvs = new ArrayList<TSQLEnv>(); 051 int thread = Runtime.getRuntime().availableProcessors() / 4 + 1; 052 if (thread < 4) thread = 4; 053 ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors 054 .newFixedThreadPool(thread < sqlInfos.length 055 ? thread 056 : sqlInfos.length); 057 for (int i = 0; i < sqlInfos.length; i++) { 058 final SqlInfo sqlInfo = sqlInfos[i]; 059 Runnable task = new Runnable() { 060 @Override 061 public void run() { 062 try { 063 String sql = sqlInfo.getSql(); 064 if (SQLUtil.isEmpty(sql) && sqlInfo.getFileName() != null 065 && new File(sqlInfo.getFileName()).exists()) { 066 sql = SQLUtil.getFileContent(sqlInfo.getFileName()); 067 } 068 if (SQLUtil.isEmpty(sql) && sqlInfo.getFilePath() != null 069 && new File(sqlInfo.getFilePath()).exists()) { 070 sql = SQLUtil.getFileContent(sqlInfo.getFilePath()); 071 } 072 TJSONSQLEnvParser jsonSQLEnvParser = new TJSONSQLEnvParser(defaultServer, defaultDatabase, defaultSchema); 073 TSQLEnv[] sqlenv = jsonSQLEnvParser.parseSQLEnv(vendor, sql); 074 if (sqlenv == null) { 075 sqlenv = parseSQLEnv(vendor, sql); 076 } 077 else { 078 metadataInfos.add(sqlInfo); 079 } 080 synchronized (sqlenvs) { 081 if (sqlenv != null) { 082 sqlenvs.addAll(Arrays.asList(sqlenv)); 083 } 084 } 085 } catch (Exception e) { 086 e.printStackTrace(); 087 } finally { 088 latch.countDown(); 089 } 090 } 091 }; 092 executor.submit(task); 093 } 094 try { 095 latch.await(); 096 } catch (Exception e) { 097 logger.error("await latch occurs an exception.", e); 098 } 099 executor.shutdown(); 100 Map<String, List<TSQLEnv>> sqlenvMap = new HashMap<String, List<TSQLEnv>>(); 101 for (TSQLEnv sqlenv : sqlenvs) { 102 String serverName = sqlenv.getDefaultServerName(); 103 if(serverName == null) { 104 serverName= TSQLEnv.DEFAULT_SERVER_NAME; 105 } 106 if(!sqlenvMap.containsKey(serverName)) { 107 sqlenvMap.put(serverName, new ArrayList<TSQLEnv>()); 108 } 109 sqlenvMap.get(serverName).add(sqlenv); 110 } 111 112 List<TSQLEnv> mergeSQLEnvs = new ArrayList<TSQLEnv>(); 113 for(String key: sqlenvMap.keySet()) { 114 List<TSQLEnv> sqlenvItem = sqlenvMap.get(key); 115 mergeSQLEnvs.add(mergeSQLEnv(sqlenvItem)); 116 } 117 return mergeSQLEnvs.toArray(new TSQLEnv[0]); 118 } 119 120 public static TSQLEnv mergeSQLEnv(List<TSQLEnv> sqlenvs) { 121 if (sqlenvs.size() == 0) 122 return null; 123 TSQLEnv sqlenv = sqlenvs.get(0); 124 for (int i = 1; i < sqlenvs.size(); i++) { 125 TSQLEnv temp = sqlenvs.get(i); 126 if(SQLUtil.isEmpty(sqlenv.getDefaultServerName()) || sqlenv.getDefaultServerName().equals(TSQLEnv.DEFAULT_SERVER_NAME)) { 127 if(!SQLUtil.isEmpty(temp.getDefaultServerName()) && !temp.getDefaultServerName().equals(TSQLEnv.DEFAULT_SERVER_NAME)) { 128 sqlenv.setDefaultServerName(temp.getDefaultServerName()); 129 } 130 } 131 if (temp.getCatalogList() != null) { 132 for (int j = 0; j < temp.getCatalogList().size(); j++) { 133 mergeCatalog(sqlenv, temp.getCatalogList().get(j)); 134 } 135 } 136 } 137// if (sqlenv.getCatalogList() != null && sqlenv.getCatalogList().size() == 1) { 138// sqlenv.setDefaultCatalogName(sqlenv.getCatalogList().get(0).getName()); 139// if (sqlenv.getCatalogList().get(0).getSchemaList() != null 140// && sqlenv.getCatalogList().get(0).getSchemaList().size() == 1) { 141// sqlenv.setDefaultSchemaName(sqlenv.getCatalogList().get(0).getSchemaList().get(0).getName()); 142// } 143// } 144 return sqlenv; 145 } 146 147 private static void mergeCatalog(TSQLEnv sqlenv, TSQLCatalog catalog) { 148 TSQLCatalog mergeCatalog = sqlenv.getSQLCatalog(catalog.getName(), true); 149 List<TSQLSchema> schemaList = catalog.getSchemaList(); 150 if (schemaList != null) { 151 for (int i = 0; i < schemaList.size(); i++) { 152 mergeSchema(mergeCatalog, schemaList.get(i)); 153 } 154 } 155 } 156 157 private static void mergeSchema(TSQLCatalog catalog, TSQLSchema schema) { 158 TSQLSchema mergeSchema = catalog.getSchema(schema.getName(), true); 159 List<TSQLSchemaObject> schemaObjectList = schema.getSchemaObjectList(); 160 if (schemaObjectList != null) { 161 for (int i = 0; i < schemaObjectList.size(); i++) { 162 mergeSchemaObject(mergeSchema, schemaObjectList.get(i)); 163 } 164 } 165 } 166 167 private static void mergeSchemaObject(TSQLSchema schema, TSQLSchemaObject schemaObject) { 168 if (schemaObject instanceof TSQLTable) { 169 TSQLTable sqlTable = (TSQLTable) schemaObject; 170 TSQLTable mergeTable = schema.getSqlEnv().searchTable(schemaObject.getQualifiedName()); 171 if (mergeTable == null) { 172 mergeTable = schema.createTable(schemaObject.getName(), schemaObject.getPriority()); 173 mergeTable.setView(sqlTable.isView()); 174 } else if (schemaObject.getPriority() > mergeTable.getPriority()) { 175 mergeTable.setPriority(schemaObject.getPriority()); 176 } 177 List<TSQLColumn> columnList = sqlTable.getColumnList(); 178 if (columnList != null) { 179 for (int i = 0; i < columnList.size(); i++) { 180 mergeTable.addColumn(columnList.get(i).getName()); 181 } 182 } 183 } else if (schemaObject instanceof TSQLRoutine) { 184 TSQLRoutine sqlRoutine = (TSQLRoutine) schemaObject; 185 TSQLSchemaObject mergeRoutine = schema.getSqlEnv().searchSchemaObject(schemaObject.getQualifiedName(), sqlRoutine.getDataObjectType()); 186 if (mergeRoutine == null) { 187 mergeRoutine = schema.createSchemaObject(schemaObject.getName(), schemaObject.getDataObjectType()); 188 } 189 if (sqlRoutine.getDefinition() != null) { 190 ((TSQLRoutine) mergeRoutine).setDefinition(sqlRoutine.getDefinition()); 191 } 192 } 193 } 194 195 public List<SqlInfo> getMetadataInfos() { 196 return metadataInfos; 197 } 198 199 public void setMetadataInfos(List<SqlInfo> metadataInfos) { 200 this.metadataInfos = metadataInfos; 201 } 202 203 @Override 204 public TSQLEnv[] parseSQLEnv(EDbVendor vendor, String sql) { 205 TDDLSQLEnv ddlSQLEnv = new TDDLSQLEnv(defaultServer, defaultDatabase, defaultSchema, metadataSQLEnv, vendor, sql); 206 ddlSQLEnv.initSQLEnv(); 207 if (ddlSQLEnv.isInit()) { 208 return new TSQLEnv[] { ddlSQLEnv }; 209 } else { 210 return null; 211 } 212 } 213}