001package gudusoft.gsqlparser.stmt; 002 003import gudusoft.gsqlparser.EDbVendor; 004import gudusoft.gsqlparser.ESqlStatementType; 005import gudusoft.gsqlparser.TCustomSqlStatement; 006import gudusoft.gsqlparser.nodes.*; 007import gudusoft.gsqlparser.nodes.flink.TFlinkTableProperty; 008 009/** 010 * CREATE ROUTINE LOAD statement for StarRocks. 011 * 012 * Syntax: 013 * CREATE ROUTINE LOAD [database.]job_name ON table_name 014 * [load_properties] 015 * [job_properties] 016 * FROM data_source 017 * [data_source_properties] 018 * 019 * load_properties: 020 * [COLUMNS TERMINATED BY '<column_separator>'] 021 * [ROWS TERMINATED BY '<row_separator>'] 022 * [COLUMNS (<column1_name>[, <column2_name>, <column_assignment>, ...])] 023 * [WHERE <expr>] 024 * [PARTITION (<partition1_name>[, <partition2_name>, ...])] 025 * 026 * job_properties: 027 * PROPERTIES ("key" = "value"[, ...]) 028 * 029 * data_source: 030 * FROM KAFKA ("key" = "value"[, ...]) 031 */ 032public class TCreateRoutineLoadStmt extends TCustomSqlStatement { 033 034 // Job identification 035 private TObjectName jobName; 036 private TTable targetTable; 037 038 // Load properties 039 private String columnSeparator; 040 private String rowSeparator; 041 private TObjectNameList columnList; 042 private TExpression filterExpression; 043 private TObjectNameList partitionList; 044 private TObjectNameList temporaryPartitionList; 045 046 // Job properties (PROPERTIES clause) 047 private TPTNodeList<TFlinkTableProperty> jobProperties; 048 049 // Data source 050 private String dataSourceType; // KAFKA, PULSAR, etc. 051 private TPTNodeList<TFlinkTableProperty> dataSourceProperties; 052 053 public TCreateRoutineLoadStmt(EDbVendor dbvendor) { 054 super(dbvendor); 055 sqlstatementtype = ESqlStatementType.sststarrocksCreateRoutineLoad; 056 } 057 058 // Getters 059 public TObjectName getJobName() { 060 return jobName; 061 } 062 063 public TTable getTargetTable() { 064 return targetTable; 065 } 066 067 public String getColumnSeparator() { 068 return columnSeparator; 069 } 070 071 public String getRowSeparator() { 072 return rowSeparator; 073 } 074 075 public TObjectNameList getColumnList() { 076 return columnList; 077 } 078 079 public TExpression getFilterExpression() { 080 return filterExpression; 081 } 082 083 public TObjectNameList getPartitionList() { 084 return partitionList; 085 } 086 087 public TObjectNameList getTemporaryPartitionList() { 088 return temporaryPartitionList; 089 } 090 091 public TPTNodeList<TFlinkTableProperty> getJobProperties() { 092 return jobProperties; 093 } 094 095 public String getDataSourceType() { 096 return dataSourceType; 097 } 098 099 public TPTNodeList<TFlinkTableProperty> getDataSourceProperties() { 100 return dataSourceProperties; 101 } 102 103 @Override 104 public int doParseStatement(TCustomSqlStatement psql) { 105 if (rootNode == null) return -1; 106 super.doParseStatement(psql); 107 108 TCreateRoutineLoadSqlNode node = (TCreateRoutineLoadSqlNode) rootNode; 109 110 this.jobName = node.getJobName(); 111 112 if (node.getTableName() != null) { 113 TFromTable fromTable = new TFromTable(); 114 fromTable.init(node.getTableName()); 115 this.targetTable = this.analyzeFromTable(fromTable, true); 116 } 117 118 if (node.getColumnSeparator() != null) { 119 this.columnSeparator = node.getColumnSeparator().toString(); 120 } 121 122 if (node.getRowSeparator() != null) { 123 this.rowSeparator = node.getRowSeparator().toString(); 124 } 125 126 this.columnList = node.getColumnList(); 127 this.filterExpression = node.getWhereClause(); 128 this.partitionList = node.getPartitionList(); 129 this.temporaryPartitionList = node.getTemporaryPartitionList(); 130 this.jobProperties = node.getJobProperties(); 131 132 if (node.getDataSourceType() != null) { 133 this.dataSourceType = node.getDataSourceType().toString(); 134 } 135 136 this.dataSourceProperties = node.getDataSourceProperties(); 137 138 return 0; 139 } 140 141 @Override 142 public void accept(TParseTreeVisitor v) { 143 v.preVisit(this); 144 v.postVisit(this); 145 } 146 147 @Override 148 public void acceptChildren(TParseTreeVisitor v) { 149 v.preVisit(this); 150 v.postVisit(this); 151 } 152}