package gudusoft.gsqlparser.parser;

import gudusoft.gsqlparser.EDbVendor;
import gudusoft.gsqlparser.TBaseType;
import gudusoft.gsqlparser.TCustomLexer;
import gudusoft.gsqlparser.TCustomParser;
import gudusoft.gsqlparser.TCustomSqlStatement;
import gudusoft.gsqlparser.TLexerFlink;
import gudusoft.gsqlparser.TParserFlink;
import gudusoft.gsqlparser.TSourceToken;
import gudusoft.gsqlparser.TSourceTokenList;
import gudusoft.gsqlparser.TStatementList;
import gudusoft.gsqlparser.TSyntaxError;
import gudusoft.gsqlparser.EFindSqlStateType;
import gudusoft.gsqlparser.ETokenType;
import gudusoft.gsqlparser.ETokenStatus;
import gudusoft.gsqlparser.ESqlStatementType;
import gudusoft.gsqlparser.EErrorType;
import gudusoft.gsqlparser.stmt.TUnknownSqlStatement;
import gudusoft.gsqlparser.stmt.mysql.TMySQLSource;
import gudusoft.gsqlparser.sqlcmds.ISqlCmds;
import gudusoft.gsqlparser.sqlcmds.SqlCmdsFactory;
import gudusoft.gsqlparser.compiler.TContext;
import gudusoft.gsqlparser.sqlenv.TSQLEnv;
import gudusoft.gsqlparser.compiler.TGlobalScope;
import gudusoft.gsqlparser.compiler.TFrame;
import gudusoft.gsqlparser.resolver.TSQLResolver;
import gudusoft.gsqlparser.TLog;
import gudusoft.gsqlparser.compiler.TASTEvaluator;

import java.io.BufferedReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Stack;

/**
 * Apache Flink SQL parser implementation.
 *
 * <p>This parser handles Flink SQL-specific syntax including:
 * <ul>
 * <li>Flink SQL DML/DDL operations</li>
 * <li>Special token handling for DATE, TIME, TIMESTAMP, INTERVAL</li>
 * <li>Stored procedures, functions, and triggers</li>
 * <li>Flink-specific statements: CREATE CATALOG, CREATE MODEL, etc.</li>
 * </ul>
 *
 * <p><b>Implementation Status:</b> NEW
 * <ul>
 * <li><b>Base:</b> SparkSQL (Apache Calcite foundation)</li>
 * <li><b>Tokenization:</b> doflinktexttotokenlist()</li>
 * <li><b>Raw Extraction:</b> doflinkgetrawsqlstatements()</li>
 * <li><b>Parsing:</b> Fully self-contained using TParserFlink</li>
 * </ul>
 *
 * @see SqlParser
 * @see AbstractSqlParser
 * @see TLexerFlink
 * @see TParserFlink
 * @since 3.2.0.0
 */
public class FlinkSqlParser extends AbstractSqlParser {

    // Vendor-specific parser and lexer. Both are created once in the
    // constructor and reused across all parse operations.
    private TLexerFlink flexer;
    private TParserFlink fparser;

    // State management for raw statement extraction:
    // - gcurrentsqlstatement: the raw statement currently being assembled
    // - userDelimiterStr: the active statement delimiter; reset to the default
    //   at the start of each extraction run and rebuilt when a DELIMITER
    //   statement is seen
    // - curdelimiterchar: single-char delimiter set when entering stored
    //   procedure/function/trigger scanning
    private TCustomSqlStatement gcurrentsqlstatement;
    private String userDelimiterStr;
    private char curdelimiterchar;

    /**
     * Construct Flink SQL parser.
     * <p>
     * Configures the parser for Flink SQL with default delimiter (;).
     */
    public FlinkSqlParser() {
        super(EDbVendor.dbvflink);
        this.delimiterChar = ';'; // Flink SQL delimiter
        this.defaultDelimiterStr = ";"; // Default delimiter

        // Create lexer once - will be reused for all parsing operations
        this.flexer = new TLexerFlink();
        this.flexer.delimiterchar = this.delimiterChar;
        this.flexer.defaultDelimiterStr = this.defaultDelimiterStr;

        // Set parent's lexer reference for shared tokenization logic
        this.lexer = this.flexer;

        // Create parser once - will be reused for all parsing operations.
        // The parser is wired to the lexer here; sqlcmds and the token list
        // are injected later, per parse run.
        this.fparser = new TParserFlink(null);
        this.fparser.lexer = this.flexer;
    }

