001package gudusoft.gsqlparser.dlineage.dataflow.sqlenv;
002
003import gudusoft.gsqlparser.EDbVendor;
004import gudusoft.gsqlparser.dlineage.dataflow.model.SqlInfo;
005import gudusoft.gsqlparser.dlineage.metadata.Coordinate;
006import gudusoft.gsqlparser.sqlenv.*;
007import gudusoft.gsqlparser.sqlenv.parser.TJSONSQLEnvParser;
008import gudusoft.gsqlparser.sqlenv.parser.TSQLEnvParser;
009import gudusoft.gsqlparser.util.Logger;
010import gudusoft.gsqlparser.util.LoggerFactory;
011import gudusoft.gsqlparser.util.SQLUtil;
012
013import java.io.File;
014import java.util.*;
015import java.util.concurrent.CopyOnWriteArrayList;
016import java.util.concurrent.CountDownLatch;
017import java.util.concurrent.Executors;
018import java.util.concurrent.ThreadPoolExecutor;
019
020public class SQLEnvParser implements TSQLEnvParser {
021
022        private static final Logger logger = LoggerFactory.getLogger(Coordinate.class);
023
024        private TSQLEnv metadataSQLEnv;
025        
026        private String defaultServer;
027        private String defaultDatabase;
028        private String defaultSchema;
029        private List<SqlInfo> metadataInfos = new CopyOnWriteArrayList<SqlInfo>();
030        
031        public SQLEnvParser(TSQLEnv metadataSQLEnv, String defaultServer, String defaultDatabase, String defaultSchema) {
032                this.metadataSQLEnv = metadataSQLEnv;
033                this.defaultServer = defaultServer;
034                this.defaultDatabase = defaultDatabase;
035                this.defaultSchema = defaultSchema;
036        }
037
038        public SQLEnvParser(String defaultServer, String defaultDatabase, String defaultSchema) {
039                this.metadataSQLEnv = null;
040                this.defaultServer = defaultServer;
041                this.defaultDatabase = defaultDatabase;
042                this.defaultSchema = defaultSchema;
043        }
044
045        public TSQLEnv[] parseSQLEnv(final EDbVendor vendor, SqlInfo[] sqlInfos) {
046                if (sqlInfos == null || sqlInfos.length == 0) {
047                        return null;
048                }
049                final CountDownLatch latch = new CountDownLatch(sqlInfos.length);
050                final List<TSQLEnv> sqlenvs = new ArrayList<TSQLEnv>();
051                int thread = Runtime.getRuntime().availableProcessors() / 4 + 1;
052                if (thread < 4) thread = 4;
053                ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
054                                .newFixedThreadPool(thread < sqlInfos.length
055                                                ? thread
056                                                : sqlInfos.length);
057                for (int i = 0; i < sqlInfos.length; i++) {
058                        final SqlInfo sqlInfo = sqlInfos[i];
059                        Runnable task = new Runnable() {
060                                @Override
061                                public void run() {
062                                        try {
063                                                String sql = sqlInfo.getSql();
064                                                if (SQLUtil.isEmpty(sql) && sqlInfo.getFileName() != null
065                                                                && new File(sqlInfo.getFileName()).exists()) {
066                                                        sql = SQLUtil.getFileContent(sqlInfo.getFileName());
067                                                }
068                                                if (SQLUtil.isEmpty(sql) && sqlInfo.getFilePath() != null
069                                                                && new File(sqlInfo.getFilePath()).exists()) {
070                                                        sql = SQLUtil.getFileContent(sqlInfo.getFilePath());
071                                                }
072                                                TJSONSQLEnvParser jsonSQLEnvParser = new TJSONSQLEnvParser(defaultServer, defaultDatabase, defaultSchema);
073                                                TSQLEnv[] sqlenv = jsonSQLEnvParser.parseSQLEnv(vendor, sql);
074                                                if (sqlenv == null) {
075                                                        sqlenv = parseSQLEnv(vendor, sql);
076                                                }
077                                                else {
078                                                        metadataInfos.add(sqlInfo);
079                                                }
080                                                synchronized (sqlenvs) {
081                                                        if (sqlenv != null) {
082                                                                sqlenvs.addAll(Arrays.asList(sqlenv));
083                                                        }
084                                                }
085                                        } catch (Exception e) {
086                                                e.printStackTrace();
087                                        } finally {
088                                                latch.countDown();
089                                        }
090                                }
091                        };
092                        executor.submit(task);
093                }
094                try {
095                        latch.await();
096                } catch (Exception e) {
097                        logger.error("await latch occurs an exception.", e);
098                }
099                executor.shutdown();
100                Map<String, List<TSQLEnv>> sqlenvMap = new HashMap<String, List<TSQLEnv>>();
101                for (TSQLEnv sqlenv : sqlenvs) {
102                        String serverName = sqlenv.getDefaultServerName();
103                        if(serverName == null) {
104                                serverName= TSQLEnv.DEFAULT_SERVER_NAME;
105                        }
106                        if(!sqlenvMap.containsKey(serverName)) {
107                                sqlenvMap.put(serverName, new ArrayList<TSQLEnv>());
108                        }
109                        sqlenvMap.get(serverName).add(sqlenv);
110                }
111                
112                List<TSQLEnv> mergeSQLEnvs = new ArrayList<TSQLEnv>();
113                for(String key: sqlenvMap.keySet()) {
114                        List<TSQLEnv> sqlenvItem = sqlenvMap.get(key);
115                        mergeSQLEnvs.add(mergeSQLEnv(sqlenvItem));
116                }
117                return mergeSQLEnvs.toArray(new TSQLEnv[0]);
118        }
119
120        public static TSQLEnv mergeSQLEnv(List<TSQLEnv> sqlenvs) {
121                if (sqlenvs.size() == 0)
122                        return null;
123                TSQLEnv sqlenv = sqlenvs.get(0);
124                for (int i = 1; i < sqlenvs.size(); i++) {
125                        TSQLEnv temp = sqlenvs.get(i);
126                        if(SQLUtil.isEmpty(sqlenv.getDefaultServerName()) || sqlenv.getDefaultServerName().equals(TSQLEnv.DEFAULT_SERVER_NAME)) {
127                                if(!SQLUtil.isEmpty(temp.getDefaultServerName()) && !temp.getDefaultServerName().equals(TSQLEnv.DEFAULT_SERVER_NAME)) {
128                                        sqlenv.setDefaultServerName(temp.getDefaultServerName());
129                                }
130                        }
131                        if (temp.getCatalogList() != null) {
132                                for (int j = 0; j < temp.getCatalogList().size(); j++) {
133                                        mergeCatalog(sqlenv, temp.getCatalogList().get(j));
134                                }
135                        }
136                }
137//              if (sqlenv.getCatalogList() != null && sqlenv.getCatalogList().size() == 1) {
138//                      sqlenv.setDefaultCatalogName(sqlenv.getCatalogList().get(0).getName());
139//                      if (sqlenv.getCatalogList().get(0).getSchemaList() != null
140//                                      && sqlenv.getCatalogList().get(0).getSchemaList().size() == 1) {
141//                              sqlenv.setDefaultSchemaName(sqlenv.getCatalogList().get(0).getSchemaList().get(0).getName());
142//                      }
143//              }
144                return sqlenv;
145        }
146
147        private static void mergeCatalog(TSQLEnv sqlenv, TSQLCatalog catalog) {
148                TSQLCatalog mergeCatalog = sqlenv.getSQLCatalog(catalog.getName(), true);
149                List<TSQLSchema> schemaList = catalog.getSchemaList();
150                if (schemaList != null) {
151                        for (int i = 0; i < schemaList.size(); i++) {
152                                mergeSchema(mergeCatalog, schemaList.get(i));
153                        }
154                }
155        }
156
157        private static void mergeSchema(TSQLCatalog catalog, TSQLSchema schema) {
158                TSQLSchema mergeSchema = catalog.getSchema(schema.getName(), true);
159                List<TSQLSchemaObject> schemaObjectList = schema.getSchemaObjectList();
160                if (schemaObjectList != null) {
161                        for (int i = 0; i < schemaObjectList.size(); i++) {
162                                mergeSchemaObject(mergeSchema, schemaObjectList.get(i));
163                        }
164                }
165        }
166
167        private static void mergeSchemaObject(TSQLSchema schema, TSQLSchemaObject schemaObject) {
168                if (schemaObject instanceof TSQLTable) {
169                        TSQLTable sqlTable = (TSQLTable) schemaObject;
170                        TSQLTable mergeTable = schema.getSqlEnv().searchTable(schemaObject.getQualifiedName());
171                        if (mergeTable == null) {
172                                mergeTable = schema.createTable(schemaObject.getName(), schemaObject.getPriority());
173                                mergeTable.setView(sqlTable.isView());
174                        } else if (schemaObject.getPriority() > mergeTable.getPriority()) {
175                                mergeTable.setPriority(schemaObject.getPriority());
176                        }
177                        List<TSQLColumn> columnList = sqlTable.getColumnList();
178                        if (columnList != null) {
179                                for (int i = 0; i < columnList.size(); i++) {
180                                        mergeTable.addColumn(columnList.get(i).getName());
181                                }
182                        }
183                } else if (schemaObject instanceof TSQLRoutine) {
184                        TSQLRoutine sqlRoutine = (TSQLRoutine) schemaObject;
185                        TSQLSchemaObject mergeRoutine = schema.getSqlEnv().searchSchemaObject(schemaObject.getQualifiedName(), sqlRoutine.getDataObjectType());
186                        if (mergeRoutine == null) {
187                                mergeRoutine = schema.createSchemaObject(schemaObject.getName(), schemaObject.getDataObjectType());
188                        }
189                        if (sqlRoutine.getDefinition() != null) {
190                                ((TSQLRoutine) mergeRoutine).setDefinition(sqlRoutine.getDefinition());
191                        }
192                }
193        }
194
195        public List<SqlInfo> getMetadataInfos() {
196                return metadataInfos;
197        }
198
199        public void setMetadataInfos(List<SqlInfo> metadataInfos) {
200                this.metadataInfos = metadataInfos;
201        }
202
203        @Override
204        public TSQLEnv[] parseSQLEnv(EDbVendor vendor, String sql) {
205                TDDLSQLEnv ddlSQLEnv = new TDDLSQLEnv(defaultServer, defaultDatabase, defaultSchema, metadataSQLEnv, vendor, sql);
206                ddlSQLEnv.initSQLEnv();
207                if (ddlSQLEnv.isInit()) {
208                        return new TSQLEnv[] { ddlSQLEnv };
209                } else {
210                        return null;
211                }
212        }
213}