001package gudusoft.gsqlparser.stmt;
002
003import gudusoft.gsqlparser.EDbVendor;
004import gudusoft.gsqlparser.ESqlStatementType;
005import gudusoft.gsqlparser.TCustomSqlStatement;
006import gudusoft.gsqlparser.nodes.*;
007import gudusoft.gsqlparser.nodes.flink.TFlinkTableProperty;
008
009/**
010 * CREATE ROUTINE LOAD statement for StarRocks.
011 *
012 * Syntax:
013 * CREATE ROUTINE LOAD [database.]job_name ON table_name
014 * [load_properties]
015 * [job_properties]
016 * FROM data_source
017 * [data_source_properties]
018 *
019 * load_properties:
020 *   [COLUMNS TERMINATED BY '<column_separator>']
021 *   [ROWS TERMINATED BY '<row_separator>']
022 *   [COLUMNS (<column1_name>[, <column2_name>, <column_assignment>, ...])]
023 *   [WHERE <expr>]
024 *   [PARTITION (<partition1_name>[, <partition2_name>, ...])]
025 *
026 * job_properties:
027 *   PROPERTIES ("key" = "value"[, ...])
028 *
029 * data_source:
030 *   FROM KAFKA ("key" = "value"[, ...])
031 */
032public class TCreateRoutineLoadStmt extends TCustomSqlStatement {
033
034    // Job identification
035    private TObjectName jobName;
036    private TTable targetTable;
037
038    // Load properties
039    private String columnSeparator;
040    private String rowSeparator;
041    private TObjectNameList columnList;
042    private TExpression filterExpression;
043    private TObjectNameList partitionList;
044    private TObjectNameList temporaryPartitionList;
045
046    // Job properties (PROPERTIES clause)
047    private TPTNodeList<TFlinkTableProperty> jobProperties;
048
049    // Data source
050    private String dataSourceType; // KAFKA, PULSAR, etc.
051    private TPTNodeList<TFlinkTableProperty> dataSourceProperties;
052
053    public TCreateRoutineLoadStmt(EDbVendor dbvendor) {
054        super(dbvendor);
055        sqlstatementtype = ESqlStatementType.sststarrocksCreateRoutineLoad;
056    }
057
058    // Getters
059    public TObjectName getJobName() {
060        return jobName;
061    }
062
063    public TTable getTargetTable() {
064        return targetTable;
065    }
066
067    public String getColumnSeparator() {
068        return columnSeparator;
069    }
070
071    public String getRowSeparator() {
072        return rowSeparator;
073    }
074
075    public TObjectNameList getColumnList() {
076        return columnList;
077    }
078
079    public TExpression getFilterExpression() {
080        return filterExpression;
081    }
082
083    public TObjectNameList getPartitionList() {
084        return partitionList;
085    }
086
087    public TObjectNameList getTemporaryPartitionList() {
088        return temporaryPartitionList;
089    }
090
091    public TPTNodeList<TFlinkTableProperty> getJobProperties() {
092        return jobProperties;
093    }
094
095    public String getDataSourceType() {
096        return dataSourceType;
097    }
098
099    public TPTNodeList<TFlinkTableProperty> getDataSourceProperties() {
100        return dataSourceProperties;
101    }
102
103    @Override
104    public int doParseStatement(TCustomSqlStatement psql) {
105        if (rootNode == null) return -1;
106        super.doParseStatement(psql);
107
108        TCreateRoutineLoadSqlNode node = (TCreateRoutineLoadSqlNode) rootNode;
109
110        this.jobName = node.getJobName();
111
112        if (node.getTableName() != null) {
113            TFromTable fromTable = new TFromTable();
114            fromTable.init(node.getTableName());
115            this.targetTable = this.analyzeFromTable(fromTable, true);
116        }
117
118        if (node.getColumnSeparator() != null) {
119            this.columnSeparator = node.getColumnSeparator().toString();
120        }
121
122        if (node.getRowSeparator() != null) {
123            this.rowSeparator = node.getRowSeparator().toString();
124        }
125
126        this.columnList = node.getColumnList();
127        this.filterExpression = node.getWhereClause();
128        this.partitionList = node.getPartitionList();
129        this.temporaryPartitionList = node.getTemporaryPartitionList();
130        this.jobProperties = node.getJobProperties();
131
132        if (node.getDataSourceType() != null) {
133            this.dataSourceType = node.getDataSourceType().toString();
134        }
135
136        this.dataSourceProperties = node.getDataSourceProperties();
137
138        return 0;
139    }
140
141    @Override
142    public void accept(TParseTreeVisitor v) {
143        v.preVisit(this);
144        v.postVisit(this);
145    }
146
147    @Override
148    public void acceptChildren(TParseTreeVisitor v) {
149        v.preVisit(this);
150        v.postVisit(this);
151    }
152}