    /** @return the vendor this parser targets (always {@code dbvflink}). */
    @Override
    public EDbVendor getVendor() {
        return vendor;
    }

    // ========== Abstract Method Implementations ==========

    /** Returns the shared, reusable Flink lexer instance. */
    @Override
    protected TCustomLexer getLexer(ParserContext context) {
        return this.flexer;
    }

    /** Returns the shared, reusable Flink parser instance. */
    @Override
    protected TCustomParser getParser(ParserContext context, TSourceTokenList tokens) {
        return this.fparser;
    }

    @Override
    protected TCustomParser getSecondaryParser(ParserContext context, TSourceTokenList tokens) {
        // Flink SQL doesn't have a secondary parser
        return null;
    }

    // ========== Tokenization Phase (Hook Pattern) ==========

    /**
     * Hook method for vendor-specific tokenization.
     * <p>
     * Delegates to doflinktexttotokenlist() which implements Flink SQL-specific
     * token processing logic.
     */
    @Override
    protected void tokenizeVendorSql() {
        doflinktexttotokenlist();
    }

    /**
     * Flink SQL-specific tokenization logic.
     * <p>
     * Special handling:
     * <ul>
     * <li>MySQL-style comment validation</li>
     * <li>WITH ROLLUP token adjustment</li>
     * <li>Delimiter detection</li>
     * </ul>
     */
    private void doflinktexttotokenlist() {
        TSourceToken asourcetoken, lcprevst;
        int yychar;
        boolean startDelimiter = false;

        flexer.tmpDelimiter = "";

        // Prime the loop with the first token; null means no input at all.
        asourcetoken = getanewsourcetoken();
        if (asourcetoken == null) return;
        yychar = asourcetoken.tokencode;

        // Pull tokens from the lexer until end of input (tokencode <= 0).
        while (yychar > 0) {
            sourcetokenlist.add(asourcetoken);
            asourcetoken = getanewsourcetoken();
            if (asourcetoken == null) break;
            checkFlinkCommentToken(asourcetoken);

            // On a newline while a delimiter capture is pending, record the
            // text of the last collected token as the temporary delimiter.
            // NOTE(review): startDelimiter is never set to true anywhere in
            // this method, so this branch appears unreachable here — confirm
            // against the base class before removing.
            if ((asourcetoken.tokencode == TBaseType.lexnewline) && (startDelimiter)) {
                startDelimiter = false;
                flexer.tmpDelimiter = sourcetokenlist.get(sourcetokenlist.size() - 1).getAstext();
            }

            if (asourcetoken.tokencode == TBaseType.rrw_rollup) {
                // with rollup: retag the preceding WITH token so the grammar
                // sees the combined WITH ROLLUP token code.
                lcprevst = getprevsolidtoken(asourcetoken);
                if (lcprevst != null) {
                    if (lcprevst.tokencode == TBaseType.rrw_with)
                        lcprevst.tokencode = TBaseType.with_rollup;
                }
            }

            yychar = asourcetoken.tokencode;
        }
    }

    /**
     * Helper method for Flink-style comment validation.
     */
    private void checkFlinkCommentToken(TSourceToken cmtToken) {
        // No-op: similar to SparkSQL
    }

    /**
     * Helper method to get previous solid token (non-whitespace, non-comment).
     */
    private TSourceToken getprevsolidtoken(TSourceToken ptoken) {
        TSourceToken lcprevtoken = null;
        int i = ptoken.posinlist;
        // Walk backwards from the token's list position, skipping whitespace,
        // newlines, and both comment styles; return the first solid token.
        while (i > 0) {
            i--;
            lcprevtoken = sourcetokenlist.get(i);
            if ((lcprevtoken.tokencode == TBaseType.lexspace)
                    || (lcprevtoken.tokencode == TBaseType.lexnewline)
                    || (lcprevtoken.tokencode == TBaseType.cmtdoublehyphen)
                    || (lcprevtoken.tokencode == TBaseType.cmtslashstar)) {
                continue;
            }
            return lcprevtoken;
        }
        // No solid token before ptoken.
        return null;
    }

