001package gudusoft.gsqlparser.nodes.flink; 002 003import gudusoft.gsqlparser.nodes.TExpression; 004import gudusoft.gsqlparser.nodes.TObjectName; 005import gudusoft.gsqlparser.nodes.TParseTreeNode; 006import gudusoft.gsqlparser.nodes.TParseTreeVisitor; 007 008/** 009 * Represents Flink's WATERMARK clause for event-time processing. 010 * 011 * <p>The WATERMARK clause defines watermark generation strategies for event-time processing 012 * in Flink SQL. It specifies which column represents event time and the watermark strategy 013 * expression.</p> 014 * 015 * <h3>Syntax:</h3> 016 * <pre> 017 * WATERMARK FOR rowtime_column AS watermark_strategy_expression 018 * </pre> 019 * 020 * <h3>Requirements:</h3> 021 * <ul> 022 * <li>The rowtime_column must be an existing column of type TIMESTAMP(3)</li> 023 * <li>The watermark_strategy_expression must return TIMESTAMP(3)</li> 024 * <li>Watermarks must only emit non-null values larger than previously emitted ones</li> 025 * </ul> 026 * 027 * <h3>Common Strategies:</h3> 028 * <ul> 029 * <li><b>Strictly ascending:</b> {@code WATERMARK FOR order_time AS order_time}</li> 030 * <li><b>Ascending (1ms tolerance):</b> {@code WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND}</li> 031 * <li><b>Bounded delay:</b> {@code WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND}</li> 032 * </ul> 033 * 034 * <h3>Example:</h3> 035 * <pre> 036 * CREATE TABLE Orders ( 037 * `user` BIGINT, 038 * product STRING, 039 * order_time TIMESTAMP(3), 040 * WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 041 * ) WITH ( 042 * 'connector' = 'kafka', 043 * 'topic' = 'orders' 044 * ); 045 * </pre> 046 * 047 * @see <a href="https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/">Flink SQL CREATE TABLE</a> 048 */ 049public class TFlinkWatermarkClause extends TParseTreeNode { 050 051 /** 052 * The column that represents event time (rowtime). 053 * Must be an existing TIMESTAMP(3) column in the table. 054 */ 055 private TObjectName eventTimeColumn; 056 057 /** 058 * The watermark strategy expression. 059 * Typically the event time column minus some interval for bounded out-of-orderness. 060 */ 061 private TExpression watermarkExpression; 062 063 /** 064 * Initialize with event time column and watermark expression. 065 * 066 * @param arg1 the event time column name (TObjectName) 067 * @param arg2 the watermark strategy expression (TExpression) 068 */ 069 public void init(Object arg1, Object arg2) { 070 this.eventTimeColumn = (TObjectName) arg1; 071 this.watermarkExpression = (TExpression) arg2; 072 } 073 074 /** 075 * Get the event time column name. 076 * 077 * <p>This is the column that Flink uses as the event time attribute. 078 * It must be of type TIMESTAMP(3).</p> 079 * 080 * @return the event time column name 081 */ 082 public TObjectName getEventTimeColumn() { 083 return eventTimeColumn; 084 } 085 086 /** 087 * Set the event time column name. 088 * 089 * @param eventTimeColumn the event time column name 090 */ 091 public void setEventTimeColumn(TObjectName eventTimeColumn) { 092 this.eventTimeColumn = eventTimeColumn; 093 } 094 095 /** 096 * Get the watermark strategy expression. 097 * 098 * <p>This expression defines how watermarks are generated. Common patterns include:</p> 099 * <ul> 100 * <li>Strictly ascending: just the column name (e.g., {@code order_time})</li> 101 * <li>Bounded out-of-orderness: column minus interval (e.g., {@code order_time - INTERVAL '5' SECOND})</li> 102 * </ul> 103 * 104 * @return the watermark strategy expression 105 */ 106 public TExpression getWatermarkExpression() { 107 return watermarkExpression; 108 } 109 110 /** 111 * Set the watermark strategy expression. 112 * 113 * @param watermarkExpression the watermark strategy expression 114 */ 115 public void setWatermarkExpression(TExpression watermarkExpression) { 116 this.watermarkExpression = watermarkExpression; 117 } 118 119 /** 120 * Get the event time column name as a string. 121 * 122 * @return the column name string, or null if not set 123 */ 124 public String getEventTimeColumnName() { 125 return eventTimeColumn != null ? eventTimeColumn.toString() : null; 126 } 127 128 @Override 129 public void accept(TParseTreeVisitor v) { 130 v.preVisit(this); 131 v.postVisit(this); 132 } 133 134 @Override 135 public void acceptChildren(TParseTreeVisitor v) { 136 v.preVisit(this); 137 if (eventTimeColumn != null) { 138 eventTimeColumn.accept(v); 139 } 140 if (watermarkExpression != null) { 141 watermarkExpression.acceptChildren(v); 142 } 143 v.postVisit(this); 144 } 145}