001package gudusoft.gsqlparser.parser;
002
003import gudusoft.gsqlparser.EDbVendor;
004import gudusoft.gsqlparser.TBaseType;
005import gudusoft.gsqlparser.TCustomLexer;
006import gudusoft.gsqlparser.TCustomParser;
007import gudusoft.gsqlparser.TCustomSqlStatement;
008import gudusoft.gsqlparser.TLexerFlink;
009import gudusoft.gsqlparser.TParserFlink;
010import gudusoft.gsqlparser.TSourceToken;
011import gudusoft.gsqlparser.TSourceTokenList;
012import gudusoft.gsqlparser.TStatementList;
013import gudusoft.gsqlparser.TSyntaxError;
014import gudusoft.gsqlparser.EFindSqlStateType;
015import gudusoft.gsqlparser.ETokenType;
016import gudusoft.gsqlparser.ETokenStatus;
017import gudusoft.gsqlparser.ESqlStatementType;
018import gudusoft.gsqlparser.EErrorType;
019import gudusoft.gsqlparser.stmt.TUnknownSqlStatement;
020import gudusoft.gsqlparser.stmt.mysql.TMySQLSource;
021import gudusoft.gsqlparser.sqlcmds.ISqlCmds;
022import gudusoft.gsqlparser.sqlcmds.SqlCmdsFactory;
023import gudusoft.gsqlparser.compiler.TContext;
024import gudusoft.gsqlparser.sqlenv.TSQLEnv;
025import gudusoft.gsqlparser.compiler.TGlobalScope;
026import gudusoft.gsqlparser.compiler.TFrame;
027import gudusoft.gsqlparser.resolver.TSQLResolver;
028import gudusoft.gsqlparser.TLog;
029import gudusoft.gsqlparser.compiler.TASTEvaluator;
030
031import java.io.BufferedReader;
032import java.util.ArrayList;
033import java.util.List;
034import java.util.Stack;
035
036/**
037 * Apache Flink SQL parser implementation.
038 *
039 * <p>This parser handles Flink SQL-specific syntax including:
040 * <ul>
041 *   <li>Flink SQL DML/DDL operations</li>
042 *   <li>Special token handling for DATE, TIME, TIMESTAMP, INTERVAL</li>
043 *   <li>Stored procedures, functions, and triggers</li>
044 *   <li>Flink-specific statements: CREATE CATALOG, CREATE MODEL, etc.</li>
045 * </ul>
046 *
047 * <p><b>Implementation Status:</b> NEW
048 * <ul>
049 *   <li><b>Base:</b> SparkSQL (Apache Calcite foundation)</li>
050 *   <li><b>Tokenization:</b> doflinktexttotokenlist()</li>
051 *   <li><b>Raw Extraction:</b> doflinkgetrawsqlstatements()</li>
052 *   <li><b>Parsing:</b> Fully self-contained using TParserFlink</li>
053 * </ul>
054 *
055 * @see SqlParser
056 * @see AbstractSqlParser
057 * @see TLexerFlink
058 * @see TParserFlink
059 * @since 3.2.0.0
060 */
061public class FlinkSqlParser extends AbstractSqlParser {
062
063    // Vendor-specific parser and lexer
064    private TLexerFlink flexer;
065    private TParserFlink fparser;
066
067    // State management for raw statement extraction
068    private TCustomSqlStatement gcurrentsqlstatement;
069    private String userDelimiterStr;
070    private char curdelimiterchar;
071
072    /**
073     * Construct Flink SQL parser.
074     * <p>
075     * Configures the parser for Flink SQL with default delimiter (;).
076     */
077    public FlinkSqlParser() {
078        super(EDbVendor.dbvflink);
079        this.delimiterChar = ';';  // Flink SQL delimiter
080        this.defaultDelimiterStr = ";";  // Default delimiter
081
082        // Create lexer once - will be reused for all parsing operations
083        this.flexer = new TLexerFlink();
084        this.flexer.delimiterchar = this.delimiterChar;
085        this.flexer.defaultDelimiterStr = this.defaultDelimiterStr;
086
087        // Set parent's lexer reference for shared tokenization logic
088        this.lexer = this.flexer;
089
090        // Create parser once - will be reused for all parsing operations
091        this.fparser = new TParserFlink(null);
092        this.fparser.lexer = this.flexer;
093    }
094
095    @Override
096    public EDbVendor getVendor() {
097        return vendor;
098    }
099
100    // ========== Abstract Method Implementations ==========
101
102    @Override
103    protected TCustomLexer getLexer(ParserContext context) {
104        return this.flexer;
105    }
106
107    @Override
108    protected TCustomParser getParser(ParserContext context, TSourceTokenList tokens) {
109        return this.fparser;
110    }
111
112    @Override
113    protected TCustomParser getSecondaryParser(ParserContext context, TSourceTokenList tokens) {
114        // Flink SQL doesn't have a secondary parser
115        return null;
116    }
117
118    // ========== Tokenization Phase (Hook Pattern) ==========
119
120    /**
121     * Hook method for vendor-specific tokenization.
122     * <p>
123     * Delegates to doflinktexttotokenlist() which implements Flink SQL-specific
124     * token processing logic.
125     */
126    @Override
127    protected void tokenizeVendorSql() {
128        doflinktexttotokenlist();
129    }
130
131    /**
132     * Flink SQL-specific tokenization logic.
133     * <p>
134     * Special handling:
135     * <ul>
136     *   <li>MySQL-style comment validation</li>
137     *   <li>WITH ROLLUP token adjustment</li>
138     *   <li>Delimiter detection</li>
139     * </ul>
140     */
141    private void doflinktexttotokenlist() {
142        TSourceToken asourcetoken, lcprevst;
143        int yychar;
144        boolean startDelimiter = false;
145
146        flexer.tmpDelimiter = "";
147
148        asourcetoken = getanewsourcetoken();
149        if (asourcetoken == null) return;
150        yychar = asourcetoken.tokencode;
151
152        while (yychar > 0) {
153            sourcetokenlist.add(asourcetoken);
154            asourcetoken = getanewsourcetoken();
155            if (asourcetoken == null) break;
156            checkFlinkCommentToken(asourcetoken);
157
158            if ((asourcetoken.tokencode == TBaseType.lexnewline) && (startDelimiter)) {
159                startDelimiter = false;
160                flexer.tmpDelimiter = sourcetokenlist.get(sourcetokenlist.size() - 1).getAstext();
161            }
162
163            if (asourcetoken.tokencode == TBaseType.rrw_rollup) {
164                // with rollup
165                lcprevst = getprevsolidtoken(asourcetoken);
166                if (lcprevst != null) {
167                    if (lcprevst.tokencode == TBaseType.rrw_with)
168                        lcprevst.tokencode = TBaseType.with_rollup;
169                }
170            }
171
172            yychar = asourcetoken.tokencode;
173        }
174    }
175
176    /**
177     * Helper method for Flink-style comment validation.
178     */
179    private void checkFlinkCommentToken(TSourceToken cmtToken) {
180        // No-op: similar to SparkSQL
181    }
182
183    /**
184     * Helper method to get previous solid token (non-whitespace, non-comment).
185     */
186    private TSourceToken getprevsolidtoken(TSourceToken ptoken) {
187        TSourceToken lcprevtoken = null;
188        int i = ptoken.posinlist;
189        while (i > 0) {
190            i--;
191            lcprevtoken = sourcetokenlist.get(i);
192            if ((lcprevtoken.tokencode == TBaseType.lexspace)
193                    || (lcprevtoken.tokencode == TBaseType.lexnewline)
194                    || (lcprevtoken.tokencode == TBaseType.cmtdoublehyphen)
195                    || (lcprevtoken.tokencode == TBaseType.cmtslashstar)) {
196                continue;
197            }
198            return lcprevtoken;
199        }
200        return null;
201    }
202
203    /**
204     * Helper method to add token to statement.
205     */
206    private void appendToken(TCustomSqlStatement statement, TSourceToken token) {
207        if (statement == null || token == null) {
208            return;
209        }
210        token.stmt = statement;
211        statement.sourcetokenlist.add(token);
212    }
213
214    // ========== Raw Statement Extraction Phase (Hook Pattern) ==========
215
216    /**
217     * Hook method to setup parsers before raw statement extraction.
218     */
219    @Override
220    protected void setupVendorParsersForExtraction() {
221        this.fparser.sqlcmds = this.sqlcmds;
222        this.fparser.sourcetokenlist = this.sourcetokenlist;
223    }
224
225    /**
226     * Hook method for vendor-specific raw statement extraction.
227     */
228    @Override
229    protected void extractVendorRawStatements(SqlParseResult.Builder builder) {
230        doflinkgetrawsqlstatements(builder);
231    }
232
233    /**
234     * Flink SQL-specific raw statement extraction logic.
235     * <p>
236     * This method:
237     * <ul>
238     *   <li>Adjusts DATE, TIME, TIMESTAMP, INTERVAL token codes based on context</li>
239     *   <li>Handles statement boundaries (semicolon, custom delimiters)</li>
240     *   <li>Supports stored procedures with BEGIN/END blocks</li>
241     * </ul>
242     *
243     * @param builder the result builder to collect errors
244     * @return error count (currently always 0)
245     */
246    private int doflinkgetrawsqlstatements(SqlParseResult.Builder builder) {
247        int errorcount = 0;
248        gcurrentsqlstatement = null;
249        EFindSqlStateType gst = EFindSqlStateType.stnormal;
250        int i;
251        TSourceToken ast;
252        boolean waitingDelimiter = false;
253
254        // Reset delimiter
255        userDelimiterStr = defaultDelimiterStr;
256
257        for (i = 0; i < sourcetokenlist.size(); i++) {
258            ast = sourcetokenlist.get(i);
259            sourcetokenlist.curpos = i;
260
261            // Flink SQL-specific token adjustments (similar to SparkSQL)
262            if (ast.tokencode == TBaseType.rrw_date) {
263                TSourceToken st1 = ast.nextSolidToken();
264                if (st1 != null) {
265                    if (st1.tokencode == '(') {
266                        ast.tokencode = TBaseType.rrw_spark_date_function;
267                    } else if (st1.tokencode == TBaseType.sconst) {
268                        ast.tokencode = TBaseType.rrw_spark_date_const;
269                    }
270                }
271            } else if (ast.tokencode == TBaseType.rrw_time) {
272                TSourceToken st1 = ast.nextSolidToken();
273                if (st1 != null) {
274                    if (st1.tokencode == TBaseType.sconst) {
275                        ast.tokencode = TBaseType.rrw_spark_time_const;
276                    }
277                }
278            } else if (ast.tokencode == TBaseType.rrw_timestamp) {
279                TSourceToken st1 = ast.nextSolidToken();
280                if (st1 != null) {
281                    if (st1.tokencode == TBaseType.sconst) {
282                        ast.tokencode = TBaseType.rrw_spark_timestamp_constant;
283                    } else if (st1.tokencode == TBaseType.ident) {
284                        if (st1.toString().startsWith("\"")) {
285                            ast.tokencode = TBaseType.rrw_spark_timestamp_constant;
286                            st1.tokencode = TBaseType.sconst;
287                        }
288                    }
289                }
290            } else if (ast.tokencode == TBaseType.rrw_interval) {
291                TSourceToken leftParen = ast.searchToken('(', 1);
292                if (leftParen != null) {
293                    int k = leftParen.posinlist + 1;
294                    boolean commaToken = false;
295                    while (k < ast.container.size()) {
296                        if (ast.container.get(k).tokencode == ')') break;
297                        if (ast.container.get(k).tokencode == ',') {
298                            commaToken = true;
299                            break;
300                        }
301                        k++;
302                    }
303                    if (commaToken) {
304                        ast.tokencode = TBaseType.rrw_mysql_interval_func;
305                    }
306                }
307            } else if (ast.tokencode == TBaseType.rrw_spark_position) {
308                TSourceToken leftParen = ast.searchToken('(', 1);
309                if (leftParen != null) {
310                    // POSITION is a function
311                } else {
312                    ast.tokencode = TBaseType.ident; // treat it as identifier
313                }
314            }
315            // Handle LOCALTIME: distinguish function from identifier based on context
316            // Function context: SELECT LOCALTIME, WHERE LOCALTIME > x
317            // Identifier context: SELECT t.localtime (after period - field reference)
318            // Note: Use flink_rw_localtime (707) which matches the Flink lexer's token code
319            else if (ast.tokencode == TBaseType.flink_rw_localtime) {
320                TSourceToken prevToken = ast.prevSolidToken();
321                // If preceded by period, it's a field reference (identifier)
322                // Otherwise, it's a builtin function
323                if (prevToken == null || prevToken.tokencode != '.') {
324                    ast.tokencode = TBaseType.rrw_flink_localtime_as_func;
325                }
326            }
327            // Handle LOCALTIMESTAMP: same logic as LOCALTIME
328            // Note: Use flink_rw_localtimestamp (708) which matches the Flink lexer's token code
329            else if (ast.tokencode == TBaseType.flink_rw_localtimestamp) {
330                TSourceToken prevToken = ast.prevSolidToken();
331                if (prevToken == null || prevToken.tokencode != '.') {
332                    ast.tokencode = TBaseType.rrw_flink_localtimestamp_as_func;
333                }
334            }
335
336            switch (gst) {
337                case sterror: {
338                    if (ast.tokentype == ETokenType.ttsemicolon) {
339                        appendToken(gcurrentsqlstatement, ast);
340                        onRawStatementComplete(this.parserContext, gcurrentsqlstatement, this.fparser, null, this.sqlstatements, false, builder);
341                        gst = EFindSqlStateType.stnormal;
342                    } else {
343                        appendToken(gcurrentsqlstatement, ast);
344                    }
345                    break;
346                }
347                case stnormal: {
348                    if ((ast.tokencode == TBaseType.cmtdoublehyphen)
349                            || (ast.tokencode == TBaseType.cmtslashstar)
350                            || (ast.tokencode == TBaseType.lexspace)
351                            || (ast.tokencode == TBaseType.lexnewline)
352                            || (ast.tokentype == ETokenType.ttsemicolon)) {
353                        if (TBaseType.assigned(gcurrentsqlstatement)) {
354                            appendToken(gcurrentsqlstatement, ast);
355                        }
356                        continue;
357                    }
358
359                    if ((ast.isFirstTokenOfLine()) && ((ast.tokencode == TBaseType.rrw_mysql_source) || (ast.tokencode == TBaseType.slash_dot))) {
360                        gst = EFindSqlStateType.stsqlplus;
361                        gcurrentsqlstatement = new TMySQLSource(vendor);
362                        appendToken(gcurrentsqlstatement, ast);
363                        continue;
364                    }
365
366                    // Find a token to start sql or plsql mode
367                    gcurrentsqlstatement = sqlcmds.issql(ast, gst, gcurrentsqlstatement);
368
369                    if (TBaseType.assigned(gcurrentsqlstatement)) {
370                        ESqlStatementType[] ses = {ESqlStatementType.sstmysqlcreateprocedure, ESqlStatementType.sstmysqlcreatefunction,
371                                ESqlStatementType.sstcreateprocedure, ESqlStatementType.sstcreatefunction,
372                                ESqlStatementType.sstcreatetrigger};
373                        if (includesqlstatementtype(gcurrentsqlstatement.sqlstatementtype, ses)) {
374                            gst = EFindSqlStateType.ststoredprocedure;
375                            waitingDelimiter = false;
376                            appendToken(gcurrentsqlstatement, ast);
377                            curdelimiterchar = ';';
378                        } else {
379                            gst = EFindSqlStateType.stsql;
380                            appendToken(gcurrentsqlstatement, ast);
381                        }
382                    }
383
384                    if (!TBaseType.assigned(gcurrentsqlstatement)) {
385                        // Error token found
386                        this.syntaxErrors.add(new TSyntaxError(ast.getAstext(), ast.lineNo, (ast.columnNo < 0 ? 0 : ast.columnNo),
387                                "Error when tokenlize", EErrorType.spwarning, TBaseType.MSG_WARNING_ERROR_WHEN_TOKENIZE, null, ast.posinlist));
388
389                        ast.tokentype = ETokenType.tttokenlizererrortoken;
390                        gst = EFindSqlStateType.sterror;
391
392                        gcurrentsqlstatement = new TUnknownSqlStatement(vendor);
393                        gcurrentsqlstatement.sqlstatementtype = ESqlStatementType.sstinvalid;
394                        appendToken(gcurrentsqlstatement, ast);
395                    }
396                    break;
397                }
398                case stsqlplus: {
399                    if (ast.tokencode == TBaseType.lexnewline) {
400                        gst = EFindSqlStateType.stnormal;
401                        appendToken(gcurrentsqlstatement, ast);
402                        onRawStatementComplete(this.parserContext, gcurrentsqlstatement, this.fparser, null, this.sqlstatements, false, builder);
403                    } else {
404                        appendToken(gcurrentsqlstatement, ast);
405                    }
406                    break;
407                }
408                case stsql: {
409                    if ((ast.tokentype == ETokenType.ttsemicolon) && (gcurrentsqlstatement.sqlstatementtype != ESqlStatementType.sstmysqldelimiter)) {
410                        gst = EFindSqlStateType.stnormal;
411                        appendToken(gcurrentsqlstatement, ast);
412                        gcurrentsqlstatement.semicolonended = ast;
413                        onRawStatementComplete(this.parserContext, gcurrentsqlstatement, this.fparser, null, this.sqlstatements, false, builder);
414                        continue;
415                    }
416                    if (ast.toString().equalsIgnoreCase(userDelimiterStr)) {
417                        gst = EFindSqlStateType.stnormal;
418                        ast.tokencode = ';'; // treat it as semicolon
419                        appendToken(gcurrentsqlstatement, ast);
420                        gcurrentsqlstatement.semicolonended = ast;
421                        onRawStatementComplete(this.parserContext, gcurrentsqlstatement, this.fparser, null, this.sqlstatements, false, builder);
422                        continue;
423                    }
424                    appendToken(gcurrentsqlstatement, ast);
425
426                    if ((ast.tokencode == TBaseType.lexnewline)
427                            && (gcurrentsqlstatement.sqlstatementtype == ESqlStatementType.sstmysqldelimiter)) {
428                        gst = EFindSqlStateType.stnormal;
429                        userDelimiterStr = "";
430                        for (int k = 0; k < gcurrentsqlstatement.sourcetokenlist.size(); k++) {
431                            TSourceToken st = gcurrentsqlstatement.sourcetokenlist.get(k);
432                            if ((st.tokencode == TBaseType.rrw_mysql_delimiter)
433                                    || (st.tokencode == TBaseType.lexnewline)
434                                    || (st.tokencode == TBaseType.lexspace)
435                                    || (st.tokencode == TBaseType.rrw_set)) {
436                                continue;
437                            }
438                            userDelimiterStr += st.toString();
439                        }
440                        onRawStatementComplete(this.parserContext, gcurrentsqlstatement, this.fparser, null, this.sqlstatements, false, builder);
441                        continue;
442                    }
443                    break;
444                }
445                case ststoredprocedure: {
446                    // Single stmt in function/procedure/trigger may use ; as terminate char
447                    if (waitingDelimiter) {
448                        if (userDelimiterStr.equalsIgnoreCase(ast.toString())) {
449                            gst = EFindSqlStateType.stnormal;
450                            gcurrentsqlstatement.semicolonended = ast;
451                            onRawStatementComplete(this.parserContext, gcurrentsqlstatement, this.fparser, null, this.sqlstatements, false, builder);
452                            continue;
453                        } else if (userDelimiterStr.startsWith(ast.toString())) {
454                            String lcstr = ast.toString();
455                            for (int k = ast.posinlist + 1; k < ast.container.size(); k++) {
456                                TSourceToken st = ast.container.get(k);
457                                if ((st.tokencode == TBaseType.rrw_mysql_delimiter) || (st.tokencode == TBaseType.lexnewline) || (st.tokencode == TBaseType.lexspace)) {
458                                    break;
459                                }
460                                lcstr = lcstr + st.toString();
461                            }
462
463                            if (userDelimiterStr.equalsIgnoreCase(lcstr)) {
464                                for (int k = ast.posinlist; k < ast.container.size(); k++) {
465                                    TSourceToken st = ast.container.get(k);
466                                    if ((st.tokencode == TBaseType.rrw_mysql_delimiter) || (st.tokencode == TBaseType.lexnewline) || (st.tokencode == TBaseType.lexspace)) {
467                                        break;
468                                    }
469                                    ast.tokenstatus = ETokenStatus.tsignorebyyacc;
470                                }
471                                gst = EFindSqlStateType.stnormal;
472                                gcurrentsqlstatement.semicolonended = ast;
473                                onRawStatementComplete(this.parserContext, gcurrentsqlstatement, this.fparser, null, this.sqlstatements, false, builder);
474                                continue;
475                            }
476                        }
477                    }
478                    if (ast.tokencode == TBaseType.rrw_begin)
479                        waitingDelimiter = true;
480
481                    if (userDelimiterStr.equals(";") || (waitingDelimiter == false)) {
482                        appendToken(gcurrentsqlstatement, ast);
483                        if (ast.tokentype == ETokenType.ttsemicolon) {
484                            gst = EFindSqlStateType.stnormal;
485                            gcurrentsqlstatement.semicolonended = ast;
486                            onRawStatementComplete(this.parserContext, gcurrentsqlstatement, this.fparser, null, this.sqlstatements, false, builder);
487                            continue;
488                        }
489                    } else {
490                        if (ast.toString().equals(userDelimiterStr)) {
491                            ast.tokenstatus = ETokenStatus.tsignorebyyacc;
492                            appendToken(gcurrentsqlstatement, ast);
493                            gst = EFindSqlStateType.stnormal;
494                            onRawStatementComplete(this.parserContext, gcurrentsqlstatement, this.fparser, null, this.sqlstatements, false, builder);
495                        } else {
496                            if ((ast.tokentype == ETokenType.ttsemicolon) && (userDelimiterStr.equals(";"))) {
497                                TSourceToken lcprevtoken = ast.container.nextsolidtoken(ast, -1, false);
498                                if (lcprevtoken != null) {
499                                    if (lcprevtoken.tokencode == TBaseType.rrw_end) {
500                                        gst = EFindSqlStateType.stnormal;
501                                        gcurrentsqlstatement.semicolonended = ast;
502                                        appendToken(gcurrentsqlstatement, ast);
503                                        onRawStatementComplete(this.parserContext, gcurrentsqlstatement, this.fparser, null, this.sqlstatements, false, builder);
504                                        continue;
505                                    }
506                                }
507                            }
508                            appendToken(gcurrentsqlstatement, ast);
509                        }
510                    }
511                    break;
512                }
513            }
514        }
515
516        // Last statement
517        if (TBaseType.assigned(gcurrentsqlstatement) && ((gst == EFindSqlStateType.stsql) || (gst == EFindSqlStateType.ststoredprocedure) || (gst == EFindSqlStateType.sterror))) {
518            onRawStatementComplete(this.parserContext, gcurrentsqlstatement, this.fparser, null, this.sqlstatements, true, builder);
519        }
520
521        // Populate builder with results
522        builder.sqlStatements(this.sqlstatements);
523        builder.syntaxErrors(syntaxErrors instanceof ArrayList ?
524                (ArrayList<TSyntaxError>) syntaxErrors : new ArrayList<>(syntaxErrors));
525        builder.errorCode(syntaxErrors.isEmpty() ? 0 : syntaxErrors.size());
526        builder.errorMessage(syntaxErrors.isEmpty() ? "" :
527                String.format("Raw extraction completed with %d error(s)", syntaxErrors.size()));
528
529        return errorcount;
530    }
531
532    /**
533     * Helper method to check if statement type is in the given array.
534     */
535    private boolean includesqlstatementtype(ESqlStatementType type, ESqlStatementType[] types) {
536        for (ESqlStatementType t : types) {
537            if (type == t) return true;
538        }
539        return false;
540    }
541
542    // ========== Parsing Phase ==========
543
544    @Override
545    protected TStatementList performParsing(ParserContext context, TCustomParser mainParser,
546                                            TCustomParser secondaryParser, TSourceTokenList tokens,
547                                            TStatementList rawStatements) {
548        // Store references
549        this.parserContext = context;
550        this.sourcetokenlist = tokens;
551        this.sqlstatements = rawStatements;
552
553        // Initialize sqlcmds for this vendor
554        this.sqlcmds = SqlCmdsFactory.get(vendor);
555
556        // Inject sqlcmds into parser
557        this.fparser.sqlcmds = this.sqlcmds;
558
559        // Initialize global context
560        initializeGlobalContext();
561
562        // Parse each statement
563        for (int i = 0; i < sqlstatements.size(); i++) {
564            TCustomSqlStatement stmt = sqlstatements.getRawSql(i);
565            try {
566                stmt.setFrameStack(frameStack);
567                int parseResult = stmt.parsestatement(null, false, context.isOnlyNeedRawParseTree());
568
569                // Vendor-specific post-processing
570                afterStatementParsed(stmt);
571
572                // Error recovery
573                boolean doRecover = TBaseType.ENABLE_ERROR_RECOVER_IN_CREATE_TABLE;
574                if (doRecover && ((parseResult != 0) || (stmt.getErrorCount() > 0))) {
575                    handleCreateTableErrorRecovery(stmt);
576                }
577
578                // Collect errors
579                if ((parseResult != 0) || (stmt.getErrorCount() > 0)) {
580                    copyErrorsFromStatement(stmt);
581                }
582            } catch (Exception ex) {
583                handleStatementParsingException(stmt, i, ex);
584                continue;
585            }
586        }
587
588        // Clean up frame stack
589        if (globalFrame != null) globalFrame.popMeFromStack(frameStack);
590
591        return sqlstatements;
592    }
593
594    /**
595     * Hook for vendor-specific post-processing after statement is parsed.
596     */
597    protected void afterStatementParsed(TCustomSqlStatement stmt) {
598        // No special post-processing needed for Flink SQL
599    }
600
601    /**
602     * Handle error recovery for CREATE TABLE statements.
603     */
604    protected void handleCreateTableErrorRecovery(TCustomSqlStatement stmt) {
605        if (((stmt.sqlstatementtype == ESqlStatementType.sstcreatetable) ||
606             ((stmt.sqlstatementtype == ESqlStatementType.sstcreateindex) && (vendor != EDbVendor.dbvcouchbase))) &&
607            (!TBaseType.c_createTableStrictParsing)) {
608
609            int nested = 0;
610            boolean isIgnore = false, isFoundIgnoreToken = false;
611            TSourceToken firstIgnoreToken = null;
612
613            for (int k = 0; k < stmt.sourcetokenlist.size(); k++) {
614                TSourceToken st = stmt.sourcetokenlist.get(k);
615
616                if (isIgnore) {
617                    if (st.issolidtoken() && (st.tokencode != ';')) {
618                        isFoundIgnoreToken = true;
619                        if (firstIgnoreToken == null) {
620                            firstIgnoreToken = st;
621                        }
622                    }
623                    if (st.tokencode != ';') {
624                        st.tokencode = TBaseType.sqlpluscmd;
625                    }
626                    continue;
627                }
628
629                if (st.tokencode == (int) ')') {
630                    nested--;
631                    if (nested == 0) {
632                        boolean isSelect = false;
633                        TSourceToken st1 = st.searchToken(TBaseType.rrw_as, 1);
634                        if (st1 != null) {
635                            TSourceToken st2 = st.searchToken((int) '(', 2);
636                            if (st2 != null) {
637                                TSourceToken st3 = st.searchToken(TBaseType.rrw_select, 3);
638                                isSelect = (st3 != null);
639                            }
640                        }
641                        if (!isSelect) isIgnore = true;
642                    }
643                }
644
645                if ((st.tokencode == (int) '(') || (st.tokencode == TBaseType.left_parenthesis_2)) {
646                    nested++;
647                }
648            }
649
650            // Retry parsing if we found ignoreable properties
651            if (isFoundIgnoreToken) {
652                stmt.clearError();
653                stmt.parsestatement(null, false, this.parserContext.isOnlyNeedRawParseTree());
654            }
655        }
656    }
657
658    // ========== Semantic Analysis ==========
659
660    @Override
661    protected void performSemanticAnalysis(ParserContext context, TStatementList statements) {
662        if (!TBaseType.isEnableResolver()) {
663            return;
664        }
665
666        if (!getSyntaxErrors().isEmpty()) {
667            return;
668        }
669
670        // Run semantic resolver
671        TSQLResolver resolver = new TSQLResolver(globalContext, statements);
672        resolver.resolve();
673    }
674
675    // ========== Interpretation ==========
676
677    @Override
678    protected void performInterpreter(ParserContext context, TStatementList statements) {
679        if (!TBaseType.ENABLE_INTERPRETER) {
680            return;
681        }
682        // Flink SQL interpretation not currently supported
683    }
684
685    @Override
686    public String toString() {
687        return "FlinkSqlParser{vendor=" + vendor + "}";
688    }
689}