    /**
     * Helper method to add token to statement.
     * <p>
     * Links the token back to its owning statement before appending; silently
     * ignores null arguments.
     */
    private void appendToken(TCustomSqlStatement statement, TSourceToken token) {
        if (statement == null || token == null) {
            return;
        }
        token.stmt = statement;
        statement.sourcetokenlist.add(token);
    }

    // ========== Raw Statement Extraction Phase (Hook Pattern) ==========

    /**
     * Hook method to setup parsers before raw statement extraction.
     * Injects the per-run sqlcmds table and token list into the parser.
     */
    @Override
    protected void setupVendorParsersForExtraction() {
        this.fparser.sqlcmds = this.sqlcmds;
        this.fparser.sourcetokenlist = this.sourcetokenlist;
    }

    /**
     * Hook method for vendor-specific raw statement extraction.
     */
    @Override
    protected void extractVendorRawStatements(SqlParseResult.Builder builder) {
        doflinkgetrawsqlstatements(builder);
    }

    /**
     * Flink SQL-specific raw statement extraction logic.
235 * <p> 236 * This method: 237 * <ul> 238 * <li>Adjusts DATE, TIME, TIMESTAMP, INTERVAL token codes based on context</li> 239 * <li>Handles statement boundaries (semicolon, custom delimiters)</li> 240 * <li>Supports stored procedures with BEGIN/END blocks</li> 241 * </ul> 242 * 243 * @param builder the result builder to collect errors 244 * @return error count (currently always 0) 245 */ 246 private int doflinkgetrawsqlstatements(SqlParseResult.Builder builder) { 247 int errorcount = 0; 248 gcurrentsqlstatement = null; 249 EFindSqlStateType gst = EFindSqlStateType.stnormal; 250 int i; 251 TSourceToken ast; 252 boolean waitingDelimiter = false; 253 254 // Reset delimiter 255 userDelimiterStr = defaultDelimiterStr; 256 257 for (i = 0; i < sourcetokenlist.size(); i++) { 258 ast = sourcetokenlist.get(i); 259 sourcetokenlist.curpos = i; 260 261 // Flink SQL-specific token adjustments (similar to SparkSQL) 262 if (ast.tokencode == TBaseType.rrw_date) { 263 TSourceToken st1 = ast.nextSolidToken(); 264 if (st1 != null) { 265 if (st1.tokencode == '(') { 266 ast.tokencode = TBaseType.rrw_spark_date_function; 267 } else if (st1.tokencode == TBaseType.sconst) { 268 ast.tokencode = TBaseType.rrw_spark_date_const; 269 } 270 } 271 } else if (ast.tokencode == TBaseType.rrw_time) { 272 TSourceToken st1 = ast.nextSolidToken(); 273 if (st1 != null) { 274 if (st1.tokencode == TBaseType.sconst) { 275 ast.tokencode = TBaseType.rrw_spark_time_const; 276 } 277 } 278 } else if (ast.tokencode == TBaseType.rrw_timestamp) { 279 TSourceToken st1 = ast.nextSolidToken(); 280 if (st1 != null) { 281 if (st1.tokencode == TBaseType.sconst) { 282 ast.tokencode = TBaseType.rrw_spark_timestamp_constant; 283 } else if (st1.tokencode == TBaseType.ident) { 284 if (st1.toString().startsWith("\"")) { 285 ast.tokencode = TBaseType.rrw_spark_timestamp_constant; 286 st1.tokencode = TBaseType.sconst; 287 } 288 } 289 } 290 } else if (ast.tokencode == TBaseType.rrw_interval) { 291 TSourceToken leftParen = 
ast.searchToken('(', 1); 292 if (leftParen != null) { 293 int k = leftParen.posinlist + 1; 294 boolean commaToken = false; 295 while (k < ast.container.size()) { 296 if (ast.container.get(k).tokencode == ')') break; 297 if (ast.container.get(k).tokencode == ',') { 298 commaToken = true; 299 break; 300 } 301 k++; 302 } 303 if (commaToken) { 304 ast.tokencode = TBaseType.rrw_mysql_interval_func; 305 } 306 } 307 } else if (ast.tokencode == TBaseType.rrw_spark_position) { 308 TSourceToken leftParen = ast.searchToken('(', 1); 309 if (leftParen != null) { 310 // POSITION is a function 311 } else { 312 ast.tokencode = TBaseType.ident; // treat it as identifier 313 } 314 } 315 // Handle LOCALTIME: distinguish function from identifier based on context 316 // Function context: SELECT LOCALTIME, WHERE LOCALTIME > x 317 // Identifier context: SELECT t.localtime (after period - field reference) 318 // Note: Use flink_rw_localtime (707) which matches the Flink lexer's token code 319 else if (ast.tokencode == TBaseType.flink_rw_localtime) { 320 TSourceToken prevToken = ast.prevSolidToken(); 321 // If preceded by period, it's a field reference (identifier) 322 // Otherwise, it's a builtin function 323 if (prevToken == null || prevToken.tokencode != '.') { 324 ast.tokencode = TBaseType.rrw_flink_localtime_as_func; 325 } 326 } 327 // Handle LOCALTIMESTAMP: same logic as LOCALTIME 328 // Note: Use flink_rw_localtimestamp (708) which matches the Flink lexer's token code 329 else if (ast.tokencode == TBaseType.flink_rw_localtimestamp) { 330 TSourceToken prevToken = ast.prevSolidToken(); 331 if (prevToken == null || prevToken.tokencode != '.') { 332 ast.tokencode = TBaseType.rrw_flink_localtimestamp_as_func; 333 } 334 } 335 336 switch (gst) { 337 case sterror: { 338 if (ast.tokentype == ETokenType.ttsemicolon) { 339 appendToken(gcurrentsqlstatement, ast); 340 onRawStatementComplete(this.parserContext, gcurrentsqlstatement, this.fparser, null, this.sqlstatements, false, builder); 341 gst 
= EFindSqlStateType.stnormal; 342 } else { 343 appendToken(gcurrentsqlstatement, ast); 344 } 345 break; 346 } 347 case stnormal: { 348 if ((ast.tokencode == TBaseType.cmtdoublehyphen) 349 || (ast.tokencode == TBaseType.cmtslashstar) 350 || (ast.tokencode == TBaseType.lexspace) 351 || (ast.tokencode == TBaseType.lexnewline) 352 || (ast.tokentype == ETokenType.ttsemicolon)) { 353 if (TBaseType.assigned(gcurrentsqlstatement)) { 354 appendToken(gcurrentsqlstatement, ast); 355 } 356 continue; 357 } 358 359 if ((ast.isFirstTokenOfLine()) && ((ast.tokencode == TBaseType.rrw_mysql_source) || (ast.tokencode == TBaseType.slash_dot))) { 360 gst = EFindSqlStateType.stsqlplus; 361 gcurrentsqlstatement = new TMySQLSource(vendor); 362 appendToken(gcurrentsqlstatement, ast); 363 continue; 364 } 365 366 // Find a token to start sql or plsql mode 367 gcurrentsqlstatement = sqlcmds.issql(ast, gst, gcurrentsqlstatement); 368 369 if (TBaseType.assigned(gcurrentsqlstatement)) { 370 ESqlStatementType[] ses = {ESqlStatementType.sstmysqlcreateprocedure, ESqlStatementType.sstmysqlcreatefunction, 371 ESqlStatementType.sstcreateprocedure, ESqlStatementType.sstcreatefunction, 372 ESqlStatementType.sstcreatetrigger}; 373 if (includesqlstatementtype(gcurrentsqlstatement.sqlstatementtype, ses)) { 374 gst = EFindSqlStateType.ststoredprocedure; 375 waitingDelimiter = false; 376 appendToken(gcurrentsqlstatement, ast); 377 curdelimiterchar = ';'; 378 } else { 379 gst = EFindSqlStateType.stsql; 380 appendToken(gcurrentsqlstatement, ast); 381 } 382 } 383 384 if (!TBaseType.assigned(gcurrentsqlstatement)) { 385 // Error token found 386 this.syntaxErrors.add(new TSyntaxError(ast.getAstext(), ast.lineNo, (ast.columnNo < 0 ? 
0 : ast.columnNo), 387 "Error when tokenlize", EErrorType.spwarning, TBaseType.MSG_WARNING_ERROR_WHEN_TOKENIZE, null, ast.posinlist)); 388 389 ast.tokentype = ETokenType.tttokenlizererrortoken; 390 gst = EFindSqlStateType.sterror; 391 392 gcurrentsqlstatement = new TUnknownSqlStatement(vendor); 393 gcurrentsqlstatement.sqlstatementtype = ESqlStatementType.sstinvalid; 394 appendToken(gcurrentsqlstatement, ast); 395 } 396 break; 397 } 398 case stsqlplus: { 399 if (ast.tokencode == TBaseType.lexnewline) { 400 gst = EFindSqlStateType.stnormal; 401 appendToken(gcurrentsqlstatement, ast); 402 onRawStatementComplete(this.parserContext, gcurrentsqlstatement, this.fparser, null, this.sqlstatements, false, builder); 403 } else { 404 appendToken(gcurrentsqlstatement, ast); 405 } 406 break; 407 } 408 case stsql: { 409 if ((ast.tokentype == ETokenType.ttsemicolon) && (gcurrentsqlstatement.sqlstatementtype != ESqlStatementType.sstmysqldelimiter)) { 410 gst = EFindSqlStateType.stnormal; 411 appendToken(gcurrentsqlstatement, ast); 412 gcurrentsqlstatement.semicolonended = ast; 413 onRawStatementComplete(this.parserContext, gcurrentsqlstatement, this.fparser, null, this.sqlstatements, false, builder); 414 continue; 415 } 416 if (ast.toString().equalsIgnoreCase(userDelimiterStr)) { 417 gst = EFindSqlStateType.stnormal; 418 ast.tokencode = ';'; // treat it as semicolon 419 appendToken(gcurrentsqlstatement, ast); 420 gcurrentsqlstatement.semicolonended = ast; 421 onRawStatementComplete(this.parserContext, gcurrentsqlstatement, this.fparser, null, this.sqlstatements, false, builder); 422 continue; 423 } 424 appendToken(gcurrentsqlstatement, ast); 425 426 if ((ast.tokencode == TBaseType.lexnewline) 427 && (gcurrentsqlstatement.sqlstatementtype == ESqlStatementType.sstmysqldelimiter)) { 428 gst = EFindSqlStateType.stnormal; 429 userDelimiterStr = ""; 430 for (int k = 0; k < gcurrentsqlstatement.sourcetokenlist.size(); k++) { 431 TSourceToken st = 
gcurrentsqlstatement.sourcetokenlist.get(k); 432 if ((st.tokencode == TBaseType.rrw_mysql_delimiter) 433 || (st.tokencode == TBaseType.lexnewline) 434 || (st.tokencode == TBaseType.lexspace) 435 || (st.tokencode == TBaseType.rrw_set)) { 436 continue; 437 } 438 userDelimiterStr += st.toString(); 439 } 440 onRawStatementComplete(this.parserContext, gcurrentsqlstatement, this.fparser, null, this.sqlstatements, false, builder); 441 continue; 442 } 443 break; 444 } 445 case ststoredprocedure: { 446 // Single stmt in function/procedure/trigger may use ; as terminate char 447 if (waitingDelimiter) { 448 if (userDelimiterStr.equalsIgnoreCase(ast.toString())) { 449 gst = EFindSqlStateType.stnormal; 450 gcurrentsqlstatement.semicolonended = ast; 451 onRawStatementComplete(this.parserContext, gcurrentsqlstatement, this.fparser, null, this.sqlstatements, false, builder); 452 continue; 453 } else if (userDelimiterStr.startsWith(ast.toString())) { 454 String lcstr = ast.toString(); 455 for (int k = ast.posinlist + 1; k < ast.container.size(); k++) { 456 TSourceToken st = ast.container.get(k); 457 if ((st.tokencode == TBaseType.rrw_mysql_delimiter) || (st.tokencode == TBaseType.lexnewline) || (st.tokencode == TBaseType.lexspace)) { 458 break; 459 } 460 lcstr = lcstr + st.toString(); 461 } 462 463 if (userDelimiterStr.equalsIgnoreCase(lcstr)) { 464 for (int k = ast.posinlist; k < ast.container.size(); k++) { 465 TSourceToken st = ast.container.get(k); 466 if ((st.tokencode == TBaseType.rrw_mysql_delimiter) || (st.tokencode == TBaseType.lexnewline) || (st.tokencode == TBaseType.lexspace)) { 467 break; 468 } 469 ast.tokenstatus = ETokenStatus.tsignorebyyacc; 470 } 471 gst = EFindSqlStateType.stnormal; 472 gcurrentsqlstatement.semicolonended = ast; 473 onRawStatementComplete(this.parserContext, gcurrentsqlstatement, this.fparser, null, this.sqlstatements, false, builder); 474 continue; 475 } 476 } 477 } 478 if (ast.tokencode == TBaseType.rrw_begin) 479 waitingDelimiter = true; 480 
481 if (userDelimiterStr.equals(";") || (waitingDelimiter == false)) { 482 appendToken(gcurrentsqlstatement, ast); 483 if (ast.tokentype == ETokenType.ttsemicolon) { 484 gst = EFindSqlStateType.stnormal; 485 gcurrentsqlstatement.semicolonended = ast; 486 onRawStatementComplete(this.parserContext, gcurrentsqlstatement, this.fparser, null, this.sqlstatements, false, builder); 487 continue; 488 } 489 } else { 490 if (ast.toString().equals(userDelimiterStr)) { 491 ast.tokenstatus = ETokenStatus.tsignorebyyacc; 492 appendToken(gcurrentsqlstatement, ast); 493 gst = EFindSqlStateType.stnormal; 494 onRawStatementComplete(this.parserContext, gcurrentsqlstatement, this.fparser, null, this.sqlstatements, false, builder); 495 } else { 496 if ((ast.tokentype == ETokenType.ttsemicolon) && (userDelimiterStr.equals(";"))) { 497 TSourceToken lcprevtoken = ast.container.nextsolidtoken(ast, -1, false); 498 if (lcprevtoken != null) { 499 if (lcprevtoken.tokencode == TBaseType.rrw_end) { 500 gst = EFindSqlStateType.stnormal; 501 gcurrentsqlstatement.semicolonended = ast; 502 appendToken(gcurrentsqlstatement, ast); 503 onRawStatementComplete(this.parserContext, gcurrentsqlstatement, this.fparser, null, this.sqlstatements, false, builder); 504 continue; 505 } 506 } 507 } 508 appendToken(gcurrentsqlstatement, ast); 509 } 510 } 511 break; 512 } 513 } 514 } 515 516 // Last statement 517 if (TBaseType.assigned(gcurrentsqlstatement) && ((gst == EFindSqlStateType.stsql) || (gst == EFindSqlStateType.ststoredprocedure) || (gst == EFindSqlStateType.sterror))) { 518 onRawStatementComplete(this.parserContext, gcurrentsqlstatement, this.fparser, null, this.sqlstatements, true, builder); 519 } 520 521 // Populate builder with results 522 builder.sqlStatements(this.sqlstatements); 523 builder.syntaxErrors(syntaxErrors instanceof ArrayList ? 524 (ArrayList<TSyntaxError>) syntaxErrors : new ArrayList<>(syntaxErrors)); 525 builder.errorCode(syntaxErrors.isEmpty() ? 
0 : syntaxErrors.size()); 526 builder.errorMessage(syntaxErrors.isEmpty() ? "" : 527 String.format("Raw extraction completed with %d error(s)", syntaxErrors.size())); 528 529 return errorcount; 530 } 531 532 /** 533 * Helper method to check if statement type is in the given array. 534 */ 535 private boolean includesqlstatementtype(ESqlStatementType type, ESqlStatementType[] types) { 536 for (ESqlStatementType t : types) { 537 if (type == t) return true; 538 } 539 return false; 540 } 541 542 // ========== Parsing Phase ========== 543 544 @Override 545 protected TStatementList performParsing(ParserContext context, TCustomParser mainParser, 546 TCustomParser secondaryParser, TSourceTokenList tokens, 547 TStatementList rawStatements) { 548 // Store references 549 this.parserContext = context; 550 this.sourcetokenlist = tokens; 551 this.sqlstatements = rawStatements; 552 553 // Initialize sqlcmds for this vendor 554 this.sqlcmds = SqlCmdsFactory.get(vendor); 555 556 // Inject sqlcmds into parser 557 this.fparser.sqlcmds = this.sqlcmds; 558 559 // Initialize global context 560 initializeGlobalContext(); 561 562 // Parse each statement 563 for (int i = 0; i < sqlstatements.size(); i++) { 564 TCustomSqlStatement stmt = sqlstatements.getRawSql(i); 565 try { 566 stmt.setFrameStack(frameStack); 567 int parseResult = stmt.parsestatement(null, false, context.isOnlyNeedRawParseTree()); 568 569 // Vendor-specific post-processing 570 afterStatementParsed(stmt); 571 572 // Error recovery 573 boolean doRecover = TBaseType.ENABLE_ERROR_RECOVER_IN_CREATE_TABLE; 574 if (doRecover && ((parseResult != 0) || (stmt.getErrorCount() > 0))) { 575 handleCreateTableErrorRecovery(stmt); 576 } 577 578 // Collect errors 579 if ((parseResult != 0) || (stmt.getErrorCount() > 0)) { 580 copyErrorsFromStatement(stmt); 581 } 582 } catch (Exception ex) { 583 handleStatementParsingException(stmt, i, ex); 584 continue; 585 } 586 } 587 588 // Clean up frame stack 589 if (globalFrame != null) 
            globalFrame.popMeFromStack(frameStack);

        return sqlstatements;
    }

    /**
     * Hook for vendor-specific post-processing after statement is parsed.
     */
    protected void afterStatementParsed(TCustomSqlStatement stmt) {
        // No special post-processing needed for Flink SQL
    }

    /**
     * Handle error recovery for CREATE TABLE statements.
     * <p>
     * When strict CREATE TABLE parsing is disabled, every token after the
     * closing parenthesis of the column-definition list (table properties,
     * vendor options, ...) is retagged as an ignorable token and the
     * statement is re-parsed once. CREATE TABLE ... AS (SELECT ...) is
     * exempt, since its tail is part of the statement proper.
     */
    protected void handleCreateTableErrorRecovery(TCustomSqlStatement stmt) {
        if (((stmt.sqlstatementtype == ESqlStatementType.sstcreatetable) ||
                ((stmt.sqlstatementtype == ESqlStatementType.sstcreateindex) && (vendor != EDbVendor.dbvcouchbase))) &&
                (!TBaseType.c_createTableStrictParsing)) {

            int nested = 0;                       // parenthesis nesting depth
            boolean isIgnore = false, isFoundIgnoreToken = false;
            TSourceToken firstIgnoreToken = null; // first token of the ignored tail

            for (int k = 0; k < stmt.sourcetokenlist.size(); k++) {
                TSourceToken st = stmt.sourcetokenlist.get(k);

                // Once in ignore mode, retag every non-';' token so the
                // grammar skips it on the retry parse.
                if (isIgnore) {
                    if (st.issolidtoken() && (st.tokencode != ';')) {
                        isFoundIgnoreToken = true;
                        if (firstIgnoreToken == null) {
                            firstIgnoreToken = st;
                        }
                    }
                    if (st.tokencode != ';') {
                        st.tokencode = TBaseType.sqlpluscmd;
                    }
                    continue;
                }

                if (st.tokencode == (int) ')') {
                    nested--;
                    // nested == 0: this ')' closes the column list. Ignore
                    // the tail unless an AS (SELECT ...) follows.
                    if (nested == 0) {
                        boolean isSelect = false;
                        TSourceToken st1 = st.searchToken(TBaseType.rrw_as, 1);
                        if (st1 != null) {
                            TSourceToken st2 = st.searchToken((int) '(', 2);
                            if (st2 != null) {
                                TSourceToken st3 = st.searchToken(TBaseType.rrw_select, 3);
                                isSelect = (st3 != null);
                            }
                        }
                        if (!isSelect) isIgnore = true;
                    }
                }

                if ((st.tokencode == (int) '(') || (st.tokencode == TBaseType.left_parenthesis_2)) {
                    nested++;
                }
            }

            // Retry parsing if we found ignoreable properties
            if (isFoundIgnoreToken) {
                stmt.clearError();
                stmt.parsestatement(null, false, this.parserContext.isOnlyNeedRawParseTree());
            }
        }
    }

    // ========== Semantic Analysis ==========

    @Override
    protected void performSemanticAnalysis(ParserContext context, TStatementList statements) {
        // Resolution is skipped when the resolver is disabled...
        if (!TBaseType.isEnableResolver()) {
            return;
        }

        // ...or when parsing already produced syntax errors.
        if (!getSyntaxErrors().isEmpty()) {
            return;
        }

        // Run semantic resolver
        TSQLResolver resolver = new TSQLResolver(globalContext, statements);
        resolver.resolve();
    }

    // ========== Interpretation ==========

    @Override
    protected void performInterpreter(ParserContext context, TStatementList statements) {
        if (!TBaseType.ENABLE_INTERPRETER) {
            return;
        }
        // Flink SQL interpretation not currently supported
    }

    @Override
    public String toString() {
        return "FlinkSqlParser{vendor=" + vendor + "}";
    }
}