001package gudusoft.gsqlparser.dlineage.dataflow.sqlenv; 002 003import gudusoft.gsqlparser.EDbVendor; 004import gudusoft.gsqlparser.dlineage.dataflow.model.SqlInfo; 005import gudusoft.gsqlparser.dlineage.metadata.Coordinate; 006import gudusoft.gsqlparser.sqlenv.*; 007import gudusoft.gsqlparser.sqlenv.parser.TJSONSQLEnvParser; 008import gudusoft.gsqlparser.sqlenv.parser.TSQLEnvParser; 009import gudusoft.gsqlparser.util.Logger; 010import gudusoft.gsqlparser.util.LoggerFactory; 011import gudusoft.gsqlparser.util.SQLUtil; 012 013import java.io.File; 014import java.util.*; 015import java.util.concurrent.CopyOnWriteArrayList; 016import java.util.concurrent.CountDownLatch; 017import java.util.concurrent.Executors; 018import java.util.concurrent.ThreadPoolExecutor; 019 020public class SQLEnvParser implements TSQLEnvParser { 021 022 private static final Logger logger = LoggerFactory.getLogger(Coordinate.class); 023 024 private TSQLEnv metadataSQLEnv; 025 026 private String defaultServer; 027 private String defaultDatabase; 028 private String defaultSchema; 029 private List<SqlInfo> metadataInfos = new CopyOnWriteArrayList<SqlInfo>(); 030 031 public SQLEnvParser(TSQLEnv metadataSQLEnv, String defaultServer, String defaultDatabase, String defaultSchema) { 032 this.metadataSQLEnv = metadataSQLEnv; 033 this.defaultServer = defaultServer; 034 this.defaultDatabase = defaultDatabase; 035 this.defaultSchema = defaultSchema; 036 } 037 038 public SQLEnvParser(String defaultServer, String defaultDatabase, String defaultSchema) { 039 this.metadataSQLEnv = null; 040 this.defaultServer = defaultServer; 041 this.defaultDatabase = defaultDatabase; 042 this.defaultSchema = defaultSchema; 043 } 044 045 public TSQLEnv[] parseSQLEnv(final EDbVendor vendor, SqlInfo[] sqlInfos) { 046 if (sqlInfos == null || sqlInfos.length == 0) { 047 return null; 048 } 049 final CountDownLatch latch = new CountDownLatch(sqlInfos.length); 050 final List<TSQLEnv> sqlenvs = new ArrayList<TSQLEnv>(); 051 ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors 052 .newFixedThreadPool(Runtime.getRuntime().availableProcessors() / 8 + 1 < sqlInfos.length 053 ? Runtime.getRuntime().availableProcessors() / 8 + 1 054 : sqlInfos.length); 055 for (int i = 0; i < sqlInfos.length; i++) { 056 final SqlInfo sqlInfo = sqlInfos[i]; 057 Runnable task = new Runnable() { 058 @Override 059 public void run() { 060 try { 061 String sql = sqlInfo.getSql(); 062 if (SQLUtil.isEmpty(sql) && sqlInfo.getFileName() != null 063 && new File(sqlInfo.getFileName()).exists()) { 064 sql = SQLUtil.getFileContent(sqlInfo.getFileName()); 065 } 066 TJSONSQLEnvParser jsonSQLEnvParser = new TJSONSQLEnvParser(defaultServer, defaultDatabase, defaultSchema); 067 TSQLEnv[] sqlenv = jsonSQLEnvParser.parseSQLEnv(vendor, sql); 068 if (sqlenv == null) { 069 sqlenv = parseSQLEnv(vendor, sql); 070 } 071 else { 072 metadataInfos.add(sqlInfo); 073 } 074 synchronized (sqlenvs) { 075 if (sqlenv != null) { 076 sqlenvs.addAll(Arrays.asList(sqlenv)); 077 } 078 } 079 } finally { 080 latch.countDown(); 081 } 082 } 083 }; 084 executor.submit(task); 085 } 086 try { 087 latch.await(); 088 } catch (Exception e) { 089 logger.error("await latch occurs an exception.", e); 090 } 091 executor.shutdown(); 092 Map<String, List<TSQLEnv>> sqlenvMap = new HashMap<String, List<TSQLEnv>>(); 093 for (TSQLEnv sqlenv : sqlenvs) { 094 String serverName = sqlenv.getDefaultServerName(); 095 if(serverName == null) { 096 serverName= TSQLEnv.DEFAULT_SERVER_NAME; 097 } 098 if(!sqlenvMap.containsKey(serverName)) { 099 sqlenvMap.put(serverName, new ArrayList<TSQLEnv>()); 100 } 101 sqlenvMap.get(serverName).add(sqlenv); 102 } 103 104 List<TSQLEnv> mergeSQLEnvs = new ArrayList<TSQLEnv>(); 105 for(String key: sqlenvMap.keySet()) { 106 List<TSQLEnv> sqlenvItem = sqlenvMap.get(key); 107 mergeSQLEnvs.add(mergeSQLEnv(sqlenvItem)); 108 } 109 return mergeSQLEnvs.toArray(new TSQLEnv[0]); 110 } 111 112 public static TSQLEnv mergeSQLEnv(List<TSQLEnv> sqlenvs) { 113 if (sqlenvs.size() == 0) 114 return null; 115 TSQLEnv sqlenv = sqlenvs.get(0); 116 for (int i = 1; i < sqlenvs.size(); i++) { 117 TSQLEnv temp = sqlenvs.get(i); 118 if(SQLUtil.isEmpty(sqlenv.getDefaultServerName()) || sqlenv.getDefaultServerName().equals(TSQLEnv.DEFAULT_SERVER_NAME)) { 119 if(!SQLUtil.isEmpty(temp.getDefaultServerName()) && !temp.getDefaultServerName().equals(TSQLEnv.DEFAULT_SERVER_NAME)) { 120 sqlenv.setDefaultServerName(temp.getDefaultServerName()); 121 } 122 } 123 if (temp.getCatalogList() != null) { 124 for (int j = 0; j < temp.getCatalogList().size(); j++) { 125 mergeCatalog(sqlenv, temp.getCatalogList().get(j)); 126 } 127 } 128 } 129// if (sqlenv.getCatalogList() != null && sqlenv.getCatalogList().size() == 1) { 130// sqlenv.setDefaultCatalogName(sqlenv.getCatalogList().get(0).getName()); 131// if (sqlenv.getCatalogList().get(0).getSchemaList() != null 132// && sqlenv.getCatalogList().get(0).getSchemaList().size() == 1) { 133// sqlenv.setDefaultSchemaName(sqlenv.getCatalogList().get(0).getSchemaList().get(0).getName()); 134// } 135// } 136 return sqlenv; 137 } 138 139 private static void mergeCatalog(TSQLEnv sqlenv, TSQLCatalog catalog) { 140 TSQLCatalog mergeCatalog = sqlenv.getSQLCatalog(catalog.getName(), true); 141 List<TSQLSchema> schemaList = catalog.getSchemaList(); 142 if (schemaList != null) { 143 for (int i = 0; i < schemaList.size(); i++) { 144 mergeSchema(mergeCatalog, schemaList.get(i)); 145 } 146 } 147 } 148 149 private static void mergeSchema(TSQLCatalog catalog, TSQLSchema schema) { 150 TSQLSchema mergeSchema = catalog.getSchema(schema.getName(), true); 151 List<TSQLSchemaObject> schemaObjectList = schema.getSchemaObjectList(); 152 if (schemaObjectList != null) { 153 for (int i = 0; i < schemaObjectList.size(); i++) { 154 mergeSchemaObject(mergeSchema, schemaObjectList.get(i)); 155 } 156 } 157 } 158 159 private static void mergeSchemaObject(TSQLSchema schema, TSQLSchemaObject schemaObject) { 160 if (schemaObject instanceof TSQLTable) { 161 TSQLTable sqlTable = (TSQLTable) schemaObject; 162 TSQLTable mergeTable = schema.getSqlEnv().searchTable(schemaObject.getQualifiedName()); 163 if (mergeTable == null) { 164 mergeTable = schema.createTable(schemaObject.getName(), schemaObject.getPriority()); 165 mergeTable.setView(sqlTable.isView()); 166 } else if (schemaObject.getPriority() > mergeTable.getPriority()) { 167 mergeTable.setPriority(schemaObject.getPriority()); 168 } 169 List<TSQLColumn> columnList = sqlTable.getColumnList(); 170 if (columnList != null) { 171 for (int i = 0; i < columnList.size(); i++) { 172 mergeTable.addColumn(columnList.get(i).getName()); 173 } 174 } 175 } else if (schemaObject instanceof TSQLRoutine) { 176 TSQLRoutine sqlRoutine = (TSQLRoutine) schemaObject; 177 TSQLSchemaObject mergeRoutine = schema.getSqlEnv().searchSchemaObject(schemaObject.getQualifiedName(), sqlRoutine.getDataObjectType()); 178 if (mergeRoutine == null) { 179 mergeRoutine = schema.createSchemaObject(schemaObject.getName(), schemaObject.getDataObjectType()); 180 } 181 if (sqlRoutine.getDefinition() != null) { 182 ((TSQLRoutine) mergeRoutine).setDefinition(sqlRoutine.getDefinition()); 183 } 184 } 185 } 186 187 public List<SqlInfo> getMetadataInfos() { 188 return metadataInfos; 189 } 190 191 public void setMetadataInfos(List<SqlInfo> metadataInfos) { 192 this.metadataInfos = metadataInfos; 193 } 194 195 @Override 196 public TSQLEnv[] parseSQLEnv(EDbVendor vendor, String sql) { 197 TDDLSQLEnv ddlSQLEnv = new TDDLSQLEnv(defaultServer, defaultDatabase, defaultSchema, metadataSQLEnv, vendor, sql); 198 ddlSQLEnv.initSQLEnv(); 199 if (ddlSQLEnv.isInit()) { 200 return new TSQLEnv[] { ddlSQLEnv }; 201 } else { 202 return null; 203 } 204 } 205}