001package gudusoft.gsqlparser.nodes.flink;
002
003import gudusoft.gsqlparser.nodes.TExpression;
004import gudusoft.gsqlparser.nodes.TObjectName;
005import gudusoft.gsqlparser.nodes.TParseTreeNode;
006import gudusoft.gsqlparser.nodes.TParseTreeVisitor;
007
008/**
009 * Represents Flink's WATERMARK clause for event-time processing.
010 *
011 * <p>The WATERMARK clause defines watermark generation strategies for event-time processing
012 * in Flink SQL. It specifies which column represents event time and the watermark strategy
013 * expression.</p>
014 *
015 * <h3>Syntax:</h3>
016 * <pre>
017 * WATERMARK FOR rowtime_column AS watermark_strategy_expression
018 * </pre>
019 *
020 * <h3>Requirements:</h3>
021 * <ul>
022 *   <li>The rowtime_column must be an existing column of type TIMESTAMP(3)</li>
023 *   <li>The watermark_strategy_expression must return TIMESTAMP(3)</li>
024 *   <li>Watermarks must only emit non-null values larger than previously emitted ones</li>
025 * </ul>
026 *
027 * <h3>Common Strategies:</h3>
028 * <ul>
029 *   <li><b>Strictly ascending:</b> {@code WATERMARK FOR order_time AS order_time}</li>
030 *   <li><b>Ascending (1ms tolerance):</b> {@code WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND}</li>
031 *   <li><b>Bounded delay:</b> {@code WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND}</li>
032 * </ul>
033 *
034 * <h3>Example:</h3>
035 * <pre>
036 * CREATE TABLE Orders (
037 *     `user` BIGINT,
038 *     product STRING,
039 *     order_time TIMESTAMP(3),
040 *     WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
041 * ) WITH (
042 *     'connector' = 'kafka',
043 *     'topic' = 'orders'
044 * );
045 * </pre>
046 *
047 * @see <a href="https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/">Flink SQL CREATE TABLE</a>
048 */
049public class TFlinkWatermarkClause extends TParseTreeNode {
050
051    /**
052     * The column that represents event time (rowtime).
053     * Must be an existing TIMESTAMP(3) column in the table.
054     */
055    private TObjectName eventTimeColumn;
056
057    /**
058     * The watermark strategy expression.
059     * Typically the event time column minus some interval for bounded out-of-orderness.
060     */
061    private TExpression watermarkExpression;
062
063    /**
064     * Initialize with event time column and watermark expression.
065     *
066     * @param arg1 the event time column name (TObjectName)
067     * @param arg2 the watermark strategy expression (TExpression)
068     */
069    public void init(Object arg1, Object arg2) {
070        this.eventTimeColumn = (TObjectName) arg1;
071        this.watermarkExpression = (TExpression) arg2;
072    }
073
074    /**
075     * Get the event time column name.
076     *
077     * <p>This is the column that Flink uses as the event time attribute.
078     * It must be of type TIMESTAMP(3).</p>
079     *
080     * @return the event time column name
081     */
082    public TObjectName getEventTimeColumn() {
083        return eventTimeColumn;
084    }
085
086    /**
087     * Set the event time column name.
088     *
089     * @param eventTimeColumn the event time column name
090     */
091    public void setEventTimeColumn(TObjectName eventTimeColumn) {
092        this.eventTimeColumn = eventTimeColumn;
093    }
094
095    /**
096     * Get the watermark strategy expression.
097     *
098     * <p>This expression defines how watermarks are generated. Common patterns include:</p>
099     * <ul>
100     *   <li>Strictly ascending: just the column name (e.g., {@code order_time})</li>
101     *   <li>Bounded out-of-orderness: column minus interval (e.g., {@code order_time - INTERVAL '5' SECOND})</li>
102     * </ul>
103     *
104     * @return the watermark strategy expression
105     */
106    public TExpression getWatermarkExpression() {
107        return watermarkExpression;
108    }
109
110    /**
111     * Set the watermark strategy expression.
112     *
113     * @param watermarkExpression the watermark strategy expression
114     */
115    public void setWatermarkExpression(TExpression watermarkExpression) {
116        this.watermarkExpression = watermarkExpression;
117    }
118
119    /**
120     * Get the event time column name as a string.
121     *
122     * @return the column name string, or null if not set
123     */
124    public String getEventTimeColumnName() {
125        return eventTimeColumn != null ? eventTimeColumn.toString() : null;
126    }
127
128    @Override
129    public void accept(TParseTreeVisitor v) {
130        v.preVisit(this);
131        v.postVisit(this);
132    }
133
134    @Override
135    public void acceptChildren(TParseTreeVisitor v) {
136        v.preVisit(this);
137        if (eventTimeColumn != null) {
138            eventTimeColumn.accept(v);
139        }
140        if (watermarkExpression != null) {
141            watermarkExpression.acceptChildren(v);
142        }
143        v.postVisit(this);
144    }
145}