001package gudusoft.gsqlparser.dlineage.dataflow.sqlenv;
002
003import gudusoft.gsqlparser.EDbVendor;
004import gudusoft.gsqlparser.dlineage.dataflow.model.SqlInfo;
005import gudusoft.gsqlparser.dlineage.metadata.Coordinate;
006import gudusoft.gsqlparser.sqlenv.*;
007import gudusoft.gsqlparser.sqlenv.parser.TJSONSQLEnvParser;
008import gudusoft.gsqlparser.sqlenv.parser.TSQLEnvParser;
009import gudusoft.gsqlparser.util.Logger;
010import gudusoft.gsqlparser.util.LoggerFactory;
011import gudusoft.gsqlparser.util.SQLUtil;
012
013import java.io.File;
014import java.util.*;
015import java.util.concurrent.CopyOnWriteArrayList;
016import java.util.concurrent.CountDownLatch;
017import java.util.concurrent.Executors;
018import java.util.concurrent.ThreadPoolExecutor;
019
020public class SQLEnvParser implements TSQLEnvParser {
021
022        private static final Logger logger = LoggerFactory.getLogger(Coordinate.class);
023
024        private TSQLEnv metadataSQLEnv;
025        
026        private String defaultServer;
027        private String defaultDatabase;
028        private String defaultSchema;
029        private List<SqlInfo> metadataInfos = new CopyOnWriteArrayList<SqlInfo>();
030        
031        public SQLEnvParser(TSQLEnv metadataSQLEnv, String defaultServer, String defaultDatabase, String defaultSchema) {
032                this.metadataSQLEnv = metadataSQLEnv;
033                this.defaultServer = defaultServer;
034                this.defaultDatabase = defaultDatabase;
035                this.defaultSchema = defaultSchema;
036        }
037
038        public SQLEnvParser(String defaultServer, String defaultDatabase, String defaultSchema) {
039                this.metadataSQLEnv = null;
040                this.defaultServer = defaultServer;
041                this.defaultDatabase = defaultDatabase;
042                this.defaultSchema = defaultSchema;
043        }
044
045        public TSQLEnv[] parseSQLEnv(final EDbVendor vendor, SqlInfo[] sqlInfos) {
046                if (sqlInfos == null || sqlInfos.length == 0) {
047                        return null;
048                }
049                final CountDownLatch latch = new CountDownLatch(sqlInfos.length);
050                final List<TSQLEnv> sqlenvs = new ArrayList<TSQLEnv>();
051                ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
052                                .newFixedThreadPool(Runtime.getRuntime().availableProcessors() / 8 + 1 < sqlInfos.length
053                                                ? Runtime.getRuntime().availableProcessors() / 8 + 1
054                                                : sqlInfos.length);
055                for (int i = 0; i < sqlInfos.length; i++) {
056                        final SqlInfo sqlInfo = sqlInfos[i];
057                        Runnable task = new Runnable() {
058                                @Override
059                                public void run() {
060                                        try {
061                                                String sql = sqlInfo.getSql();
062                                                if (SQLUtil.isEmpty(sql) && sqlInfo.getFileName() != null
063                                                                && new File(sqlInfo.getFileName()).exists()) {
064                                                        sql = SQLUtil.getFileContent(sqlInfo.getFileName());
065                                                }
066                                                TJSONSQLEnvParser jsonSQLEnvParser = new TJSONSQLEnvParser(defaultServer, defaultDatabase, defaultSchema);
067                                                TSQLEnv[] sqlenv = jsonSQLEnvParser.parseSQLEnv(vendor, sql);
068                                                if (sqlenv == null) {
069                                                        sqlenv = parseSQLEnv(vendor, sql);
070                                                }
071                                                else {
072                                                        metadataInfos.add(sqlInfo);
073                                                }
074                                                synchronized (sqlenvs) {
075                                                        if (sqlenv != null) {
076                                                                sqlenvs.addAll(Arrays.asList(sqlenv));
077                                                        }
078                                                }
079                                        } finally {
080                                                latch.countDown();
081                                        }
082                                }
083                        };
084                        executor.submit(task);
085                }
086                try {
087                        latch.await();
088                } catch (Exception e) {
089                        logger.error("await latch occurs an exception.", e);
090                }
091                executor.shutdown();
092                Map<String, List<TSQLEnv>> sqlenvMap = new HashMap<String, List<TSQLEnv>>();
093                for (TSQLEnv sqlenv : sqlenvs) {
094                        String serverName = sqlenv.getDefaultServerName();
095                        if(serverName == null) {
096                                serverName= TSQLEnv.DEFAULT_SERVER_NAME;
097                        }
098                        if(!sqlenvMap.containsKey(serverName)) {
099                                sqlenvMap.put(serverName, new ArrayList<TSQLEnv>());
100                        }
101                        sqlenvMap.get(serverName).add(sqlenv);
102                }
103                
104                List<TSQLEnv> mergeSQLEnvs = new ArrayList<TSQLEnv>();
105                for(String key: sqlenvMap.keySet()) {
106                        List<TSQLEnv> sqlenvItem = sqlenvMap.get(key);
107                        mergeSQLEnvs.add(mergeSQLEnv(sqlenvItem));
108                }
109                return mergeSQLEnvs.toArray(new TSQLEnv[0]);
110        }
111
112        public static TSQLEnv mergeSQLEnv(List<TSQLEnv> sqlenvs) {
113                if (sqlenvs.size() == 0)
114                        return null;
115                TSQLEnv sqlenv = sqlenvs.get(0);
116                for (int i = 1; i < sqlenvs.size(); i++) {
117                        TSQLEnv temp = sqlenvs.get(i);
118                        if(SQLUtil.isEmpty(sqlenv.getDefaultServerName()) || sqlenv.getDefaultServerName().equals(TSQLEnv.DEFAULT_SERVER_NAME)) {
119                                if(!SQLUtil.isEmpty(temp.getDefaultServerName()) && !temp.getDefaultServerName().equals(TSQLEnv.DEFAULT_SERVER_NAME)) {
120                                        sqlenv.setDefaultServerName(temp.getDefaultServerName());
121                                }
122                        }
123                        if (temp.getCatalogList() != null) {
124                                for (int j = 0; j < temp.getCatalogList().size(); j++) {
125                                        mergeCatalog(sqlenv, temp.getCatalogList().get(j));
126                                }
127                        }
128                }
129//              if (sqlenv.getCatalogList() != null && sqlenv.getCatalogList().size() == 1) {
130//                      sqlenv.setDefaultCatalogName(sqlenv.getCatalogList().get(0).getName());
131//                      if (sqlenv.getCatalogList().get(0).getSchemaList() != null
132//                                      && sqlenv.getCatalogList().get(0).getSchemaList().size() == 1) {
133//                              sqlenv.setDefaultSchemaName(sqlenv.getCatalogList().get(0).getSchemaList().get(0).getName());
134//                      }
135//              }
136                return sqlenv;
137        }
138
139        private static void mergeCatalog(TSQLEnv sqlenv, TSQLCatalog catalog) {
140                TSQLCatalog mergeCatalog = sqlenv.getSQLCatalog(catalog.getName(), true);
141                List<TSQLSchema> schemaList = catalog.getSchemaList();
142                if (schemaList != null) {
143                        for (int i = 0; i < schemaList.size(); i++) {
144                                mergeSchema(mergeCatalog, schemaList.get(i));
145                        }
146                }
147        }
148
149        private static void mergeSchema(TSQLCatalog catalog, TSQLSchema schema) {
150                TSQLSchema mergeSchema = catalog.getSchema(schema.getName(), true);
151                List<TSQLSchemaObject> schemaObjectList = schema.getSchemaObjectList();
152                if (schemaObjectList != null) {
153                        for (int i = 0; i < schemaObjectList.size(); i++) {
154                                mergeSchemaObject(mergeSchema, schemaObjectList.get(i));
155                        }
156                }
157        }
158
159        private static void mergeSchemaObject(TSQLSchema schema, TSQLSchemaObject schemaObject) {
160                if (schemaObject instanceof TSQLTable) {
161                        TSQLTable sqlTable = (TSQLTable) schemaObject;
162                        TSQLTable mergeTable = schema.getSqlEnv().searchTable(schemaObject.getQualifiedName());
163                        if (mergeTable == null) {
164                                mergeTable = schema.createTable(schemaObject.getName(), schemaObject.getPriority());
165                                mergeTable.setView(sqlTable.isView());
166                        } else if (schemaObject.getPriority() > mergeTable.getPriority()) {
167                                mergeTable.setPriority(schemaObject.getPriority());
168                        }
169                        List<TSQLColumn> columnList = sqlTable.getColumnList();
170                        if (columnList != null) {
171                                for (int i = 0; i < columnList.size(); i++) {
172                                        mergeTable.addColumn(columnList.get(i).getName());
173                                }
174                        }
175                } else if (schemaObject instanceof TSQLRoutine) {
176                        TSQLRoutine sqlRoutine = (TSQLRoutine) schemaObject;
177                        TSQLSchemaObject mergeRoutine = schema.getSqlEnv().searchSchemaObject(schemaObject.getQualifiedName(), sqlRoutine.getDataObjectType());
178                        if (mergeRoutine == null) {
179                                mergeRoutine = schema.createSchemaObject(schemaObject.getName(), schemaObject.getDataObjectType());
180                        }
181                        if (sqlRoutine.getDefinition() != null) {
182                                ((TSQLRoutine) mergeRoutine).setDefinition(sqlRoutine.getDefinition());
183                        }
184                }
185        }
186
187        public List<SqlInfo> getMetadataInfos() {
188                return metadataInfos;
189        }
190
191        public void setMetadataInfos(List<SqlInfo> metadataInfos) {
192                this.metadataInfos = metadataInfos;
193        }
194
195        @Override
196        public TSQLEnv[] parseSQLEnv(EDbVendor vendor, String sql) {
197                TDDLSQLEnv ddlSQLEnv = new TDDLSQLEnv(defaultServer, defaultDatabase, defaultSchema, metadataSQLEnv, vendor, sql);
198                ddlSQLEnv.initSQLEnv();
199                if (ddlSQLEnv.isInit()) {
200                        return new TSQLEnv[] { ddlSQLEnv };
201                } else {
202                        return null;
203                }
204        }
205}