001package gudusoft.gsqlparser.demos.sqlguard;
002
003import gudusoft.gsqlparser.EAggregateType;
004import gudusoft.gsqlparser.EDbVendor;
005import gudusoft.gsqlparser.EResolverType;
006import gudusoft.gsqlparser.TCustomSqlStatement;
007import gudusoft.gsqlparser.TGSqlParser;
008import gudusoft.gsqlparser.catalog.diagnostic.CatalogDiagnostic;
009import gudusoft.gsqlparser.catalog.diagnostic.CatalogDiagnosticSeverity;
010import gudusoft.gsqlparser.catalog.input.CatalogInputException;
011import gudusoft.gsqlparser.catalog.input.CatalogInputKind;
012import gudusoft.gsqlparser.catalog.input.CatalogInputSource;
013import gudusoft.gsqlparser.catalog.input.CatalogLoadOptions;
014import gudusoft.gsqlparser.catalog.input.CatalogModelValidator;
015import gudusoft.gsqlparser.catalog.input.CatalogValidationResult;
016import gudusoft.gsqlparser.catalog.input.model.ColumnModel;
017import gudusoft.gsqlparser.catalog.input.model.ConstraintModel;
018import gudusoft.gsqlparser.catalog.input.model.TableModel;
019import gudusoft.gsqlparser.catalog.input.model.UnifiedCatalogModel;
020import gudusoft.gsqlparser.catalog.input.readers.JsonManifestCatalogInputReader;
021import gudusoft.gsqlparser.dlineage.DataFlowAnalyzer;
022import gudusoft.gsqlparser.dlineage.dataflow.model.Option;
023import gudusoft.gsqlparser.dlineage.dataflow.model.xml.column;
024import gudusoft.gsqlparser.dlineage.dataflow.model.xml.dataflow;
025import gudusoft.gsqlparser.dlineage.dataflow.model.xml.relationship;
026import gudusoft.gsqlparser.dlineage.dataflow.model.xml.sourceColumn;
027import gudusoft.gsqlparser.dlineage.dataflow.model.xml.table;
028import gudusoft.gsqlparser.ESortType;
029import gudusoft.gsqlparser.nodes.TExpression;
030import gudusoft.gsqlparser.nodes.TExpressionList;
031import gudusoft.gsqlparser.nodes.TFunctionCall;
032import gudusoft.gsqlparser.nodes.TGroupBy;
033import gudusoft.gsqlparser.nodes.TGroupByItem;
034import gudusoft.gsqlparser.nodes.TCTE;
035import gudusoft.gsqlparser.nodes.TCTEList;
036import gudusoft.gsqlparser.nodes.TObjectName;
037import gudusoft.gsqlparser.nodes.TOrderBy;
038import gudusoft.gsqlparser.nodes.TOrderByItem;
039import gudusoft.gsqlparser.nodes.TParseTreeVisitor;
040import gudusoft.gsqlparser.nodes.TPartitionClause;
041import gudusoft.gsqlparser.nodes.TResultColumn;
042import gudusoft.gsqlparser.nodes.TResultColumnList;
043import gudusoft.gsqlparser.nodes.TTable;
044import gudusoft.gsqlparser.nodes.TWindowDef;
045import gudusoft.gsqlparser.nodes.TWindowFrame;
046import gudusoft.gsqlparser.nodes.TWindowSpecification;
047import gudusoft.gsqlparser.resolver2.TSQLResolverConfig;
048import gudusoft.gsqlparser.resolver2.binding.BindingDiagnostic;
049import gudusoft.gsqlparser.resolver2.binding.BindingDiagnosticSeverity;
050import gudusoft.gsqlparser.resolver2.binding.BindingResult;
051import gudusoft.gsqlparser.sqlenv.ESQLDataObjectType;
052import gudusoft.gsqlparser.sqlenv.IdentifierService;
053import gudusoft.gsqlparser.stmt.TSelectSqlStatement;
054
055import java.nio.charset.StandardCharsets;
056import java.util.ArrayList;
057import java.util.Collections;
058import java.util.HashSet;
059import java.util.Iterator;
060import java.util.LinkedHashMap;
061import java.util.LinkedHashSet;
062import java.util.List;
063import java.util.Locale;
064import java.util.Map;
065import java.util.Set;
066
067public class SqlGuardService {
068    public static final int MAX_SQL_BYTES = 512 * 1024;
069    public static final int MAX_CATALOG_BYTES = 512 * 1024;
070    private static final int MAX_STATEMENTS = 50;
071
072    public SqlGuardResponse check(SqlGuardRequest req) {
073        if (req == null) {
074            return SqlGuardResponse.error(null, "INVALID_REQUEST", "Invalid request.");
075        }
076        SqlGuardResponse baseErr = validate(req);
077        if (baseErr != null) {
078            return baseErr;
079        }
080
081        SqlGuardResponse r = new SqlGuardResponse();
082        r.requestId = req.requestId;
083        r.facts.dialect = req.dialect;
084        audit(r, req);
085
086        EDbVendor vendor = EDbVendor.fromAlias(req.dialect);
087        UnifiedCatalogModel catalog;
088        if ("inline".equalsIgnoreCase(req.catalogMode)) {
089            // Pass the request vendor to BOTH the reader and the validator
090            // so {@link CatalogModelValidator} can enforce
091            // {@code options.vendor() == model.vendor()}. Without options,
092            // a request with {@code dialect="postgresql"} would happily
093            // pass an Oracle-shaped manifest, then bind columns under PG
094            // identifier folding while the catalog was authored under
095            // Oracle folding — silent governance miss.
096            CatalogLoadOptions opts = CatalogLoadOptions.builder().vendor(vendor).build();
097            try {
098                CatalogInputSource src = CatalogInputSource.fromBytes(
099                        req.catalog.getBytes(StandardCharsets.UTF_8),
100                        CatalogInputKind.JSON_MANIFEST);
101                catalog = new JsonManifestCatalogInputReader().read(src, opts);
102            } catch (CatalogInputException ex) {
103                return SqlGuardResponse.error(req.requestId, "INVALID_CATALOG_JSON",
104                        "Inline catalog JSON could not be parsed: "
105                                + sanitizeParserMessage(ex.getMessage()));
106            }
107            CatalogValidationResult vr = new CatalogModelValidator().validate(catalog, opts);
108            if (!vr.ok()) {
109                return SqlGuardResponse.error(req.requestId, "INVALID_CATALOG_JSON",
110                        "Inline catalog failed validation: "
111                                + firstCatalogErrorMessage(vr));
112            }
113        } else {
114            catalog = SqlGuardCatalogSamples.sample(req.sampleCatalogId);
115        }
116        TGSqlParser parser = new TGSqlParser(vendor);
117        parser.setResolverType(EResolverType.RESOLVER2);
118        parser.setSqlEnv(new SqlGuardSampleEnv(vendor, catalog));
119        TSQLResolverConfig cfg = parser.getResolver2Config();
120        if (cfg == null) {
121            cfg = new TSQLResolverConfig();
122        }
123        cfg.setEmitBindingDiagnostics(true);
124        cfg.setBindingStrictCatalogValidation(true);
125        parser.setResolver2Config(cfg);
126        parser.sqltext = req.sql;
127
128        int rc;
129        try {
130            rc = parser.parse();
131        } catch (Exception e) {
132            return parseError(r, parser);
133        }
134        if (rc != 0 || parser.sqlstatements == null || parser.sqlstatements.size() == 0) {
135            return parseError(r, parser);
136        }
137        if (parser.sqlstatements.size() > MAX_STATEMENTS) {
138            return SqlGuardResponse.error(req.requestId, "TOO_MANY_STATEMENTS", "Too many SQL statements.");
139        }
140        SqlGuardResponse bindingErr = bindingError(parser, r);
141        if (bindingErr != null) {
142            return bindingErr;
143        }
144        SqlGuardResponse semanticErr = validateSemantics(parser, r, catalog);
145        if (semanticErr != null) {
146            return semanticErr;
147        }
148
149        for (int i = 0; i < parser.sqlstatements.size(); i++) {
150            analyzeStatement(parser.sqlstatements.get(i), i, req, r, catalog, vendor);
151        }
152        merge(r);
153        // PRD §11 four-decision model — promote WARN to APPROVAL_REQUIRED
154        // when any catalog-bound column carries the RESTRICTED policy tag
155        // AND is exposed by the SQL (projection / SELECT * / window slot).
156        // DENY-class findings always win; ALLOW stays ALLOW (no
157        // restricted-column access at all).
158        maybeRequireApproval(r);
159        return r;
160    }
161
162    /**
163     * Upgrade {@code r.decision} from "WARN" to "APPROVAL_REQUIRED" when
164     * any {@link gudusoft.gsqlparser.demos.sqlguard.SqlGuardResponse.ColumnFact}
165     * carries a RESTRICTED policy tag and is being exposed by the SQL.
166     * Implements PRD §11 ("approval_required" — the fourth decision in
167     * the live preview's decision model) without introducing a new
168     * severity level at rule-evaluation time; the rules continue to fire
169     * as {@code warn}, and this post-step decides whether the aggregate
170     * decision is operationally "this is a warn the human MUST review"
171     * versus "this is a warn you can clear yourself".
172     *
173     * <p>Why scan {@code facts.columns} instead of {@code violations}:
174     * column facts already carry the resolved catalog tags. A
175     * RESTRICTED-tagged column that triggered SENSITIVE_COLUMN_IN_OUTPUT
176     * is identical at the column-fact layer to one that triggered
177     * SELECT_STAR_ON_SENSITIVE_TABLE — both are "RESTRICTED is in the
178     * output". Detecting at the fact layer means we don't have to teach
179     * every individual rule about approval semantics.
180     */
181    private void maybeRequireApproval(SqlGuardResponse r) {
182        if (!"WARN".equals(r.decision)) {
183            return;
184        }
185        for (SqlGuardResponse.ColumnFact cf : r.facts.columns) {
186            if (cf.policyTags == null || cf.policyTags.isEmpty()) continue;
187            if (cf.exposure == null) continue;
188            // Only output-bearing exposures count — predicate-only access
189            // (filter / join) doesn't surface restricted values to the
190            // caller, so it stays plain WARN.
191            String e = cf.exposure;
192            boolean outputBearing = "explicit_select".equals(e)
193                    || "implicit_select_star".equals(e)
194                    || "window_partition".equals(e)
195                    || "window_order".equals(e);
196            if (!outputBearing) continue;
197            for (String t : cf.policyTags) {
198                if ("RESTRICTED".equalsIgnoreCase(t)) {
199                    r.decision = "APPROVAL_REQUIRED";
200                    r.summary = "Restricted columns are exposed; human approval required before this SQL runs.";
201                    r.suggestions.add("Remove the restricted column(s) from the projection, or route this query through your approval workflow before execution.");
202                    return;
203                }
204            }
205        }
206    }
207
208    private SqlGuardResponse validate(SqlGuardRequest req) {
209        if (req.sql == null || req.sql.trim().length() == 0) {
210            return SqlGuardResponse.error(req.requestId, "INVALID_REQUEST", "SQL is required.");
211        }
212        if (req.sql.getBytes(StandardCharsets.UTF_8).length > MAX_SQL_BYTES) {
213            return SqlGuardResponse.error(req.requestId, "SQL_TOO_LARGE", "SQL exceeds 512 KB limit.");
214        }
215        if (req.catalog != null && req.catalog.getBytes(StandardCharsets.UTF_8).length > MAX_CATALOG_BYTES) {
216            return SqlGuardResponse.error(req.requestId, "CATALOG_TOO_LARGE", "Catalog exceeds 512 KB limit.");
217        }
218        EDbVendor requestedVendor = EDbVendor.fromAlias(req.dialect);
219        if (requestedVendor == null || !requestedVendor.isImplemented()) {
220            return SqlGuardResponse.error(req.requestId, "UNSUPPORTED_DIALECT", "Unsupported SQL dialect.");
221        }
222        if ("sample".equalsIgnoreCase(req.catalogMode)) {
223            if (!SqlGuardCatalogSamples.isSupportedSample(req.sampleCatalogId)) {
224                return SqlGuardResponse.error(req.requestId, "UNSUPPORTED_SAMPLE_CATALOG", "Unsupported sample catalog.");
225            }
226        } else if ("inline".equalsIgnoreCase(req.catalogMode)) {
227            if (req.catalog == null || req.catalog.trim().length() == 0) {
228                return SqlGuardResponse.error(req.requestId, "INVALID_REQUEST",
229                        "Inline catalog is required when catalogMode=inline.");
230            }
231        } else {
232            return SqlGuardResponse.error(req.requestId, "UNSUPPORTED_CATALOG_MODE", "Unsupported catalog mode.");
233        }
234        if (!"all".equalsIgnoreCase(req.policyPreset)
235                && !"pii-basic".equalsIgnoreCase(req.policyPreset)
236                && !"destructive-sql-block".equalsIgnoreCase(req.policyPreset)) {
237            return SqlGuardResponse.error(req.requestId, "UNSUPPORTED_POLICY_PRESET", "Unsupported policy preset.");
238        }
239        return null;
240    }
241
242    private SqlGuardResponse parseError(SqlGuardResponse r, TGSqlParser parser) {
243        r.ok = false;
244        r.error = new SqlGuardResponse.ErrorInfo();
245        r.error.code = "SQL_PARSE_ERROR";
246        String details = "";
247        try {
248            details = sanitizeParserMessage(parser == null ? null : parser.getErrormessage());
249        } catch (Exception ignored) {
250            details = "";
251        }
252        r.error.message = details.length() == 0
253                ? "SQL syntax error: SQL could not be parsed."
254                : "SQL syntax error: " + details;
255        return r;
256    }
257
258    private SqlGuardResponse validateSemantics(TGSqlParser parser, SqlGuardResponse r, UnifiedCatalogModel cat) {
259        for (int i = 0; i < parser.sqlstatements.size(); i++) {
260            TCustomSqlStatement stmt = parser.sqlstatements.get(i);
261            if (stmt instanceof TSelectSqlStatement) {
262                SqlGuardResponse err = validateSelectSemantics((TSelectSqlStatement) stmt, r, cat);
263                if (err != null) {
264                    return err;
265                }
266            }
267        }
268        return null;
269    }
270
271    private SqlGuardResponse validateSelectSemantics(TSelectSqlStatement sel, SqlGuardResponse r, UnifiedCatalogModel cat) {
272        TGroupBy groupBy = sel.getGroupByClause();
273        if (groupBy == null || groupBy.isAllModifier() || groupBy.getItems() == null || groupBy.getItems().size() == 0) {
274            return null;
275        }
276        LinkedHashSet<String> grouped = new LinkedHashSet<String>();
277        for (int i = 0; i < groupBy.getItems().size(); i++) {
278            TGroupByItem item = groupBy.getItems().getGroupByItem(i);
279            if (item != null && item.getExpr() != null) {
280                grouped.add(normalizeSemanticExpression(item.getExpr().toString()));
281            }
282        }
283        TResultColumnList cols = sel.getResultColumnList();
284        if (cols == null) {
285            return null;
286        }
287        for (int i = 0; i < cols.size(); i++) {
288            TResultColumn rc = cols.getResultColumn(i);
289            String expr = resultExpressionText(rc);
290            if (expr.length() == 0 || "*".equals(expr) || expr.endsWith(".*") || isAggregateColumn(rc)) {
291                continue;
292            }
293            if (!grouped.contains(normalizeSemanticExpression(expr))) {
294                return semanticError(r, "Column/expression " + expr
295                        + " appears in SELECT but is not aggregated and is not listed in GROUP BY.");
296            }
297        }
298        return null;
299    }
300
301    /**
302     * Convert the first {@link BindingDiagnosticSeverity#ERROR}-level binding
303     * diagnostic produced by resolver2 into a {@code SQL_SEMANTIC_ERROR}
304     * envelope. Returns {@code null} when the SQL binds cleanly so the caller
305     * proceeds to policy / lineage analysis. Replaces the regex-based
306     * qualified-column scanner retired in S18 (plan §7.3 S18).
307     */
308    private SqlGuardResponse bindingError(TGSqlParser parser, SqlGuardResponse r) {
309        BindingResult br = parser.getBindingResult();
310        if (br == null || !br.hasErrors()) {
311            return null;
312        }
313        for (BindingDiagnostic d : br.getDiagnostics()) {
314            if (d.severity() == BindingDiagnosticSeverity.ERROR) {
315                return semanticError(r, d.message());
316            }
317        }
318        return null;
319    }
320
321    /**
322     * Shared with {@link #aggregateFunctionsFromAst} so the GROUP BY
323     * semantic check and the lineage code path agree on what counts as
324     * an aggregate. Both routes walk the same AST visitor — there is no
325     * lexical / textual path that could desync them.
326     */
327    private boolean isAggregateColumn(TResultColumn rc) {
328        List<String> fns = aggregateFunctionsFromAst(rc);
329        return fns != null && !fns.isEmpty();
330    }
331
332    private String normalizeSemanticExpression(String expr) {
333        if (expr == null) {
334            return "";
335        }
336        String s = expr.toLowerCase(Locale.ROOT).replace("\"", "").replace("`", "").trim();
337        s = s.replaceAll("\\s+", "");
338        return s;
339    }
340
341    private SqlGuardResponse semanticError(SqlGuardResponse r, String message) {
342        r.ok = false;
343        r.error = new SqlGuardResponse.ErrorInfo();
344        r.error.code = "SQL_SEMANTIC_ERROR";
345        r.error.message = "SQL semantic error: " + message;
346        return r;
347    }
348
349    /**
350     * Scrubs an error message for envelope use: collapses runs of
351     * whitespace and truncates at 500 chars. Every C0 control
352     * character ({@code 0x00..0x1F}) and DEL ({@code 0x7F}) becomes a
353     * space — not just CR/LF — because
354     * {@code gudusoft.gsqlparser.util.json.JSONSerializer} only
355     * escapes the named-escape subset
356     * ({@code "}, {@code \\}, {@code /}, {@code \b}, {@code \f},
357     * {@code \n}, {@code \r}, {@code \t}). Unescaped controls
358     * survive into the wire envelope and can poison BFF
359     * console / syslog rendering (terminal escape sequences, line
360     * separators that fool log parsers, NUL byte truncation in C-based
361     * log shippers). C1 controls (0x80–0x9F) are intentionally
362     * preserved because they are valid Unicode characters in any
363     * UTF-8 caller string and the JSON serializer round-trips them
364     * correctly; stripping them would mutate legitimate caller data.
365     */
366    private String sanitizeParserMessage(String msg) {
367        if (msg == null) {
368            return "";
369        }
370        StringBuilder sb = new StringBuilder(msg.length());
371        for (int i = 0; i < msg.length(); i++) {
372            char c = msg.charAt(i);
373            sb.append(c < 0x20 || c == 0x7F ? ' ' : c);
374        }
375        String s = sb.toString().trim();
376        while (s.indexOf("  ") >= 0) {
377            s = s.replace("  ", " ");
378        }
379        if (s.length() > 500) {
380            s = s.substring(0, 500) + "...";
381        }
382        return s;
383    }
384
385    private void audit(SqlGuardResponse r, SqlGuardRequest req) {
386        r.audit.put("catalogMode", req.catalogMode);
387        // Under inline mode the request-default ("ecommerce") for
388        // sampleCatalogId is unused; record the literal sentinel
389        // {@code "inline"} so (a) the wire shape is stable — the JSON
390        // serializer drops null map values, which would silently
391        // delete the key — and (b) the audit log doesn't claim a
392        // sample catalog drove the decision. Pinned by
393        // {@code inlineModeAuditWireFormatLocksInSentinel}.
394        r.audit.put("sampleCatalogId",
395                "inline".equalsIgnoreCase(req.catalogMode) ? "inline" : req.sampleCatalogId);
396        r.audit.put("policyPreset", req.policyPreset);
397        r.audit.put("engine", "gsp-java");
398        r.audit.put("engineVersion", "local");
399    }
400
401    /**
402     * Picks the first {@link CatalogDiagnosticSeverity#ERROR} message from
403     * a failed {@link CatalogValidationResult}. The validator returns
404     * every problem it found, but the worker only surfaces one in the
405     * envelope so the BFF gets a single actionable line — same shape as
406     * {@link #bindingError} picking the first resolver2 ERROR.
407     */
408    private String firstCatalogErrorMessage(CatalogValidationResult vr) {
409        for (CatalogDiagnostic d : vr.diagnostics()) {
410            if (d.severity() == CatalogDiagnosticSeverity.ERROR) {
411                return sanitizeParserMessage(d.message());
412            }
413        }
414        return "unknown validation error";
415    }
416
417    private void analyzeStatement(
418            TCustomSqlStatement stmt,
419            int index,
420            SqlGuardRequest req,
421            SqlGuardResponse r,
422            UnifiedCatalogModel cat,
423            EDbVendor vendor) {
424        SqlGuardResponse.StatementResult sr = new SqlGuardResponse.StatementResult();
425        sr.index = index;
426        sr.sqlType = sqlType(stmt);
427        r.statements.add(sr);
428
429        FromScope scope = addTables(stmt, r, cat, vendor);
430        int lineageBefore = r.facts.lineage.size();
431        if (stmt instanceof TSelectSqlStatement) {
432            analyzeSelect((TSelectSqlStatement) stmt, index, req, r, cat, scope, vendor);
433        } else {
434            addViolation(r, index, "DML_OR_DDL_BLOCKED", "error", "Only SELECT statements are allowed in this demo.", sr.sqlType);
435        }
436        sr.lineageCount = r.facts.lineage.size() - lineageBefore;
437        applyStatementDecision(sr, r.violations, index);
438    }
439
440    /**
441     * FROM-clause snapshot — list of resolved table names (lowercased,
442     * catalog-canonical). Used by {@link #analyzeSelect} to pick a
443     * default table for {@code SELECT *} expansion. The alias map that
444     * used to live here is gone: dlineage now resolves alias →
445     * source-table during lineage extraction, so sqlguard no longer
446     * maintains its own alias index.
447     */
448    private static final class FromScope {
449        final List<String> tableNames = new ArrayList<String>();
450    }
451
452    private String sqlType(TCustomSqlStatement s) {
453        String n = s.getClass().getSimpleName().toUpperCase(Locale.ROOT);
454        if (n.contains("SELECT")) return "SELECT";
455        if (n.contains("DELETE")) return "DELETE";
456        if (n.contains("UPDATE")) return "UPDATE";
457        if (n.contains("INSERT")) return "INSERT";
458        if (n.contains("CREATE") || n.contains("DROP") || n.contains("ALTER")) return "DDL";
459        return n;
460    }
461
462    private FromScope addTables(TCustomSqlStatement stmt, SqlGuardResponse r,
463                                 UnifiedCatalogModel cat, EDbVendor vendor) {
464        FromScope scope = new FromScope();
465        if (stmt.tables == null) {
466            return scope;
467        }
468        for (int i = 0; i < stmt.tables.size(); i++) {
469            TTable t = stmt.tables.getTable(i);
470            String raw = t.toString();
471            // Wire-form short name (no schema qualifier) for facts.tables
472            // + scope. Vendor-aware normalization replaces the
473            // unconditional .toLowerCase(Locale.ROOT) that was only
474            // correct for PG-default-unquoted-ASCII. See
475            // sqlguard/CLAUDE.md "Identifier comparison rule".
476            String tableName = IdentifierService.normalizeStatic(
477                    vendor, ESQLDataObjectType.dotTable, simple(raw));
478            if (tableName == null || tableName.length() == 0) {
479                continue;
480            }
481            scope.tableNames.add(tableName);
482            // Catalog lookup passes the schema-qualified form so
483            // findTable resolves the right schema when present, instead
484            // of guessing. Catalog comparison uses the model's vendor
485            // (catalog-stored canonical form), not the request vendor
486            // (SQL parsing). See SqlGuardCatalogSamples "Vendor convention".
487            boolean matched = SqlGuardCatalogSamples.findTable(
488                    cat, qualifiedTableRef(raw)) != null;
489            // findTable uses model.vendor() internally; see
490            // SqlGuardCatalogSamples class Javadoc "Vendor convention".
491            addTableFact(r, tableName, matched, vendor);
492        }
493        return scope;
494    }
495
496    private void addTableFact(SqlGuardResponse r, String n, boolean matched, EDbVendor vendor) {
497        for (SqlGuardResponse.TableFact t : r.facts.tables) {
498            if (IdentifierService.areEqualStatic(vendor, ESQLDataObjectType.dotTable, t.name, n)) {
499                return;
500            }
501        }
502        SqlGuardResponse.TableFact tf = new SqlGuardResponse.TableFact();
503        tf.name = n;
504        tf.schema = "public";
505        tf.catalogMatched = matched;
506        r.facts.tables.add(tf);
507        if (!matched) {
508            r.facts.diagnostics.add("MISSING_TABLE:" + n);
509        }
510    }
511
512    /**
513     * Per-statement lineage extraction. Delegates to the GSP {@code dlineage}
514     * package (DataFlowAnalyzer) for the column-level relationship graph,
515     * then layers sqlguard-specific processing on top: catalog binding,
516     * policy tags, exposure classification, and DISTINCT-aware
517     * {@code aggregateFunctions} (which dlineage does not surface).
518     *
519     * <p>See {@code sqlguard/CLAUDE.md} "Lineage extraction rule
520     * (mandatory)": sqlguard MUST consume dlineage's in-memory model and
521     * MUST NOT reimplement column-to-source-column tracing. This method
522     * was rewritten to comply (replaces the legacy AST-visitor
523     * {@code sourceColumnsFromAst} plus the diagnostic-only
524     * {@code captureDlineageSignal} which threw away dlineage's output).
525     */
526    private void analyzeSelect(
527            TSelectSqlStatement sel,
528            int index,
529            SqlGuardRequest req,
530            SqlGuardResponse r,
531            UnifiedCatalogModel cat,
532            FromScope scope,
533            EDbVendor vendor) {
534        TResultColumnList astCols = sel.getResultColumnList();
535        if (astCols == null) {
536            return;
537        }
538
539        // Structural facts — pure-AST observations the BFF / frontend
540        // surfaces to the reader ("what does this query do"). Independent
541        // of dlineage and rule evaluation; called once per top-level
542        // analyzeSelect entry. Blueprint §S6 (CTEs), §S12 (ORDER BY),
543        // §S14/§S15 (set operations), §S16 (HAVING).
544        collectStructuralFacts(sel, index, r, vendor);
545
546        // S18 — SELECT_STAR_WITHOUT_LIMIT. Purely AST-driven (no dlineage
547        // needed) so detect up-front: any `*` or `x.*` projection paired
548        // with an absent {@link TSelectSqlStatement#getLimitClause()}
549        // earns one warning per statement, regardless of how many star
550        // refs the projection contains. Fires independently of
551        // {@code SELECT_STAR_ON_SENSITIVE_TABLE} per blueprint §S18 — a
552        // single SELECT * SQL can produce both findings. Fires under
553        // every preset (matches the existing
554        // {@code SELECT_STAR_ON_SENSITIVE_TABLE} convention).
555        checkSelectStarWithoutLimit(sel, index, r, astCols);
556
557        // S20 — UNQUALIFIED_COLUMN. Schema-stability hygiene; gated on
558        // {@code policyPreset="all"} so the narrow guardrail presets
559        // ({@code pii-basic} / {@code destructive-sql-block}) stay
560        // single-purpose and existing requests against them are
561        // byte-identical. Genuine ambiguity is already caught upstream
562        // by {@link #bindingError} as {@code SQL_SEMANTIC_ERROR}.
563        if ("all".equalsIgnoreCase(req.policyPreset)) {
564            checkUnqualifiedColumns(sel, index, r, vendor);
565        }
566
567        // S25 — UNSUPPORTED_DIALECT_FUNCTION. AST-only walk over
568        // TFunctionCall against the L5 blacklist
569        // ({@link DialectFunctionDivergence}). Runs under every preset
570        // — this is a safety floor (denying SQL that can't possibly
571        // execute), not a hygiene preference. Independent of dlineage:
572        // a function call in WHERE that doesn't participate in
573        // projection lineage (e.g. WHERE DATE_TRUNC(...) > x) still
574        // must fire.
575        checkUnsupportedDialectFunction(sel, index, r, vendor);
576
577        dataflow df = runDlineage(sel, vendor, r, index);
578
579        // S22 — tenant-isolation policy gate. Fires before the
580        // projection-tracing loop so the violation is reported even if
581        // the projection branch later early-returns (e.g. when dlineage
582        // produces an empty top resultset). Internally guards on
583        // {@code userContext.tenantId} so existing tests that don't
584        // declare a tenant context stay unaffected.
585        checkTenantFilter(index, req, r, cat, df, vendor);
586
587        // S24 — wrong join path. Walks dlineage joinCondition source
588        // pairs against catalog FK metadata. Independent of tenant
589        // filter (no shared state) but uses the same dlineage pass.
590        // Always populates {@code facts.joins[]} so downstream consumers
591        // can see the FK-match decision even when the rule is silent;
592        // fires only when an FK exists between the joined tables but
593        // the predicate doesn't align with it.
594        checkWrongJoinPath(index, r, cat, df, vendor);
595
596        table topRs = topResultset(df);
597        if (df == null || topRs == null || topRs.getColumns() == null) {
598            // Dlineage unavailable or empty. Fall back to AST-only star
599            // expansion for SELECT *, but emit no lineage edges otherwise.
600            // The diagnostic was already added by runDlineage().
601            String defaultTable = scope.tableNames.isEmpty() ? null : scope.tableNames.get(0);
602            for (int i = 0; i < astCols.size(); i++) {
603                String text = resultExpressionText(astCols.getResultColumn(i));
604                if ("*".equals(text) || text.endsWith(".*")) {
605                    expandStar(index, r, cat, defaultTable, vendor);
606                }
607            }
608            return;
609        }
610
611        // Index resultset IDs so the leaf-tracer can tell intermediate
612        // function/CAST resultsets from real catalog tables.
613        Set<String> resultsetIds = new HashSet<String>();
614        for (table rs : df.getResultsets()) {
615            if (rs.getId() != null) resultsetIds.add(rs.getId());
616        }
617
618        String defaultTable = scope.tableNames.isEmpty() ? null : scope.tableNames.get(0);
619        int astIdx = 0;
620        for (column rsCol : topRs.getColumns()) {
621            if (isSystemColumn(rsCol)) {
622                continue;
623            }
624            TResultColumn rc = astIdx < astCols.size() ? astCols.getResultColumn(astIdx) : null;
625            astIdx++;
626            String text = rc == null ? "" : resultExpressionText(rc);
627            if ("*".equals(text) || text.endsWith(".*")) {
628                expandStar(index, r, cat, defaultTable, vendor);
629                continue;
630            }
631
632            // Result-column name is a column-class identifier. Sqlguard's
633            // virtual "result." namespace is sqlguard's own convention,
634            // but the projected column name itself follows the dialect's
635            // identifier rules.
636            String target = "result." + (rsCol.getName() == null ? ""
637                    : IdentifierService.normalizeStatic(vendor, ESQLDataObjectType.dotColumn, rsCol.getName()));
638
639            // Tier 1 — lineage edges come from dlineage. Walk the
640            // relationship graph from the result column to its leaf
641            // sources, recursing through intermediate (function / CAST)
642            // resultsets.
643            List<sourceColumn> leaves = new ArrayList<sourceColumn>();
644            boolean[] traversedResultset = new boolean[] { false };
645            Set<String> visited = new HashSet<String>();
646            traceLeaves(df, resultsetIds, topRs.getId(), rsCol.getName(),
647                    visited, leaves, traversedResultset, vendor);
648
649            // Tier 3a — AST-side window detection (blueprint §1.5.B B1/B2).
650            // Captured up-front so we can both decide whether to walk
651            // dlineage's window-extra fdr edges below AND store the field
652            // on the lineage edge for downstream consumers.
653            List<SqlGuardResponse.WindowFn> winFns = windowFunctionsFromAst(rc);
654
655            // Tier 1b — when a window aggregate participates in the
656            // projection, PARTITION BY / ORDER BY columns also drive the
657            // result and belong in sourceColumns. dlineage surfaces them
658            // as {@code fdr} edges with {@code clauseType="selectList"}
659            // (partition) and {@code "orderby"} (order) on the function
660            // resultset; the regular {@link #traceLeaves} pass skips them
661            // because they're not {@code fdd} data-dependency edges.
662            //
663            // Partition and order columns go into SEPARATE buckets so the
664            // catalog-binding pass can label exposure correctly per role
665            // ({@code window_partition} vs {@code window_order}). Folding
666            // both into the main leaves list — as the original B1/B2
667            // implementation did — mis-attributed partition-only PII as
668            // {@code explicit_select} exposure with a misleading
669            // {@code SENSITIVE_COLUMN_IN_OUTPUT} violation. The fix splits
670            // the buckets and dispatches the right sensitive rule per
671            // exposure (blueprint §B5: {@code SENSITIVE_COLUMN_IN_WINDOW_PARTITION}).
672            List<sourceColumn> partitionExtras = new ArrayList<sourceColumn>();
673            List<sourceColumn> orderExtras = new ArrayList<sourceColumn>();
674            if (winFns != null && !winFns.isEmpty()) {
675                Set<String> windowVisited = new HashSet<String>();
676                traceWindowExtras(df, resultsetIds, topRs.getId(), rsCol.getName(),
677                        windowVisited, partitionExtras, orderExtras, vendor);
678            }
679
680            // Tier 2 — sqlguard's catalog binding: translate dlineage's
681            // (parent_name, column) into the fully-qualified
682            // "schema.table.column" that the wire contract uses, dropping
683            // any leaf that doesn't bind to a known catalog column.
684            // dlineage's parent_name preserves the user-written casing
685            // (e.g. "sbTransaction"); IdentifierService folds it per the
686            // dialect's rules so catalog lookups succeed.
687            //
688            // sourceColumns (wire format) is the union of all three roles
689            // in document order: function-arg leaves first (matching the
690            // pre-B1 contract), then partition extras, then order extras.
691            LinkedHashSet<String> functionArgSources = bindToCatalog(leaves, cat, vendor);
692            LinkedHashSet<String> partitionSources = bindToCatalog(partitionExtras, cat, vendor);
693            LinkedHashSet<String> orderSources = bindToCatalog(orderExtras, cat, vendor);
694
695            // Role precedence: a column appearing in BOTH function-arg
696            // (dlineage fdd) AND partition/order (dlineage fdr) is
697            // legitimate when the AST shows the column is a real projection
698            // dependency — either a real function argument like {@code
699            // LAG(email) OVER (PARTITION BY email)} (LAG passes email
700            // through to output AND partitions by it) or a scalar-operator
701            // operand like {@code email || COUNT(*) OVER (PARTITION BY
702            // email)} (the bare {@code email} ref is a real projection
703            // source). It is dlineage over-attribution when the AST does
704            // NOT reference the column outside the window spec (e.g.
705            // {@code COUNT(*) OVER (PARTITION BY email)} — dlineage's
706            // fdd edge includes email as a function "dependency" because
707            // of the {@code *} glob, even though email isn't really a
708            // projection source).
709            //
710            // The AST collector ({@link #collectAstProjectionDependencies})
711            // gathers catalog-canonical references to columns OUTSIDE any
712            // window spec, then we strip from functionArgSources only
713            // those overlap columns that are NOT in that set. Without
714            // this gate the COUNT(*) shape would incorrectly fire
715            // {@code SENSITIVE_COLUMN_IN_OUTPUT} on a partition-only PII
716            // column.
717            if (!partitionSources.isEmpty() || !orderSources.isEmpty()) {
718                Set<String> astProjectionDeps =
719                        collectAstProjectionDependencies(rc, cat, vendor);
720                stripOverAttributedSources(functionArgSources,
721                        partitionSources, orderSources, astProjectionDeps);
722            }
723
724            // sourceColumns (wire format) is the union of all three roles
725            // in document order: function-arg first (matching the pre-B1
726            // contract), then partition, then order.
727            LinkedHashSet<String> sourceColumns = new LinkedHashSet<String>();
728            sourceColumns.addAll(functionArgSources);
729            sourceColumns.addAll(partitionSources);
730            sourceColumns.addAll(orderSources);
731            List<String> srcList = new ArrayList<String>(sourceColumns);
732
733            // Tier 3 — sqlguard-specific value-add. aggregateFunctions
734            // (with DISTINCT modifier) is NOT surfaced by dlineage; the
735            // AST visitor stays as the authoritative source for this
736            // field. See sqlguard/CLAUDE.md Case 1 escalation about the
737            // dlineage gap.
738            List<String> aggFns = aggregateFunctionsFromAst(rc);
739
740            // lineageType: window beats aggregate beats direct/expression.
741            // A projection that mixes scalar + window calls is rare but
742            // legal — we still classify it as window because the window
743            // semantics (frame, partitioning) are the dominant signal.
744            String ltype;
745            if (winFns != null && !winFns.isEmpty()) {
746                ltype = "window";
747            } else if (aggFns != null && !aggFns.isEmpty()) {
748                ltype = "aggregate";
749            } else if (!traversedResultset[0] && leaves.size() == 1
750                    && nameEquals(vendor, rsCol.getName(), leaves.get(0).getColumn())) {
751                ltype = "direct";
752            } else {
753                ltype = "expression";
754            }
755
756            // Per-role column-fact emission. Each call dispatches the
757            // right sensitive-rule via addColumnFact's exposure switch
758            // — a column that appears in both projection AND partition
759            // produces two column facts (one per role) and two distinct
760            // violations, mirroring the dual-exposure reality.
761            for (String sc : functionArgSources) {
762                addColumnFactForSource(r, sc, "explicit_select", cat, vendor);
763            }
764            for (String sc : partitionSources) {
765                addColumnFactForSource(r, sc, "window_partition", cat, vendor);
766            }
767            for (String sc : orderSources) {
768                addColumnFactForSource(r, sc, "window_order", cat, vendor);
769            }
770            addLineage(r, index, target, srcList, ltype, aggFns, winFns);
771        }
772
773        // S23 — AGGREGATION_GRAIN_MISMATCH. Runs last so every aggregate
774        // lineage edge for this statement is already in
775        // {@code r.facts.lineage}. Self-silences when the request carries
776        // no {@code question}, so existing callers stay byte-identical.
777        checkAggregationGrainMismatch(index, req, r);
778    }
779
780    /**
781     * Walk one statement's AST and append four kinds of structural fact
782     * to the response: CTE names, top-level ORDER BY items, HAVING clauses,
783     * and set-operation operators. Pure-AST observation — no dlineage, no
784     * resolver state, no catalog. Each fact carries {@code statementIndex}
785     * so multi-statement requests stay disambiguated.
786     *
787     * <p>For combined queries ({@code A UNION B INTERSECT C}) the entry
788     * {@code sel} is the root combined node. CTEs and the top-level ORDER
789     * BY live there; HAVING and per-leaf set operators live deeper. The
790     * walker recurses through {@link TSelectSqlStatement#getLeftStmt()} /
791     * {@link TSelectSqlStatement#getRightStmt()} and processes each leaf.
792     *
793     * <p>Blueprint refs: §S6 (CTEs), §S12 (ORDER BY), §S14/§S15 (set
794     * operations), §S16 (HAVING).
795     */
796    private void collectStructuralFacts(TSelectSqlStatement sel, int index,
797                                         SqlGuardResponse r, EDbVendor vendor) {
798        if (sel == null) {
799            return;
800        }
801        // CTEs are on the outermost node; safe to read at the root only.
802        collectCtes(sel, r);
803        // Top-level ORDER BY applies to the whole result; read at the root.
804        collectOrderBy(sel, index, r);
805        // HAVING + set operators may be nested inside combined queries.
806        // Vendor threads through so EXCEPT-vs-MINUS DISTINCT disambiguates
807        // by dialect (the parser's int constants collide at value 11).
808        collectHavingAndSetOps(sel, index, r, vendor);
809    }
810
811    private void collectCtes(TSelectSqlStatement sel, SqlGuardResponse r) {
812        TCTEList cteList = sel.getCteList();
813        if (cteList == null) return;
814        for (int i = 0; i < cteList.size(); i++) {
815            TCTE cte = cteList.getCTE(i);
816            if (cte == null) continue;
817            TObjectName name = cte.getTableName();
818            if (name == null) continue;
819            String s = name.toString();
820            if (s == null || s.isEmpty()) continue;
821            r.facts.ctes.add(s.toLowerCase(Locale.ROOT));
822        }
823    }
824
825    private void collectOrderBy(TSelectSqlStatement sel, int index, SqlGuardResponse r) {
826        TOrderBy ob = sel.getOrderbyClause();
827        if (ob == null || ob.getItems() == null) return;
828        for (int i = 0; i < ob.getItems().size(); i++) {
829            TOrderByItem item = ob.getItems().getOrderByItem(i);
830            if (item == null || item.getSortKey() == null) continue;
831            SqlGuardResponse.OrderByFact fact = new SqlGuardResponse.OrderByFact();
832            fact.statementIndex = index;
833            fact.column = item.getSortKey().toString().toLowerCase(Locale.ROOT);
834            ESortType sort = item.getSortOrder();
835            if (sort == ESortType.asc) {
836                fact.direction = "ASC";
837            } else if (sort == ESortType.desc) {
838                fact.direction = "DESC";
839            } else {
840                fact.direction = "NONE";
841            }
842            r.facts.orderBy.add(fact);
843        }
844    }
845
846    private void collectHavingAndSetOps(TSelectSqlStatement sel, int index,
847                                         SqlGuardResponse r, EDbVendor vendor) {
848        if (sel == null) return;
849        if (sel.isCombinedQuery()) {
850            // GSP represents `A op1 B op2 C` as a LEFT-recursive tree: the
851            // ROOT carries op2, and op1 lives on the left subtree. To emit
852            // operators in TEXTUAL order, recurse left FIRST, then add the
853            // current node's op, then recurse right. The earlier order
854            // (emit, left, right) reported `[op2, op1]` for chained queries,
855            // which misled consumers reading the array as the operator
856            // sequence the SQL wrote. Codex review §P2.
857            collectHavingAndSetOps(sel.getLeftStmt(), index, r, vendor);
858            String op = setOperatorName(sel.getSetOperator(), vendor);
859            if (op != null) {
860                SqlGuardResponse.SetOperationFact f = new SqlGuardResponse.SetOperationFact();
861                f.statementIndex = index;
862                f.op = op;
863                r.facts.setOperations.add(f);
864            }
865            collectHavingAndSetOps(sel.getRightStmt(), index, r, vendor);
866            return;
867        }
868        // Non-combined leaf: capture HAVING if present. HAVING hangs off
869        // the GROUP BY clause in GSP's AST (matching the SQL grammar
870        // requirement that HAVING requires GROUP BY); a present HAVING
871        // without GROUP BY is impossible from a valid parse tree.
872        // TGroupBy.getHavingClause() returns the TExpression directly —
873        // the {@link THavingClause} node type is internal to the parser
874        // and is not exposed at the GROUP BY level.
875        TGroupBy gb = sel.getGroupByClause();
876        TExpression having = gb != null ? gb.getHavingClause() : null;
877        if (having != null) {
878            String text = having.toString();
879            if (text != null && !text.trim().isEmpty()) {
880                SqlGuardResponse.AggregationFact a = new SqlGuardResponse.AggregationFact();
881                a.statementIndex = index;
882                a.having = text.trim().toLowerCase(Locale.ROOT);
883                r.facts.aggregations.add(a);
884            }
885        }
886    }
887
888    /**
889     * Map a {@code TSelectSqlStatement.setOperator_*} integer constant to a
890     * canonical operator name. {@code vendor} disambiguates the {@code = 11}
891     * collision between {@code SET_OPERATOR_EXCEPTDISTINCT} and
892     * {@code SET_OPERATOR_MINUSDISTINCT}: in vendors that don't grammar-support
893     * EXCEPT (Oracle, MySQL), value 11 must be MINUS DISTINCT; everywhere else
894     * the SQL-standard EXCEPT DISTINCT spelling wins. Codex review §P2.
895     */
896    private static String setOperatorName(int code, EDbVendor vendor) {
897        switch (code) {
898            case TSelectSqlStatement.setOperator_union:        return "UNION";
899            case TSelectSqlStatement.setOperator_unionall:     return "UNION ALL";
900            case TSelectSqlStatement.SET_OPERATOR_UNIONDISTINCT: return "UNION DISTINCT";
901            case TSelectSqlStatement.setOperator_intersect:    return "INTERSECT";
902            case TSelectSqlStatement.setOperator_intersectall: return "INTERSECT ALL";
903            case TSelectSqlStatement.SET_OPERATOR_INTERSECTDISTINCT: return "INTERSECT DISTINCT";
904            case TSelectSqlStatement.setOperator_minus:        return "MINUS";
905            case TSelectSqlStatement.setOperator_minusall:     return "MINUS ALL";
906            case TSelectSqlStatement.setOperator_except:       return "EXCEPT";
907            case TSelectSqlStatement.setOperator_exceptall:    return "EXCEPT ALL";
908            case TSelectSqlStatement.SET_OPERATOR_MINUSDISTINCT:
909                // Also matches SET_OPERATOR_EXCEPTDISTINCT — both = 11.
910                // Vendor decides the spelling: Oracle/MySQL grammars use
911                // MINUS, everyone else uses EXCEPT (SQL standard).
912                return isMinusDialect(vendor) ? "MINUS DISTINCT" : "EXCEPT DISTINCT";
913            default: return null;
914        }
915    }
916
917    private static boolean isMinusDialect(EDbVendor vendor) {
918        return vendor == EDbVendor.dbvoracle || vendor == EDbVendor.dbvmysql;
919    }
920
921    /**
922     * Blueprint §S18 — emit {@code SELECT_STAR_WITHOUT_LIMIT} when the
923     * statement projects a star ({@code SELECT *} or {@code t.*}) and
924     * the SELECT has no {@code LIMIT} clause. Independent of
925     * {@code SELECT_STAR_ON_SENSITIVE_TABLE}; both can fire on the same
926     * SQL (the blueprint's gold S18 SQL produces exactly that pair).
927     *
928     * <p>One violation per statement regardless of how many star refs.
929     * Two stars in the same projection are still one unbounded result.
930     */
931    private void checkSelectStarWithoutLimit(TSelectSqlStatement sel, int index,
932                                              SqlGuardResponse r, TResultColumnList astCols) {
933        if (sel.getLimitClause() != null) {
934            return;
935        }
936        for (int i = 0; i < astCols.size(); i++) {
937            String text = resultExpressionText(astCols.getResultColumn(i));
938            if ("*".equals(text) || text.endsWith(".*")) {
939                addViolation(r, index, "SELECT_STAR_WITHOUT_LIMIT", "warn",
940                        "Unbounded SELECT *; add LIMIT or restrict columns.",
941                        "SELECT * without LIMIT clause");
942                return;
943            }
944        }
945    }
946
947    /**
948     * Blueprint §S20 — emit {@code UNQUALIFIED_COLUMN} for column
949     * references that resolved cleanly but lack a table qualifier in
950     * the SQL ({@code SELECT col FROM t1 JOIN t2 …} where only one of
951     * {@code t1} / {@code t2} contains {@code col}). The query works
952     * today; the warning is the schema-stability signal — if a future
953     * schema change adds {@code col} to a second in-scope table, the
954     * resolver flips to {@code AMBIGUOUS_COLUMN} (which sqlguard's
955     * {@link #bindingError} surfaces as {@code SQL_SEMANTIC_ERROR}
956     * upstream of this method).
957     *
958     * <p>Genuinely-ambiguous references therefore never reach this
959     * method — they short-circuit in {@code bindingError()}. The
960     * filter chain ({@code dotColumn} object type, empty
961     * {@link TObjectName#getTableString()}, non-null
962     * {@link TObjectName#getSourceTable()}, {@code !isAmbiguous()})
963     * picks off exactly the S20 case.
964     *
965     * <p>Dedup uses the {@code TObjectName.toString()} text so the same
966     * column referenced twice in one SELECT produces one finding. The
967     * resolved-table comparison is identifier-aware via
968     * {@link IdentifierService}.
969     */
970    private void checkUnqualifiedColumns(TSelectSqlStatement sel, int index,
971                                          SqlGuardResponse r, EDbVendor vendor) {
972        UnqualifiedColumnCollector v = new UnqualifiedColumnCollector(vendor);
973        sel.acceptChildren(v);
974        for (Map.Entry<String, String> e : v.findings.entrySet()) {
975            addViolation(r, index, "UNQUALIFIED_COLUMN", "warn",
976                    "Unqualified column resolved to " + e.getValue()
977                            + "; qualify it for stability across schema changes.",
978                    e.getKey() + " (resolves to " + e.getValue() + ")");
979        }
980    }
981
982    /**
983     * AST visitor for {@link #checkUnqualifiedColumns}. Collects each
984     * column reference whose SQL text has no table qualifier but whose
985     * resolved {@link TObjectName#getSourceTable()} is non-null and
986     * unambiguous. {@code findings} keys on the column's textual form
987     * so a repeated reference (e.g. in SELECT list and GROUP BY) yields
988     * one finding. Value is the catalog-canonical resolved table name.
989     */
990    private static final class UnqualifiedColumnCollector extends TParseTreeVisitor {
991        final LinkedHashMap<String, String> findings = new LinkedHashMap<String, String>();
992        private final EDbVendor vendor;
993
994        UnqualifiedColumnCollector(EDbVendor vendor) {
995            this.vendor = vendor;
996        }
997
998        @Override
999        public void preVisit(gudusoft.gsqlparser.nodes.TObjectName node) {
1000            if (node == null) return;
1001            if (node.getDbObjectType() != gudusoft.gsqlparser.EDbObjectType.column) return;
1002            String col = node.getColumnNameOnly();
1003            if (col == null || col.isEmpty() || "*".equals(col)) return;
1004            // Qualified in the SQL? Skip — the rule is specifically about
1005            // unqualified references that the resolver had to disambiguate.
1006            String tableQualifier = node.getTableString();
1007            if (tableQualifier != null && tableQualifier.length() > 0) return;
1008            // Resolver couldn't pin it down → bindingError() already
1009            // surfaced this as SQL_SEMANTIC_ERROR. Defensive guard.
1010            if (node.isAmbiguous()) return;
1011            gudusoft.gsqlparser.nodes.TTable src = node.getSourceTable();
1012            if (src == null || src.getName() == null) return;
1013            // Skip CTE / subquery sources — they have no catalog identity
1014            // and "resolves to RS-1" is not actionable advice. Real
1015            // tables have an empty subquery and CTE pointer; that's the
1016            // signal we need.
1017            if (src.getSubquery() != null || src.getCTE() != null) return;
1018            String resolvedTable = IdentifierService.normalizeStatic(
1019                    vendor, ESQLDataObjectType.dotTable, src.getName().toString());
1020            if (resolvedTable == null || resolvedTable.isEmpty()) return;
1021            String key = node.toString();
1022            findings.putIfAbsent(key, resolvedTable);
1023        }
1024    }
1025
1026    /**
1027     * Blueprint §S25 — emit {@code UNSUPPORTED_DIALECT_FUNCTION} when
1028     * the SQL calls a function name that's blacklisted for the
1029     * request dialect by {@link DialectFunctionDivergence}.
1030     *
1031     * <p>AST-only ({@link TFunctionCall} visitor) rather than going
1032     * through {@code dlineage.relationship.getFunction()} so functions
1033     * outside projection lineage still fire — e.g.
1034     * {@code SELECT 1 FROM t WHERE DATE_TRUNC('day', created_at) > NOW()}
1035     * dies the same as {@code SELECT DATE_TRUNC(...) FROM t}.
1036     *
1037     * <p>Dedup is left to {@link #addViolation}'s existing
1038     * {@code (statementIndex, code, evidence)} guard: the evidence
1039     * carries the function name as written, so the same name twice in
1040     * one statement produces one violation. Functions not in the
1041     * blacklist (UDFs, vendor-specific extensions we haven't gated,
1042     * unknown names) are silently allowed — the blacklist is
1043     * explicit-only by design, see
1044     * {@link DialectFunctionDivergence} Javadoc.
1045     *
1046     * <p>Severity is {@code error → DENY}: blacklist entries are
1047     * cases where the SQL genuinely cannot execute on the target
1048     * dialect, not soft preferences. The remediation hint stored in
1049     * the blacklist is appended to the violation message so the user
1050     * gets actionable advice.
1051     */
1052    private void checkUnsupportedDialectFunction(TSelectSqlStatement sel, int index,
1053                                                  SqlGuardResponse r, EDbVendor vendor) {
1054        UnsupportedDialectFunctionCollector v =
1055                new UnsupportedDialectFunctionCollector(vendor);
1056        sel.acceptChildren(v);
1057        for (Map.Entry<String, String> e : v.findings.entrySet()) {
1058            String fnName = e.getKey();
1059            String hint = e.getValue();
1060            String message = fnName + " is not supported by " + vendor.toString()
1061                    + (hint == null ? "." : ". " + hint);
1062            addViolation(r, index, "UNSUPPORTED_DIALECT_FUNCTION", "error",
1063                    message, fnName);
1064        }
1065    }
1066
1067    /**
1068     * AST visitor for {@link #checkUnsupportedDialectFunction}.
1069     * Collects each {@link TFunctionCall} whose function name is
1070     * blacklisted for the request dialect by
1071     * {@link DialectFunctionDivergence}. {@code findings} is keyed on
1072     * the as-written function name (preserves source casing for
1073     * evidence) → remediation hint, so the same call repeated yields
1074     * one finding.
1075     *
1076     * <p>Skips window / FILTER forms by the same logic as
1077     * {@link AggregateCallCollector}: a window or FILTER form's
1078     * <i>aggregate</i> isn't what's being challenged — the wrapping
1079     * window function is. But for the dialect-function check the
1080     * unsupported call IS the issue, not the wrapping. So we walk all
1081     * TFunctionCalls regardless of window/FILTER context — if
1082     * {@code DATE_TRUNC} appears inside an OVER() partition, it's
1083     * still unsupported on MySQL.
1084     */
1085    private static final class UnsupportedDialectFunctionCollector extends TParseTreeVisitor {
1086        final LinkedHashMap<String, String> findings = new LinkedHashMap<String, String>();
1087        private final EDbVendor vendor;
1088
1089        UnsupportedDialectFunctionCollector(EDbVendor vendor) {
1090            this.vendor = vendor;
1091        }
1092
1093        @Override
1094        public void preVisit(TFunctionCall node) {
1095            if (node == null || node.getFunctionName() == null) {
1096                return;
1097            }
1098            String raw = node.getFunctionName().toString();
1099            if (raw == null || raw.isEmpty()) {
1100                return;
1101            }
1102            // Strip any quoting / brackets the lexer left on the name.
1103            // SqlGuardCatalogSamples.strip handles `"..."`, `[...]`, and
1104            // backtick forms — same helper AggregateCallCollector uses.
1105            String name = SqlGuardCatalogSamples.strip(raw);
1106            if (name == null || name.isEmpty()) {
1107                return;
1108            }
1109            String hint = DialectFunctionDivergence.lookup(vendor, name);
1110            if (hint == null) {
1111                return;
1112            }
1113            // Key on the as-written name (preserves casing in evidence)
1114            // and upper-case for the dedup compare so two appearances
1115            // of `date_trunc` and `DATE_TRUNC` in the same statement
1116            // collapse to one finding.
1117            String key = name.toUpperCase(Locale.ROOT);
1118            if (!findings.containsKey(key)) {
1119                findings.put(key, hint);
1120            }
1121        }
1122    }
1123
1124    /**
1125     * Intent-keyword whitelist for blueprint §S23. Lower-case substring
1126     * match against the natural-language {@code question} text — present
1127     * → caller is asking for a DISTINCT-grain answer. The list is
1128     * intentionally short and English-only; this is a soft WARN rule,
1129     * not a contract enforcer. Add new keywords here when the false-
1130     * negative rate matters; resist the urge to swap in regex / NLP —
1131     * the simplicity is the design.
1132     */
1133    private static final String[] DISTINCT_INTENT_KEYWORDS = new String[] {
1134            "distinct", "unique", "different", "deduplicat",
1135            "how many kinds", "how many types"
1136    };
1137
1138    /**
1139     * Blueprint §S23 — emit {@code AGGREGATION_GRAIN_MISMATCH} when the
1140     * caller's natural-language {@link SqlGuardRequest#question} text
1141     * implies a DISTINCT-grain answer (keyword whitelist match) but the
1142     * SQL's aggregate edge for the answering column uses a non-DISTINCT
1143     * call, OR vice-versa (plain-count intent answered with
1144     * {@code COUNT(DISTINCT)}).
1145     *
1146     * <p>Self-silences when {@code question} is null/blank — the
1147     * question itself is the opt-in. No preset gate (parallels
1148     * {@link #checkTenantFilter}'s {@code userContext.tenantId} opt-in,
1149     * not {@link #checkUnqualifiedColumns}'s preset gate): the rule is
1150     * useful whenever the caller declared intent, regardless of which
1151     * hygiene preset they picked. The intent heuristic is a tiny
1152     * keyword whitelist by design (see
1153     * {@link #DISTINCT_INTENT_KEYWORDS}); NLP is explicitly out of
1154     * scope, false positives surface as soft WARNs only.
1155     *
1156     * <p>Signal B (SQL grain) comes from
1157     * {@link SqlGuardResponse.LineageEdge#aggregateFunctions}, populated
1158     * by S2's AST visitor. An edge whose list contains any
1159     * {@code "FUNC(DISTINCT)"} entry is DISTINCT-grain; otherwise plain.
1160     * One violation per disagreeing aggregate edge — non-aggregate
1161     * edges carry no grain signal and are skipped. Multi-aggregate
1162     * edges (e.g. {@code COUNT(DISTINCT a) / COUNT(*)}) read as
1163     * DISTINCT-grain by the {@code any-DISTINCT} rule, matching S2's
1164     * "list grows naturally" contract.
1165     */
1166    private void checkAggregationGrainMismatch(int index, SqlGuardRequest req, SqlGuardResponse r) {
1167        if (req == null || req.question == null) {
1168            return;
1169        }
1170        String question = req.question.toLowerCase(Locale.ROOT);
1171        if (question.trim().isEmpty()) {
1172            return;
1173        }
1174        boolean intentDistinct = false;
1175        for (String needle : DISTINCT_INTENT_KEYWORDS) {
1176            if (question.contains(needle)) {
1177                intentDistinct = true;
1178                break;
1179            }
1180        }
1181        for (SqlGuardResponse.LineageEdge edge : r.facts.lineage) {
1182            if (edge.statementIndex != index) continue;
1183            if (edge.aggregateFunctions == null || edge.aggregateFunctions.isEmpty()) continue;
1184
1185            boolean sqlDistinct = false;
1186            for (String fn : edge.aggregateFunctions) {
1187                if (fn != null && fn.endsWith("(DISTINCT)")) {
1188                    sqlDistinct = true;
1189                    break;
1190                }
1191            }
1192            if (intentDistinct == sqlDistinct) continue;
1193
1194            String target = edge.targetColumn == null ? "?" : edge.targetColumn;
1195            String fnList = joinList(edge.aggregateFunctions);
1196            String message;
1197            String evidence;
1198            if (intentDistinct) {
1199                message = "Question implies a DISTINCT aggregation but SQL uses non-distinct "
1200                        + fnList + "; result may double-count rows.";
1201                evidence = target + ": question implies DISTINCT, SQL aggregates with " + fnList;
1202            } else {
1203                message = "Question implies a plain aggregation but SQL uses "
1204                        + fnList + "; DISTINCT may silently under-count rows.";
1205                evidence = target + ": question implies plain aggregation, SQL aggregates with " + fnList;
1206            }
1207            addViolation(r, index, "AGGREGATION_GRAIN_MISMATCH", "warn", message, evidence);
1208        }
1209    }
1210
1211    private static String joinList(List<String> items) {
1212        if (items == null || items.isEmpty()) return "";
1213        StringBuilder sb = new StringBuilder();
1214        for (int i = 0; i < items.size(); i++) {
1215            if (i > 0) sb.append(", ");
1216            sb.append(items.get(i));
1217        }
1218        return sb.toString();
1219    }
1220
1221    /**
1222     * Blueprint §S22 — emit {@code MISSING_TENANT_FILTER} when any
1223     * in-scope table carries a {@code TENANT_KEY}-tagged column but no
1224     * WHERE / HAVING predicate references that column.
1225     *
1226     * <p>Gated on {@code userContext.tenantId} being supplied — the
1227     * blueprint pins the rule to "caller declared tenant context", so
1228     * silence-by-default keeps existing requests (and the S1 / S2 seed
1229     * tests, which run preset {@code "all"} on broker SQL without
1230     * tenant filters) byte-identical until the caller opts in. Severity
1231     * is {@code warn} at all preset levels; a future {@code error}
1232     * escalation under a dedicated tenant-aware preset is the natural
1233     * extension (TODO item #2 ramp-up).
1234     *
1235     * <p>In-scope tables come from {@code df.getTables()}, NOT
1236     * {@link TCustomSqlStatement#tables}: CTE-wrapped SQL (the blueprint
1237     * S22 fixture is one) only puts the CTE alias in
1238     * {@code stmt.tables}, while dlineage transparently surfaces the
1239     * inner real tables. The check would silently no-op on the canonical
1240     * S22 fixture without this.
1241     *
1242     * <p>WHERE / HAVING coverage uses dlineage's {@code fdr} edges with
1243     * {@code sourceColumn.getClauseType()} in {@code {"where",
1244     * "having"}}. {@code joinCondition} is intentionally NOT accepted as
1245     * a tenant filter — {@code ON a.tenant_id = b.tenant_id} correlates
1246     * two tenant columns to each other but does not pin them to any
1247     * single tenant, so a join-only "filter" still returns cross-tenant
1248     * rows.
1249     */
1250    private void checkTenantFilter(int index, SqlGuardRequest req, SqlGuardResponse r,
1251                                    UnifiedCatalogModel cat, dataflow df, EDbVendor vendor) {
1252        if (df == null) {
1253            return;
1254        }
1255        if (req == null || req.userContext == null) {
1256            return;
1257        }
1258        Object tenantId = req.userContext.get("tenantId");
1259        if (!(tenantId instanceof String) || ((String) tenantId).isEmpty()) {
1260            return;
1261        }
1262
1263        // Required: each TENANT_KEY-tagged column on every in-scope catalog
1264        // table. Keyed by catalog-canonical table name so the diff against
1265        // `filtered` (also keyed by catalog name) folds correctly.
1266        Map<String, LinkedHashSet<String>> required = new LinkedHashMap<String, LinkedHashSet<String>>();
1267        if (df.getTables() != null) {
1268            for (table t : df.getTables()) {
1269                if (t == null || t.getName() == null) {
1270                    continue;
1271                }
1272                SqlGuardCatalogSamples.TableMatch tm =
1273                        SqlGuardCatalogSamples.findTable(cat, t.getName());
1274                if (tm == null) {
1275                    continue;
1276                }
1277                LinkedHashSet<String> tenantCols = null;
1278                for (ColumnModel col : tm.table.columns()) {
1279                    if (SqlGuardCatalogSamples.policyTagNames(col).contains("TENANT_KEY")) {
1280                        if (tenantCols == null) {
1281                            tenantCols = new LinkedHashSet<String>();
1282                        }
1283                        tenantCols.add(col.name());
1284                    }
1285                }
1286                if (tenantCols != null && !required.containsKey(tm.table.name())) {
1287                    required.put(tm.table.name(), tenantCols);
1288                }
1289            }
1290        }
1291        if (required.isEmpty()) {
1292            return;
1293        }
1294
1295        // Filtered: (table, tenantCol) pairs that appear in a WHERE or
1296        // HAVING predicate per dlineage's fdr edges. Catalog-bound so a
1297        // tenant column referenced under its catalog identifier matches
1298        // regardless of the SQL author's casing.
1299        Map<String, LinkedHashSet<String>> filtered = new LinkedHashMap<String, LinkedHashSet<String>>();
1300        if (df.getRelationships() != null) {
1301            for (relationship rel : df.getRelationships()) {
1302                if (!"fdr".equals(rel.getType()) || rel.getSources() == null) {
1303                    continue;
1304                }
1305                for (sourceColumn src : rel.getSources()) {
1306                    String clause = src.getClauseType();
1307                    if (!"where".equals(clause) && !"having".equals(clause)) {
1308                        continue;
1309                    }
1310                    if (src.getParent_name() == null || src.getColumn() == null) {
1311                        continue;
1312                    }
1313                    SqlGuardCatalogSamples.TableMatch tm =
1314                            SqlGuardCatalogSamples.findTable(cat, src.getParent_name());
1315                    if (tm == null) {
1316                        continue;
1317                    }
1318                    ColumnModel col = SqlGuardCatalogSamples.findColumn(tm, src.getColumn());
1319                    if (col == null) {
1320                        continue;
1321                    }
1322                    LinkedHashSet<String> cols = filtered.get(tm.table.name());
1323                    if (cols == null) {
1324                        cols = new LinkedHashSet<String>();
1325                        filtered.put(tm.table.name(), cols);
1326                    }
1327                    cols.add(col.name());
1328                }
1329            }
1330        }
1331
1332        // Diff → ordered evidence string ("tableA (col1), tableB (col2, col3)").
1333        StringBuilder evidence = new StringBuilder();
1334        for (Map.Entry<String, LinkedHashSet<String>> entry : required.entrySet()) {
1335            LinkedHashSet<String> filteredCols = filtered.get(entry.getKey());
1336            LinkedHashSet<String> missing = new LinkedHashSet<String>();
1337            for (String tenantCol : entry.getValue()) {
1338                if (filteredCols == null || !filteredCols.contains(tenantCol)) {
1339                    missing.add(tenantCol);
1340                }
1341            }
1342            if (missing.isEmpty()) {
1343                continue;
1344            }
1345            if (evidence.length() > 0) {
1346                evidence.append(", ");
1347            }
1348            evidence.append(entry.getKey()).append(" (");
1349            boolean firstCol = true;
1350            for (String col : missing) {
1351                if (!firstCol) {
1352                    evidence.append(", ");
1353                }
1354                firstCol = false;
1355                evidence.append(col);
1356            }
1357            evidence.append(")");
1358        }
1359        if (evidence.length() == 0) {
1360            return;
1361        }
1362        addViolation(r, index, "MISSING_TENANT_FILTER", "warn",
1363                "Tables with TENANT_KEY columns are referenced without a tenant-scoped predicate; query returns cross-tenant rows.",
1364                evidence.toString());
1365    }
1366
1367    /**
1368     * S24 — {@code WRONG_JOIN_PATH}. Walks dlineage's {@code fdr} edges
1369     * whose sources carry {@code clauseType="joinCondition"}, pairs the
1370     * sources by appearance order (one fdr per ON clause; multi-predicate
1371     * ON clauses put all sources into the same {@code sources[]} array,
1372     * left/right interleaved), catalog-binds each side, and compares the
1373     * predicate against the catalog's FK constraints in either direction.
1374     *
1375     * <p>For every successfully bound pair, emits a {@link SqlGuardResponse.JoinFact}
1376     * with {@code joinPath} and {@code joinPathMatchesForeignKey}. When
1377     * an FK exists between the two joined tables but the predicate's
1378     * columns don't line up with it, the JoinFact also gets
1379     * {@code expectedForeignKeyPath} (the correct predicate) and one
1380     * {@code WRONG_JOIN_PATH} violation fires.</p>
1381     *
1382     * <p><b>Two-pass scan per ON clause</b> handles mixed-predicate cases.
1383     * dlineage labels every column that appears anywhere in an ON clause
1384     * with {@code clauseType="joinCondition"} — including the column-side
1385     * of non-equi predicates like {@code A.x &gt; B.y} and {@code A.z
1386     * &gt; B.start_ts}. A naive single-pass would treat
1387     * {@code (A.x, B.y, A.z, B.start_ts)} as two equi-join pairs and
1388     * misfire WRONG_JOIN_PATH on the second pair (the inequality). We
1389     * avoid that by first scanning every pair to see whether ANY pair
1390     * matched an FK; if yes, suppress WRONG_JOIN_PATH for all other
1391     * pairs between the same two tables in this ON clause (they're
1392     * additional filters / multi-column equi-joins / non-equi clauses,
1393     * not wrong-path joins).</p>
1394     *
1395     * <p><b>Silent paths</b> (intentional, not bugs):</p>
1396     * <ul>
1397     *   <li>No FK exists between the joined tables at all → no violation
1398     *       (we can't second-guess novel joins; only flag mismatches
1399     *       against KNOWN constraints).</li>
1400     *   <li>Predicate columns don't bind to the catalog (e.g. CTE or
1401     *       subquery alias) → no violation (we can't verify what we
1402     *       can't see).</li>
1403     *   <li>Pure non-equi joins where dlineage hasn't paired both sides
1404     *       (e.g. {@code UPPER(A.x) = B.y} — the function side comes
1405     *       back with {@code clauseType="unknown"} rather than
1406     *       {@code "joinCondition"} and is silently dropped).</li>
1407     * </ul>
1408     *
1409     * <p><b>Single-column FK only (V1).</b> Composite FKs in the catalog
1410     * are ignored — they need AND-chained predicate correlation within
1411     * a single ON clause, which is shape work deferred to blueprint B8
1412     * (TODO item #15). Until then, an SQL query that correctly joins
1413     * via a composite FK gets {@code joinPathMatchesForeignKey:false}
1414     * but no violation fires (the catalog has no single-column FK between
1415     * the tables, so the rule stays silent).</p>
1416     *
1417     * <p>Severity is {@code warn}: a wrong join path produces meaningful
1418     * but plausible-looking output, so the rule warns rather than DENYs
1419     * — escalation is up to the BFF / human reviewer.</p>
1420     */
1421    private void checkWrongJoinPath(int index, SqlGuardResponse r,
1422                                    UnifiedCatalogModel cat, dataflow df, EDbVendor vendor) {
1423        if (df == null || df.getRelationships() == null) {
1424            return;
1425        }
1426        for (relationship rel : df.getRelationships()) {
1427            if (!"fdr".equals(rel.getType()) || rel.getSources() == null) {
1428                continue;
1429            }
1430            // Collect join-condition sources within this relationship in
1431            // appearance order. dlineage emits one fdr per ON clause;
1432            // multi-predicate ONs put all sources into the same sources[].
1433            List<sourceColumn> joinSources = new ArrayList<sourceColumn>();
1434            for (sourceColumn src : rel.getSources()) {
1435                if ("joinCondition".equals(src.getClauseType())) {
1436                    joinSources.add(src);
1437                }
1438            }
1439            // Two-pass scan. Pass 1: bind every pair to the catalog and
1440            // record whether it matches an FK. Pass 2: emit JoinFact for
1441            // each bound pair; fire WRONG_JOIN_PATH only when no pair
1442            // between the SAME table pair already matched an FK.
1443            //
1444            // Suppression is keyed by (leftTable, rightTable) identity
1445            // (order-independent), not by the whole ON clause. This way
1446            // a correct equi-join on (A, B) doesn't accidentally silence
1447            // a wrong-path join on (C, B) in the same ON clause — rare
1448            // but possible when an ON references a third table via a
1449            // correlated subquery or old-style multi-relation predicate.
1450            List<BoundJoinPair> pairs = new ArrayList<BoundJoinPair>();
1451            Set<TablePair> tablePairsWithFkMatch = new HashSet<TablePair>();
1452            for (int i = 0; i + 1 < joinSources.size(); i += 2) {
1453                BoundJoinPair bp = bindJoinPair(
1454                        joinSources.get(i), joinSources.get(i + 1), cat, vendor);
1455                if (bp == null) {
1456                    continue;  // Unbound side; can't reason about it.
1457                }
1458                bp.matchedFk = matchFkInEitherDirection(bp, cat, vendor);
1459                if (bp.matchedFk != null) {
1460                    tablePairsWithFkMatch.add(new TablePair(bp.leftTm.table, bp.rightTm.table));
1461                }
1462                pairs.add(bp);
1463            }
1464
1465            for (BoundJoinPair bp : pairs) {
1466                SqlGuardResponse.JoinFact fact = new SqlGuardResponse.JoinFact();
1467                fact.joinPath = bp.joinPath;
1468                fact.joinPathMatchesForeignKey = bp.matchedFk != null;
1469
1470                boolean samePairAlreadyMatched = tablePairsWithFkMatch.contains(
1471                        new TablePair(bp.leftTm.table, bp.rightTm.table));
1472                if (bp.matchedFk == null && !samePairAlreadyMatched) {
1473                    // No FK match anywhere in this ON clause — check whether
1474                    // an FK between these tables exists and surface a
1475                    // Did-you-mean.
1476                    ConstraintModel candidate = anySingleColumnFkBetween(
1477                            bp.leftTm.table, bp.rightTm.table, cat, vendor);
1478                    if (candidate == null) {
1479                        candidate = anySingleColumnFkBetween(
1480                                bp.rightTm.table, bp.leftTm.table, cat, vendor);
1481                    }
1482                    if (candidate != null) {
1483                        String expected = expectedFkPath(
1484                                candidate, bp.leftTm.table, bp.rightTm.table);
1485                        fact.expectedForeignKeyPath = expected;
1486                        addViolation(r, index, "WRONG_JOIN_PATH", "warn",
1487                                "Join condition does not match the known foreign key "
1488                                        + expected + ". Likely incorrect join.",
1489                                bp.joinPath);
1490                    }
1491                }
1492                r.facts.joins.add(fact);
1493            }
1494        }
1495    }
1496
1497    /**
1498     * Catalog-bound representation of one ON-predicate pair. Used for the
1499     * two-pass scan in {@link #checkWrongJoinPath} — pass 1 builds the
1500     * list with {@code matchedFk} populated, pass 2 decides JoinFact /
1501     * violation emission with full knowledge of every pair in the ON.
1502     */
1503    private static final class BoundJoinPair {
1504        final SqlGuardCatalogSamples.TableMatch leftTm;
1505        final ColumnModel leftCol;
1506        final SqlGuardCatalogSamples.TableMatch rightTm;
1507        final ColumnModel rightCol;
1508        final String joinPath;
1509        ConstraintModel matchedFk;
1510        BoundJoinPair(SqlGuardCatalogSamples.TableMatch leftTm, ColumnModel leftCol,
1511                      SqlGuardCatalogSamples.TableMatch rightTm, ColumnModel rightCol) {
1512            this.leftTm = leftTm;
1513            this.leftCol = leftCol;
1514            this.rightTm = rightTm;
1515            this.rightCol = rightCol;
1516            this.joinPath = leftTm.table.name() + "." + leftCol.name()
1517                    + " = " + rightTm.table.name() + "." + rightCol.name();
1518        }
1519    }
1520
1521    private BoundJoinPair bindJoinPair(sourceColumn left, sourceColumn right,
1522                                       UnifiedCatalogModel cat, EDbVendor vendor) {
1523        SqlGuardCatalogSamples.TableMatch leftTm =
1524                SqlGuardCatalogSamples.findTable(cat, left.getParent_name());
1525        SqlGuardCatalogSamples.TableMatch rightTm =
1526                SqlGuardCatalogSamples.findTable(cat, right.getParent_name());
1527        if (leftTm == null || rightTm == null) return null;
1528        ColumnModel leftCol = SqlGuardCatalogSamples.findColumn(leftTm, left.getColumn());
1529        ColumnModel rightCol = SqlGuardCatalogSamples.findColumn(rightTm, right.getColumn());
1530        if (leftCol == null || rightCol == null) return null;
1531        return new BoundJoinPair(leftTm, leftCol, rightTm, rightCol);
1532    }
1533
1534    private ConstraintModel matchFkInEitherDirection(BoundJoinPair bp,
1535                                                     UnifiedCatalogModel cat,
1536                                                     EDbVendor vendor) {
1537        ConstraintModel m = matchSingleColumnFk(
1538                bp.leftTm.table, bp.leftCol.name(),
1539                bp.rightTm, bp.rightCol.name(), cat, vendor);
1540        if (m != null) return m;
1541        return matchSingleColumnFk(
1542                bp.rightTm.table, bp.rightCol.name(),
1543                bp.leftTm, bp.leftCol.name(), cat, vendor);
1544    }
1545
1546    /**
1547     * Does {@code fkOwner} have a single-column FK from {@code fkColumn}
1548     * to {@code referencedTm.column}? Returns the matching constraint or
1549     * null. The referenced side is matched by resolving the FK's
1550     * {@code referencedTable} string through {@link SqlGuardCatalogSamples#findTable}
1551     * and comparing {@link TableModel} object identity — same instance,
1552     * same table. This is the multi-schema-safe comparison for
1553     * schema-qualified or catalog-qualified FK targets ({@code "hr.dept"},
1554     * {@code "ORCL.HR.DEPT"}).
1555     *
1556     * <p><b>Known limitation</b> — bare FK {@code referencedTable} values
1557     * (e.g. {@code "dept"} with no schema prefix) resolve via the
1558     * catalog-wide "first table wins" search in
1559     * {@link SqlGuardCatalogSamples#findTable}, while
1560     * {@link gudusoft.gsqlparser.catalog.input.CatalogModelValidator}
1561     * resolves bare FK targets current-schema-first. Catalog fixtures
1562     * shipped today are single-schema so this divergence is latent;
1563     * when a multi-schema sample catalog lands, the right fix is to
1564     * thread the FK owner's schema through to the resolver. Until then
1565     * the validator's dangling-reference check is the safety net —
1566     * an FK whose {@code referencedTable} doesn't resolve in either
1567     * resolver gets flagged at catalog-load time.</p>
1568     */
1569    private ConstraintModel matchSingleColumnFk(TableModel fkOwner,
1570                                                String fkColumn,
1571                                                SqlGuardCatalogSamples.TableMatch referencedTm,
1572                                                String referencedColumn,
1573                                                UnifiedCatalogModel cat,
1574                                                EDbVendor vendor) {
1575        if (fkOwner == null || referencedTm == null) return null;
1576        for (ConstraintModel cs : fkOwner.constraints()) {
1577            if (!isForeignKey(cs.type())) continue;
1578            if (cs.columns().size() != 1 || cs.referencedColumns().size() != 1) continue;
1579            if (!IdentifierService.areEqualStatic(vendor, ESQLDataObjectType.dotColumn,
1580                    cs.columns().get(0), fkColumn)) continue;
1581            SqlGuardCatalogSamples.TableMatch fkTargetTm =
1582                    SqlGuardCatalogSamples.findTable(cat, cs.referencedTable());
1583            if (fkTargetTm == null || fkTargetTm.table != referencedTm.table) continue;
1584            if (!IdentifierService.areEqualStatic(vendor, ESQLDataObjectType.dotColumn,
1585                    cs.referencedColumns().get(0), referencedColumn)) continue;
1586            return cs;
1587        }
1588        return null;
1589    }
1590
1591    /**
1592     * Any single-column FK on {@code fkOwner} that targets {@code referencedTable}
1593     * (regardless of which exact columns). Used to drive the "Did-you-mean"
1594     * expectedForeignKeyPath when the predicate joined the right tables
1595     * via the wrong columns. Uses TableModel identity for target matching
1596     * (same multi-schema safety as {@link #matchSingleColumnFk}).
1597     */
1598    private ConstraintModel anySingleColumnFkBetween(TableModel fkOwner,
1599                                                     TableModel referencedTable,
1600                                                     UnifiedCatalogModel cat,
1601                                                     EDbVendor vendor) {
1602        if (fkOwner == null || referencedTable == null) return null;
1603        for (ConstraintModel cs : fkOwner.constraints()) {
1604            if (!isForeignKey(cs.type())) continue;
1605            if (cs.columns().size() != 1 || cs.referencedColumns().size() != 1) continue;
1606            SqlGuardCatalogSamples.TableMatch fkTargetTm =
1607                    SqlGuardCatalogSamples.findTable(cat, cs.referencedTable());
1608            if (fkTargetTm != null && fkTargetTm.table == referencedTable) {
1609                return cs;
1610            }
1611        }
1612        return null;
1613    }
1614
1615    /**
1616     * Canonical "expected" path string for a {@link SqlGuardResponse.JoinFact},
1617     * ordered to match the SQL author's join orientation so the BFF Did-you-mean
1618     * reads naturally ({@code leftTable.fkCol = rightTable.refCol} when the
1619     * FK lives on the left, vice versa otherwise).
1620     */
1621    private String expectedFkPath(ConstraintModel fk, TableModel leftTable, TableModel rightTable) {
1622        // FK owner is either leftTable or rightTable (anySingleColumnFkBetween
1623        // only returns FKs whose owner is one of the two joined tables).
1624        boolean fkOnLeft = fkLivesOnTable(fk, leftTable);
1625        TableModel owner = fkOnLeft ? leftTable : rightTable;
1626        TableModel target = fkOnLeft ? rightTable : leftTable;
1627        String ownerCol = fk.columns().get(0);
1628        String targetCol = fk.referencedColumns().get(0);
1629        return owner.name() + "." + ownerCol + " = " + target.name() + "." + targetCol;
1630    }
1631
1632    /**
1633     * Order-independent identity key for a (TableModel, TableModel) pair.
1634     * Used by {@link #checkWrongJoinPath}'s suppression set: a pair seen
1635     * as {@code (A, B)} must collide with the same pair seen as
1636     * {@code (B, A)}.
1637     *
1638     * <p>Implementation note: avoids canonicalizing pair order at
1639     * construction. A previous version sorted by {@link System#identityHashCode}
1640     * but that breaks under identity-hash collisions — two distinct objects
1641     * sharing a hash would canonicalize inconsistently between
1642     * {@code (a,b)} and {@code (b,a)}, so {@code equals} would falsely
1643     * report inequality. Instead, {@code equals} compares both orderings
1644     * with {@code ==} (collision-proof — only true object identity
1645     * matches), and {@code hashCode} XORs the two identity hashes for
1646     * an order-independent bucket value.</p>
1647     */
1648    private static final class TablePair {
1649        private final TableModel a;
1650        private final TableModel b;
1651        TablePair(TableModel a, TableModel b) {
1652            this.a = a;
1653            this.b = b;
1654        }
1655        @Override public boolean equals(Object o) {
1656            if (this == o) return true;
1657            if (!(o instanceof TablePair)) return false;
1658            TablePair other = (TablePair) o;
1659            return (a == other.a && b == other.b)
1660                    || (a == other.b && b == other.a);
1661        }
1662        @Override public int hashCode() {
1663            return System.identityHashCode(a) ^ System.identityHashCode(b);
1664        }
1665    }
1666
1667    private static boolean fkLivesOnTable(ConstraintModel fk, TableModel t) {
1668        if (t == null) return false;
1669        // Reference equality is safe because the FK was obtained from one of
1670        // the joined tables' own constraints() lists in checkWrongJoinPath
1671        // — same TableModel instance, same constraint instance. The
1672        // alternative (value-equality on ConstraintModel) would also work
1673        // but matters only if two distinct ConstraintModel instances
1674        // happen to be equals() — which doesn't happen in our call paths.
1675        for (ConstraintModel cs : t.constraints()) {
1676            if (cs == fk) return true;
1677        }
1678        return false;
1679    }
1680
1681    private static boolean isForeignKey(String type) {
1682        if (type == null) return false;
1683        // ASCII case-insensitive match for FK / FOREIGN KEY / FOREIGN_KEY.
1684        // Hand-coded to mirror CatalogModelValidator's helper (forbidden-apis
1685        // bans equalsIgnoreCase inside catalog/**; sqlguard reuses the same
1686        // discipline for parity).
1687        return asciiEqualsIgnoreCase(type, "FK")
1688                || asciiEqualsIgnoreCase(type, "FOREIGN KEY")
1689                || asciiEqualsIgnoreCase(type, "FOREIGN_KEY");
1690    }
1691
1692    private static boolean asciiEqualsIgnoreCase(String a, String b) {
1693        if (a == null || b == null) return a == b;
1694        int len = a.length();
1695        if (b.length() != len) return false;
1696        for (int i = 0; i < len; i++) {
1697            char ca = a.charAt(i);
1698            char cb = b.charAt(i);
1699            if (ca == cb) continue;
1700            if (ca >= 'A' && ca <= 'Z') ca = (char) (ca + 32);
1701            if (cb >= 'A' && cb <= 'Z') cb = (char) (cb + 32);
1702            if (ca != cb) return false;
1703        }
1704        return true;
1705    }
1706
1707    /**
1708     * Pick the top-level resultset that holds the SELECT's projected
1709     * columns. dlineage names it {@code "RS-1"} (one resultset per
1710     * statement when we run DataFlowAnalyzer per-SELECT). Intermediate
1711     * resultsets carry function names ({@code "COUNT"}, {@code "AVG"},
1712     * {@code "CAST"}) and are NOT the answer here.
1713     */
1714    private table topResultset(dataflow df) {
1715        if (df == null || df.getResultsets() == null) {
1716            return null;
1717        }
1718        for (table rs : df.getResultsets()) {
1719            if (rs.getName() != null && rs.getName().startsWith("RS-")) {
1720                return rs;
1721            }
1722        }
1723        return df.getResultsets().isEmpty() ? null : df.getResultsets().get(0);
1724    }
1725
1726    /**
1727     * dlineage attaches a synthetic {@code RelationRows} column to every
1728     * resultset to carry filter / join / where-clause column references.
1729     * It is NOT part of the SELECT projection and must be skipped before
1730     * position-matching against the AST's result columns.
1731     */
1732    private boolean isSystemColumn(column c) {
1733        return c != null && ("RelationRows".equals(c.getName()) || "system".equals(c.getSource()));
1734    }
1735
1736    /**
1737     * Walk the {@code fdd} (data-dependency) relationships from a
1738     * resultset column down to its leaf source columns, recursing
1739     * through intermediate function / CAST resultsets. Sets
1740     * {@code traversedResultset[0]=true} if any hop went through an
1741     * intermediate, which the caller uses to distinguish {@code direct}
1742     * from {@code expression} lineage type.
1743     *
1744     * <p>{@code fdr} (filter-dependency) and join-condition
1745     * relationships are not followed — those represent the influence of
1746     * filter / join predicates on row presence, not the data lineage
1747     * of the projected column value.
1748     */
1749    private void traceLeaves(dataflow df, Set<String> resultsetIds,
1750                             String parentId, String colName,
1751                             Set<String> visited, List<sourceColumn> out,
1752                             boolean[] traversedResultset, EDbVendor vendor) {
1753        if (parentId == null || colName == null) {
1754            return;
1755        }
1756        String key = parentId + ":" + colName;
1757        if (!visited.add(key) || visited.size() > 128) {
1758            return;
1759        }
1760        if (df.getRelationships() == null) {
1761            return;
1762        }
1763        for (relationship rel : df.getRelationships()) {
1764            if (!"fdd".equals(rel.getType())) continue;
1765            if (rel.getTarget() == null) continue;
1766            if (!parentId.equals(rel.getTarget().getParent_id())) continue;
1767            if (!nameEquals(vendor, colName, rel.getTarget().getColumn())) continue;
1768            if (rel.getSources() == null) continue;
1769            for (sourceColumn src : rel.getSources()) {
1770                String srcPid = src.getParent_id();
1771                if (srcPid != null && resultsetIds.contains(srcPid)) {
1772                    // Intermediate resultset (function / CAST / subquery).
1773                    traversedResultset[0] = true;
1774                    traceLeaves(df, resultsetIds, srcPid, src.getColumn(),
1775                            visited, out, traversedResultset, vendor);
1776                } else {
1777                    out.add(src);
1778                }
1779            }
1780        }
1781    }
1782
1783    /**
1784     * Collect a result column's PARTITION BY / ORDER BY source columns
1785     * from window aggregates in its expression tree, into two SEPARATE
1786     * output lists so callers can label exposure correctly.
1787     *
1788     * <p>dlineage exposes window-spec columns as {@code fdr} edges with
1789     * {@code clauseType="selectList"} (partition) and {@code "orderby"}
1790     * (order) on the windowed function's intermediate resultset. The
1791     * regular {@link #traceLeaves} skips them because they are not
1792     * {@code fdd} data-dependency edges. Blueprint §B1 / §B2 require
1793     * those columns in {@code sourceColumns} because they determine
1794     * which rows the window aggregate reads — promoting them here keeps
1795     * the wire shape consistent with the seed's expected output.
1796     *
1797     * <p><b>Why two output lists.</b> A PARTITION BY column and an
1798     * ORDER BY column play different governance roles. PII in
1799     * {@code PARTITION BY x} is a k-anonymity leak (small partitions
1800     * identify individuals); PII in {@code ORDER BY x} is a sort-pattern
1801     * leak (rank reveals attribute order). Sqlguard fires distinct
1802     * violations and assigns distinct {@code exposure} labels in
1803     * {@code facts.columns[]} for each role, which requires keeping the
1804     * two source sets distinguishable. Folding both into one list (as
1805     * the original B1/B2 implementation did) collapses the signal and
1806     * led to PII in window partition being mis-attributed as
1807     * {@code explicit_select} exposure with a misleading
1808     * {@code SENSITIVE_COLUMN_IN_OUTPUT} violation.
1809     *
1810     * <p>Strategy: walk {@code fdd} edges from the result column to find
1811     * every intermediate (function / CAST / subquery) resultset that
1812     * feeds it. The window-extra {@code fdr selectList} / {@code orderby}
1813     * edges live ONLY on intermediate resultsets (empirically verified
1814     * across dlineage's current behavior — top-level resultsets carry
1815     * neither clause type for outer ORDER BY / GROUP BY). Collecting on
1816     * intermediates only avoids over-promoting unrelated outer-clause
1817     * edges from the top resultset.
1818     *
1819     * <p>Window-extra edges are scoped per function-resultset
1820     * (dlineage emits separate intermediate resultsets per
1821     * {@code TFunctionCall} with a window spec), so two window calls in
1822     * one projection don't cross-contaminate — each result-column trace
1823     * only descends into its own function resultsets. Pinned by
1824     * {@code separatelyProjectedWindowEdgesDoNotCrossContaminate}.
1825     *
1826     * <p>Recursion is the same bounded depth ({@code visited.size() &gt;
1827     * 128}) as {@link #traceLeaves}; identical visited keying so a
1828     * single statement can't blow up if dlineage produces a cyclic
1829     * graph.
1830     */
1831    private void traceWindowExtras(dataflow df, Set<String> resultsetIds,
1832                                    String parentId, String colName,
1833                                    Set<String> visited,
1834                                    List<sourceColumn> partitionOut,
1835                                    List<sourceColumn> orderOut,
1836                                    EDbVendor vendor) {
1837        if (parentId == null || colName == null) {
1838            return;
1839        }
1840        String key = parentId + ":" + colName;
1841        if (!visited.add(key) || visited.size() > 128) {
1842            return;
1843        }
1844        if (df.getRelationships() == null) {
1845            return;
1846        }
1847        // Walk down through fdd edges. Each intermediate resultset
1848        // encountered is the source of its own window-extra fdr edges
1849        // (if it's a function resultset for a windowed call); leaf
1850        // sources (real tables) are ignored here — the regular
1851        // traceLeaves pass already collected them via fdd.
1852        for (relationship rel : df.getRelationships()) {
1853            if (!"fdd".equals(rel.getType())) continue;
1854            if (rel.getTarget() == null) continue;
1855            if (!parentId.equals(rel.getTarget().getParent_id())) continue;
1856            if (!nameEquals(vendor, colName, rel.getTarget().getColumn())) continue;
1857            if (rel.getSources() == null) continue;
1858            for (sourceColumn src : rel.getSources()) {
1859                String srcPid = src.getParent_id();
1860                String srcCol = src.getColumn();
1861                if (srcPid == null) continue;
1862                if (resultsetIds.contains(srcPid)) {
1863                    collectWindowExtras(df, srcPid, srcCol, partitionOut, orderOut, vendor);
1864                    traceWindowExtras(df, resultsetIds, srcPid, srcCol,
1865                            visited, partitionOut, orderOut, vendor);
1866                }
1867            }
1868        }
1869    }
1870
1871    /**
1872     * Append {@code fdr} edges on ({@code parentId}, {@code colName})
1873     * to the right output list per {@code clauseType}:
1874     * {@code "selectList"} → {@code partitionOut} (PARTITION BY),
1875     * {@code "orderby"} → {@code orderOut} (ORDER BY). Helper for
1876     * {@link #traceWindowExtras}.
1877     *
1878     * <p><b>Scoping</b> is intentionally both {@code parent_id}-keyed
1879     * AND target-column-keyed. The {@code parent_id} alone is not
1880     * sufficient on multi-column intermediates (subquery resultsets
1881     * carry one column per inner projection): if a sibling column on
1882     * the same intermediate happens to bear an unrelated
1883     * {@code selectList} / {@code orderby} fdr edge (a future dlineage
1884     * shape we can't rule out today), the per-column scope keeps it
1885     * out of the current trace's extras.
1886     *
1887     * <p>Empirically (current dlineage), only function-call resultsets
1888     * ever emit these clause types and they're scoped to the single
1889     * function column — so {@code parent_id} alone would also be
1890     * correct today. The tighter gate is defensive against the gap
1891     * dlineage may close inconsistently (MantisBT #4467) and pinned
1892     * by {@code separatelyProjectedWindowEdgesDoNotCrossContaminate}.
1893     */
1894    private void collectWindowExtras(dataflow df, String parentId, String colName,
1895                                      List<sourceColumn> partitionOut,
1896                                      List<sourceColumn> orderOut,
1897                                      EDbVendor vendor) {
1898        if (df.getRelationships() == null) return;
1899        for (relationship rel : df.getRelationships()) {
1900            if (!"fdr".equals(rel.getType())) continue;
1901            if (rel.getTarget() == null) continue;
1902            if (!parentId.equals(rel.getTarget().getParent_id())) continue;
1903            if (colName != null
1904                    && !nameEquals(vendor, colName, rel.getTarget().getColumn())) {
1905                continue;
1906            }
1907            if (rel.getSources() == null) continue;
1908            for (sourceColumn src : rel.getSources()) {
1909                String clause = src.getClauseType();
1910                if ("selectList".equals(clause)) {
1911                    partitionOut.add(src);
1912                } else if ("orderby".equals(clause)) {
1913                    orderOut.add(src);
1914                }
1915            }
1916        }
1917    }
1918
1919    /**
1920     * Identifier-aware equality on column-class identifiers from
1921     * dlineage's relationship model. Dlineage preserves the
1922     * user-written casing on `parent_name` / `column` fields; this
1923     * comparison routes through {@link IdentifierService} so the
1924     * answer is correct under any dialect's case-folding rules.
1925     */
1926    private static boolean nameEquals(EDbVendor vendor, String a, String b) {
1927        if (a == null) return b == null;
1928        if (b == null) return false;
1929        return IdentifierService.areEqualStatic(vendor, ESQLDataObjectType.dotColumn, a, b);
1930    }
1931
1932    /**
1933     * Run {@link DataFlowAnalyzer} on a single SELECT statement and
1934     * return the in-memory {@code dataflow} model. Emits diagnostics
1935     * to the response for downstream observability — the wire contract
1936     * preserves the {@code DLINEAGE_*} markers that pre-dated this
1937     * refactor so existing consumers continue to see them.
1938     */
1939    private dataflow runDlineage(TSelectSqlStatement sel, EDbVendor vendor,
1940                                  SqlGuardResponse r, int index) {
1941        try {
1942            Option option = new Option();
1943            option.setVendor(vendor);
1944            option.setSimpleOutput(false);
1945            option.setOutput(false);
1946            DataFlowAnalyzer da = new DataFlowAnalyzer(sel.toScript(), option);
1947            da.generateDataFlow();
1948            dataflow df = da.getDataFlow();
1949            if (df != null && df.getRelationships() != null && !df.getRelationships().isEmpty()) {
1950                r.facts.diagnostics.add("DLINEAGE_RELATIONSHIPS_PRESENT:statement=" + index);
1951            } else {
1952                r.facts.diagnostics.add("DLINEAGE_NO_RELATIONSHIP:statement=" + index);
1953            }
1954            return df;
1955        } catch (Exception e) {
1956            r.facts.diagnostics.add("DLINEAGE_UNAVAILABLE:statement=" + index);
1957            return null;
1958        }
1959    }
1960
1961    private void expandStar(int index, SqlGuardResponse r, UnifiedCatalogModel cat,
1962                             String tableName, EDbVendor vendor) {
1963        SqlGuardCatalogSamples.TableMatch tm =
1964                SqlGuardCatalogSamples.findTable(cat, tableName);
1965        if (tm == null) {
1966            return;
1967        }
1968        for (ColumnModel c : tm.table.columns()) {
1969            List<String> tagNames = SqlGuardCatalogSamples.policyTagNames(c);
1970            addColumnFact(r, tm.table.name(), c.name(), tagNames, "implicit_select_star", vendor);
1971            addLineage(r, index, "result." + c.name(),
1972                    Collections.singletonList(tm.schema + "." + tm.table.name() + "." + c.name()), "direct");
1973        }
1974        addViolation(r, index, "SELECT_STAR_ON_SENSITIVE_TABLE", "warn",
1975                "SELECT * may expose sensitive columns.", "sensitive columns are present in selected table");
1976    }
1977
1978    /**
1979     * Collect the catalog-bound canonical forms
1980     * ({@code schema.table.column}) of column references that appear in
1981     * the result-column expression OUTSIDE any window
1982     * {@code OVER (...)} spec. These are the columns the SQL genuinely
1983     * projects — function arguments, scalar operator operands
1984     * ({@code email || '...'}), CASE branches, COALESCE wrappers, etc.
1985     * Anything inside a {@link TWindowDef} / {@link TWindowSpecification}
1986     * (PARTITION BY / ORDER BY / frame) is skipped because those columns
1987     * are the partition/order role's job to surface, not the
1988     * projection's.
1989     *
1990     * <p>The role-precedence pass in {@code analyzeSelect} uses this set
1991     * to distinguish "column is really a projection dependency"
1992     * (legitimate dual-role like {@code email || COUNT(*) OVER
1993     * (PARTITION BY email)} where the bare {@code email} ref is a real
1994     * projection source) from "dlineage over-attributed the column to
1995     * an fdd edge" (COUNT(*) shape where the partition column leaks
1996     * into the function's fdd edge but is not a real projection
1997     * source).
1998     *
1999     * <p>Bound via the same {@link SqlGuardCatalogSamples} helpers as
2000     * the dlineage-source binding pass, so the comparison in
2001     * {@link #stripOverAttributedSources} is a clean Set difference on
2002     * canonical strings — qualifier-aware and dialect-aware. Codex
2003     * round-1 review (B5 work, 2026-05-20) flagged the previous
2004     * last-segment comparison as fragile under joins where two tables
2005     * share a column name; canonical-form comparison closes that gap.
2006     */
2007    private Set<String> collectAstProjectionDependencies(TResultColumn rc,
2008                                                          UnifiedCatalogModel cat,
2009                                                          EDbVendor vendor) {
2010        if (rc == null || rc.getExpr() == null) {
2011            return Collections.emptySet();
2012        }
2013        ProjectionDependencyCollector v = new ProjectionDependencyCollector(cat, vendor);
2014        rc.getExpr().acceptChildren(v);
2015        return v.canonicalRefs;
2016    }
2017
2018    /**
2019     * AST visitor that gathers catalog-canonical references to columns
2020     * that appear in the result expression OUTSIDE any window spec.
2021     * Tracks an {@code insideWindow} depth counter so PARTITION BY /
2022     * ORDER BY / frame subtrees are properly skipped — even when nested
2023     * inside other expressions (e.g. {@code CASE WHEN COUNT(*) OVER
2024     * (PARTITION BY ...) > 1 THEN ...}).
2025     *
2026     * <p>Uses {@link TObjectName#getSourceTable()} (resolver-supplied)
2027     * + {@link TObjectName#getColumnNameOnly()} to build the canonical
2028     * form via {@link SqlGuardCatalogSamples#findTable} /
2029     * {@link SqlGuardCatalogSamples#findColumn} — same path the dlineage-
2030     * source catalog binding uses. References that fail to resolve are
2031     * silently dropped (the rule still fires correctly for resolved
2032     * columns; unresolved refs aren't catalog-bound anywhere in
2033     * sqlguard).
2034     */
2035    private static final class ProjectionDependencyCollector extends TParseTreeVisitor {
2036        final Set<String> canonicalRefs = new LinkedHashSet<String>();
2037        private int insideWindow = 0;
2038        private final UnifiedCatalogModel cat;
2039        private final EDbVendor vendor;
2040
2041        ProjectionDependencyCollector(UnifiedCatalogModel cat, EDbVendor vendor) {
2042            this.cat = cat;
2043            this.vendor = vendor;
2044        }
2045
2046        @Override
2047        public void preVisit(gudusoft.gsqlparser.nodes.TWindowDef node) { insideWindow++; }
2048
2049        @Override
2050        public void postVisit(gudusoft.gsqlparser.nodes.TWindowDef node) { insideWindow--; }
2051
2052        @Override
2053        public void preVisit(gudusoft.gsqlparser.nodes.TWindowSpecification node) { insideWindow++; }
2054
2055        @Override
2056        public void postVisit(gudusoft.gsqlparser.nodes.TWindowSpecification node) { insideWindow--; }
2057
2058        @Override
2059        public void preVisit(TObjectName node) {
2060            if (insideWindow > 0 || node == null) return;
2061            // Skip {@code *} and unresolved refs — they can't catalog-bind.
2062            String col = node.getColumnNameOnly();
2063            if (col == null || col.isEmpty() || "*".equals(col)) return;
2064            gudusoft.gsqlparser.nodes.TTable tbl = node.getSourceTable();
2065            if (tbl == null || tbl.getTableName() == null) return;
2066            String tableRef = tbl.getTableName().toString();
2067            String tableKey = IdentifierService.normalizeStatic(
2068                    vendor, ESQLDataObjectType.dotTable, tableRef);
2069            SqlGuardCatalogSamples.TableMatch tm =
2070                    SqlGuardCatalogSamples.findTable(cat, tableKey);
2071            ColumnModel c = SqlGuardCatalogSamples.findColumn(tm, col);
2072            if (tm != null && c != null) {
2073                canonicalRefs.add(tm.schema + "." + tm.table.name() + "." + c.name());
2074            }
2075        }
2076    }
2077
2078    /**
2079     * Strip from {@code functionArgSources} any column that ALSO
2080     * appears in {@code partitionSources} / {@code orderSources} AND
2081     * is NOT a real AST projection dependency (per
2082     * {@code astProjectionDeps}). The remaining columns in
2083     * {@code functionArgSources} are exactly those genuinely referenced
2084     * as function arguments or scalar operands in the projection —
2085     * passthrough patterns like {@code LAG(email)} or {@code email ||
2086     * COUNT(*) OVER (PARTITION BY email)} preserve their "appears in
2087     * output" role even when the same column is also a partition key.
2088     */
2089    private void stripOverAttributedSources(LinkedHashSet<String> functionArgSources,
2090                                             LinkedHashSet<String> partitionSources,
2091                                             LinkedHashSet<String> orderSources,
2092                                             Set<String> astProjectionDeps) {
2093        Iterator<String> it = functionArgSources.iterator();
2094        while (it.hasNext()) {
2095            String fq = it.next();
2096            boolean overlapsPartition = partitionSources.contains(fq);
2097            boolean overlapsOrder = orderSources.contains(fq);
2098            if (!overlapsPartition && !overlapsOrder) continue;
2099            if (!astProjectionDeps.contains(fq)) {
2100                // dlineage over-attributed this column to fdd; partition/order is
2101                // the only real role.
2102                it.remove();
2103            }
2104        }
2105    }
2106
2107    /**
2108     * Catalog-bind a list of dlineage {@link sourceColumn}s into the
2109     * canonical {@code "schema.table.column"} wire form, dropping any
2110     * leaf that doesn't resolve to a known catalog column. Preserves
2111     * insertion order and dedups via {@link LinkedHashSet}.
2112     *
2113     * <p>Identical resolution logic to the original inline pass in
2114     * {@code analyzeSelect} — factored out so the per-role buckets
2115     * (function-arg leaves, partition extras, order extras) can each
2116     * run it without code duplication.
2117     */
2118    private LinkedHashSet<String> bindToCatalog(List<sourceColumn> leaves,
2119                                                 UnifiedCatalogModel cat,
2120                                                 EDbVendor vendor) {
2121        LinkedHashSet<String> out = new LinkedHashSet<String>();
2122        for (sourceColumn leaf : leaves) {
2123            String tableKey = leaf.getParent_name() == null
2124                    ? null : IdentifierService.normalizeStatic(
2125                            vendor, ESQLDataObjectType.dotTable, leaf.getParent_name());
2126            String colName = leaf.getColumn();
2127            if (tableKey == null || colName == null) continue;
2128            SqlGuardCatalogSamples.TableMatch tm =
2129                    SqlGuardCatalogSamples.findTable(cat, tableKey);
2130            ColumnModel c = SqlGuardCatalogSamples.findColumn(tm, colName);
2131            if (tm != null && c != null) {
2132                out.add(tm.schema + "." + tm.table.name() + "." + c.name());
2133            }
2134        }
2135        return out;
2136    }
2137
2138    private void addColumnFactForSource(SqlGuardResponse r, String fq, String exposure,
2139                                         UnifiedCatalogModel cat, EDbVendor vendor) {
2140        String[] p = fq.split("\\.");
2141        if (p.length < 3) {
2142            return;
2143        }
2144        SqlGuardCatalogSamples.TableMatch tm =
2145                SqlGuardCatalogSamples.findTable(cat, p[1]);
2146        ColumnModel c = SqlGuardCatalogSamples.findColumn(tm, p[2]);
2147        addColumnFact(r, p[1], p[2], SqlGuardCatalogSamples.policyTagNames(c), exposure, vendor);
2148    }
2149
2150    private void addColumnFact(SqlGuardResponse r, String table, String name, List<String> tags,
2151                                String exposure, EDbVendor vendor) {
2152        for (SqlGuardResponse.ColumnFact cf : r.facts.columns) {
2153            // Identifier-aware dedup: a fact for (sbcustomer, sbcustcountry,
2154            // explicit_select) is the same regardless of whether the inputs
2155            // were folded by dialect-specific rules. Exposure is a sqlguard
2156            // tag (`explicit_select` / `implicit_select_star` /
2157            // `window_partition` / `window_order`), not an SQL identifier,
2158            // so plain .equals(). A column that appears in BOTH a projection
2159            // AND a window PARTITION BY produces two separate facts (one
2160            // per role) so the dual-exposure reality is visible to the BFF.
2161            if (IdentifierService.areEqualStatic(vendor, ESQLDataObjectType.dotTable, cf.table, table)
2162                    && IdentifierService.areEqualStatic(vendor, ESQLDataObjectType.dotColumn, cf.name, name)
2163                    && cf.exposure.equals(exposure)) {
2164                mergeTags(cf.policyTags, tags);
2165                return;
2166            }
2167        }
2168        SqlGuardResponse.ColumnFact cf = new SqlGuardResponse.ColumnFact();
2169        cf.table = table;
2170        cf.name = name;
2171        cf.policyTags.addAll(tags);
2172        cf.exposure = exposure;
2173        r.facts.columns.add(cf);
2174        if (isSensitive(tags)) {
2175            emitSensitiveViolation(r, exposure, table, name);
2176        }
2177    }
2178
2179    /**
2180     * Dispatch the right "sensitive column" violation code based on how
2181     * the column is exposed in the SQL. Each role gets its own rule so
2182     * the BFF / audit log can distinguish "PII in the output" from
2183     * "PII in a window partition" (k-anonymity leak) from "PII in a
2184     * window ORDER BY" (sort-pattern leak).
2185     *
2186     * <ul>
2187     *   <li>{@code explicit_select} / {@code implicit_select_star} —
2188     *       {@code SENSITIVE_COLUMN_IN_OUTPUT} (existing, blueprint
2189     *       §S21).</li>
2190     *   <li>{@code window_partition} —
2191     *       {@code SENSITIVE_COLUMN_IN_WINDOW_PARTITION} (new,
2192     *       blueprint §1.5.B B5).</li>
2193     *   <li>{@code window_order} — no violation today. The blueprint
2194     *       does not pin a window-ORDER rule; PII in ORDER BY is a
2195     *       softer leak (rank reveals attribute order, not partition
2196     *       membership). Surface only as a column fact for now;
2197     *       a future {@code SENSITIVE_COLUMN_IN_WINDOW_ORDER} can ship
2198     *       once a seed needs it.</li>
2199     * </ul>
2200     */
2201    private void emitSensitiveViolation(SqlGuardResponse r, String exposure,
2202                                         String table, String name) {
2203        int idx = currentStatement(r);
2204        if ("window_partition".equals(exposure)) {
2205            addViolation(r, idx, "SENSITIVE_COLUMN_IN_WINDOW_PARTITION", "warn",
2206                    "Sensitive column appears in window PARTITION BY; "
2207                            + "small partitions may identify individuals.",
2208                    table + "." + name);
2209            return;
2210        }
2211        if ("window_order".equals(exposure)) {
2212            // Reserved exposure label — no violation today; see Javadoc.
2213            return;
2214        }
2215        // Default: explicit_select / implicit_select_star → in-output rule.
2216        addViolation(r, idx, "SENSITIVE_COLUMN_IN_OUTPUT", "warn",
2217                "Sensitive column appears in query output.",
2218                table + "." + name);
2219    }
2220
2221    private void mergeTags(List<String> existing, List<String> incoming) {
2222        for (String tag : incoming) {
2223            if (!existing.contains(tag)) {
2224                existing.add(tag);
2225            }
2226        }
2227    }
2228
2229    /**
2230     * Policy gate for {@code SENSITIVE_COLUMN_IN_OUTPUT}.
2231     *
2232     * <p>Triggers on {@code PII} or {@code RESTRICTED} only. {@code
2233     * FINANCIAL} is left out deliberately: aggregating a financial
2234     * column (e.g. {@code SUM(amount)}) hides individual values and
2235     * is structurally different from leaking SSN / PII. The catalog
2236     * still carries the {@code FINANCIAL} tag for downstream
2237     * lineage / audit consumers, and a future {@code info}-severity
2238     * tier (blueprint §3.3) is the right home for non-blocking
2239     * FINANCIAL exposure surfacing — at which point this rule can
2240     * fan out per severity rather than per tag.</p>
2241     */
2242    private boolean isSensitive(List<String> tags) {
2243        return tags.contains("PII") || tags.contains("RESTRICTED");
2244    }
2245
2246    private int currentStatement(SqlGuardResponse r) {
2247        return r.statements.isEmpty() ? 0 : r.statements.get(r.statements.size() - 1).index;
2248    }
2249
2250    private void addLineage(SqlGuardResponse r, int idx, String target, List<String> sources, String type) {
2251        addLineage(r, idx, target, sources, type, null, null);
2252    }
2253
2254    private void addLineage(SqlGuardResponse r, int idx, String target, List<String> sources, String type,
2255                            List<String> aggregateFunctions,
2256                            List<SqlGuardResponse.WindowFn> windowFunctions) {
2257        for (SqlGuardResponse.LineageEdge existing : r.facts.lineage) {
2258            if (existing.statementIndex == idx
2259                    && existing.targetColumn.equals(target)
2260                    && existing.sourceColumns.equals(sources)
2261                    && existing.lineageType.equals(type)
2262                    && listEquals(existing.aggregateFunctions, aggregateFunctions)
2263                    && windowListEquals(existing.windowFunctions, windowFunctions)) {
2264                return;
2265            }
2266        }
2267        SqlGuardResponse.LineageEdge e = new SqlGuardResponse.LineageEdge();
2268        e.statementIndex = idx;
2269        e.targetColumn = target;
2270        e.sourceColumns.addAll(sources);
2271        e.lineageType = type;
2272        e.aggregateFunctions = aggregateFunctions;
2273        e.windowFunctions = windowFunctions;
2274        r.facts.lineage.add(e);
2275    }
2276
2277    private static boolean listEquals(List<String> a, List<String> b) {
2278        return a == null ? b == null : a.equals(b);
2279    }
2280
2281    /**
2282     * Structural equality on {@code List<WindowFn>}. We don't make
2283     * {@code WindowFn} / {@code OrderByItem} implement {@code equals}
2284     * because the dedup compare is the only caller that needs it; an
2285     * implicit {@code equals} on a public wire-format POJO would invite
2286     * downstream consumers to rely on it.
2287     */
2288    private static boolean windowListEquals(List<SqlGuardResponse.WindowFn> a,
2289                                             List<SqlGuardResponse.WindowFn> b) {
2290        if (a == null) return b == null;
2291        if (b == null) return false;
2292        if (a.size() != b.size()) return false;
2293        for (int i = 0; i < a.size(); i++) {
2294            SqlGuardResponse.WindowFn x = a.get(i);
2295            SqlGuardResponse.WindowFn y = b.get(i);
2296            if (!eq(x.functionName, y.functionName)) return false;
2297            if (!eq(x.partitionBy, y.partitionBy)) return false;
2298            if (!eq(x.frame, y.frame)) return false;
2299            if (x.orderBy.size() != y.orderBy.size()) return false;
2300            for (int j = 0; j < x.orderBy.size(); j++) {
2301                SqlGuardResponse.OrderByItem ox = x.orderBy.get(j);
2302                SqlGuardResponse.OrderByItem oy = y.orderBy.get(j);
2303                if (!eq(ox.column, oy.column)) return false;
2304                if (!eq(ox.direction, oy.direction)) return false;
2305            }
2306        }
2307        return true;
2308    }
2309
2310    private static boolean eq(Object a, Object b) {
2311        return a == null ? b == null : a.equals(b);
2312    }
2313
2314    private String resultExpressionText(TResultColumn rc) {
2315        if (rc.getExpr() != null) {
2316            return rc.getExpr().toString().trim();
2317        }
2318        return rc.toString().trim();
2319    }
2320
2321    /**
2322     * Closed set of scalar aggregate function names this worker
2323     * recognizes. Lowercased so the case-insensitive comparison in the
2324     * visitor is a single set lookup. The list intentionally mirrors
2325     * the SQL-92 core aggregates ({@code COUNT}, {@code SUM},
2326     * {@code AVG}, {@code MIN}, {@code MAX}) — extending the set
2327     * (e.g. {@code STDDEV}, {@code VAR_POP}, {@code PERCENTILE_CONT},
2328     * vendor-specific {@code GROUP_CONCAT}) is deferred until a seed
2329     * needs them, so the schema for the extended shapes can be
2330     * designed against concrete SQL rather than speculatively.
2331     *
2332     * <p>Only consulted by the scalar {@link AggregateCallCollector};
2333     * {@link WindowCallCollector} (blueprint §B1/B2) accepts any
2334     * function call carrying a window spec, regardless of whether the
2335     * function appears in this set — {@code LAG}, {@code LEAD},
2336     * {@code ROW_NUMBER}, etc. are window-only and would never make
2337     * sense in {@code AGGREGATE_FUNCTIONS}.
2338     */
2339    private static final Set<String> AGGREGATE_FUNCTIONS;
2340    static {
2341        Set<String> s = new HashSet<String>();
2342        s.add("count");
2343        s.add("sum");
2344        s.add("avg");
2345        s.add("min");
2346        s.add("max");
2347        AGGREGATE_FUNCTIONS = Collections.unmodifiableSet(s);
2348    }
2349
2350    /**
2351     * Derive {@code lineageType} from the pre-computed aggregate-function
2352     * list. Aggregate edges are anything with at least one detected
2353     * function call; non-aggregate edges fall through to the
2354     * bare-identifier check that distinguishes {@code direct} from
2355     * {@code expression}.
2356     *
2357     * <p>The {@code direct} vs {@code expression} discriminator still
2358     * inspects the textual form because both shapes carry the same
2359     * lineage edge topology — telling them apart needs only "is this
2360     * the literal name of one column?", which a regex answers cleanly
2361     * without the SQL identifier extraction failure modes that motivate
2362     * the AST-walking aggregate detector. See
2363     * {@code sqlguard/CLAUDE.md} for the rule.
2364     */
2365    private String lineageType(String text, List<String> aggregateFunctions) {
2366        if (aggregateFunctions != null && !aggregateFunctions.isEmpty()) {
2367            return "aggregate";
2368        }
2369        String l = text.toLowerCase(Locale.ROOT);
2370        return l.matches(identifierPattern() + "(\\." + identifierPattern() + "){0,2}") ? "direct" : "expression";
2371    }
2372
2373    /**
2374     * Walk the AST under a result-column expression and collect every
2375     * aggregate function call, in document order, deduplicated. The
2376     * return shape is the same as the public {@code aggregateFunctions}
2377     * field on {@link SqlGuardResponse.LineageEdge}: {@code null} when
2378     * no aggregate call is detected (so the JSON envelope omits the
2379     * key on direct/expression edges) or a list of uppercase function
2380     * names — bare {@code "COUNT"} / {@code "AVG"} or
2381     * {@code "FUNC(DISTINCT)"} when the call uses the DISTINCT modifier.
2382     *
2383     * <p>Why AST instead of regex: SQL identifier extraction is the
2384     * parser's job, not the worker's. The parser has already resolved
2385     * whitespace ({@code COUNT (DISTINCT x)}), block comments
2386     * ({@code COUNT}/&#42;hint&#42;/{@code (DISTINCT x)}), single-quoted
2387     * literals (a literal {@code 'count(x)'} is a {@link TConstant},
2388     * not a {@link TFunctionCall}), and the DISTINCT modifier (exposed
2389     * as {@link EAggregateType}) by the time the AST exists. Walking
2390     * the AST trades one regex + one literal-stripping pattern +
2391     * special-case handling for one visitor pattern that is correct
2392     * by construction across every dialect GSP supports. See
2393     * {@code sqlguard/CLAUDE.md} "Identifier extraction rule" for the
2394     * project-wide rule.
2395     *
2396     * <p>Scope: scalar aggregates only. Window aggregates
2397     * ({@code AVG(x) OVER (...)}) live on the sibling
2398     * {@link #windowFunctionsFromAst} visitor (blueprint §1.5.B B1/B2,
2399     * shipped 2026-05-20); FILTER aggregates ({@code COUNT(x) FILTER
2400     * (WHERE ...)}) are still deferred to MantisBT #4468. We detect
2401     * those forms by peeking at
2402     * {@link TFunctionCall#getWindowSpecification()} /
2403     * {@link TFunctionCall#getWindowDef()} /
2404     * {@link TFunctionCall#getFilterClause()}; when any is non-null we
2405     * skip the node instead of misclassifying it as a scalar aggregate.
2406     *
2407     * <p>Blueprint §S2.
2408     */
2409    private List<String> aggregateFunctionsFromAst(TResultColumn rc) {
2410        if (rc == null || rc.getExpr() == null) {
2411            return null;
2412        }
2413        AggregateCallCollector collector = new AggregateCallCollector();
2414        rc.getExpr().acceptChildren(collector);
2415        return collector.functions.isEmpty() ? null : collector.functions;
2416    }
2417
2418    /**
2419     * Walk the AST under a result-column expression and collect every
2420     * window-function call (any {@link TFunctionCall} carrying a non-null
2421     * {@link TFunctionCall#getWindowDef()} or
2422     * {@link TFunctionCall#getWindowSpecification()}). Activated by
2423     * blueprint §1.5.B B1 / B2.
2424     *
2425     * <p>Returns {@code null} when no window call is found so the JSON
2426     * envelope can omit the {@code windowFunctions} key on non-window
2427     * edges (preserves wire-format compatibility — see
2428     * {@link SqlGuardResponse.LineageEdge#windowFunctions} Javadoc).
2429     *
2430     * <p>This is the L3 AST-fallback per
2431     * {@code sqlguard/CLAUDE.md} "Lineage extraction rule": dlineage
2432     * does not surface window-spec metadata as typed fields
2433     * (MantisBT #4467). When that lands, the visitor migrates to
2434     * {@code relationship.getWindowSpec()} without changing the wire
2435     * shape.
2436     */
2437    private List<SqlGuardResponse.WindowFn> windowFunctionsFromAst(TResultColumn rc) {
2438        if (rc == null || rc.getExpr() == null) {
2439            return null;
2440        }
2441        WindowCallCollector collector = new WindowCallCollector();
2442        rc.getExpr().acceptChildren(collector);
2443        return collector.functions.isEmpty() ? null : collector.functions;
2444    }
2445
2446    /**
2447     * AST visitor that collects window-function calls and their
2448     * partition / order / frame metadata. Mirrors
2449     * {@link AggregateCallCollector}'s shape but inspects the window
2450     * portion of {@link TFunctionCall} instead of the aggregate
2451     * modifier.
2452     *
2453     * <p>Recursion: {@code acceptChildren} on the root expression
2454     * descends into nested arguments, so a window call inside e.g.
2455     * {@code COALESCE(SUM(x) OVER (...), 0)} or
2456     * {@code CASE WHEN LAG(x) OVER (...) = y THEN ... END} still
2457     * surfaces.
2458     *
2459     * <p><b>Named-window references (e.g. {@code OVER w} with
2460     * {@code WINDOW w AS (PARTITION BY ...)})</b> are partially
2461     * supported: the visitor produces a {@code WindowFn} (so
2462     * {@code lineageType="window"} still fires) but with empty
2463     * {@code partitionBy} / {@code orderBy} and {@code frame=null}
2464     * because those clauses live on the SELECT's {@code WINDOW} clause
2465     * definition, not on the inline {@code OVER} reference. Resolving
2466     * the reference requires walking
2467     * {@link TSelectSqlStatement#getWindowClause()} which is out of
2468     * scope for this pass; flagged for follow-up if a seed needs it.
2469     *
2470     * <p><b>Partition / order expression text</b> is captured via
2471     * {@code TExpression.toString()} — so {@code PARTITION BY date_col
2472     * + 1} produces {@code partitionBy=["date_col + 1"]} rather than
2473     * extracting the {@code TObjectName} children. This matches the
2474     * field's contract ("raw column expressions as written"). The
2475     * catalog-bound source columns for that expression still flow
2476     * through dlineage's {@code fdr} edges into
2477     * {@link SqlGuardResponse.LineageEdge#sourceColumns}.
2478     */
2479    private static final class WindowCallCollector extends TParseTreeVisitor {
2480        final List<SqlGuardResponse.WindowFn> functions = new ArrayList<SqlGuardResponse.WindowFn>();
2481
2482        @Override
2483        public void preVisit(TFunctionCall node) {
2484            if (node == null || node.getFunctionName() == null) {
2485                return;
2486            }
2487            TWindowDef wdef = node.getWindowDef();
2488            TWindowSpecification wspec = node.getWindowSpecification();
2489            if (wdef == null && wspec == null) {
2490                return;
2491            }
2492            String raw = node.getFunctionName().toString();
2493            if (raw == null) {
2494                return;
2495            }
2496            String upper = SqlGuardCatalogSamples.strip(raw).toUpperCase(Locale.ROOT);
2497            EAggregateType agg = node.getAggregateType();
2498            String entry = agg == EAggregateType.distinct ? upper + "(DISTINCT)" : upper;
2499
2500            SqlGuardResponse.WindowFn wf = new SqlGuardResponse.WindowFn();
2501            wf.functionName = entry;
2502
2503            TPartitionClause part = wdef != null ? wdef.getPartitionClause()
2504                    : wspec.getPartitionClause();
2505            if (part != null && part.getExpressionList() != null) {
2506                TExpressionList el = part.getExpressionList();
2507                for (int i = 0; i < el.size(); i++) {
2508                    TExpression e = el.getExpression(i);
2509                    if (e == null) continue;
2510                    String t = e.toString();
2511                    if (t != null && t.length() > 0) {
2512                        wf.partitionBy.add(t.toLowerCase(Locale.ROOT));
2513                    }
2514                }
2515            }
2516
2517            TOrderBy ob = wdef != null ? wdef.getOrderBy() : wspec.getOrderBy();
2518            if (ob != null && ob.getItems() != null) {
2519                for (int i = 0; i < ob.getItems().size(); i++) {
2520                    TOrderByItem item = ob.getItems().getOrderByItem(i);
2521                    if (item == null || item.getSortKey() == null) continue;
2522                    SqlGuardResponse.OrderByItem out = new SqlGuardResponse.OrderByItem();
2523                    out.column = item.getSortKey().toString().toLowerCase(Locale.ROOT);
2524                    ESortType sort = item.getSortOrder();
2525                    if (sort == ESortType.asc) {
2526                        out.direction = "ASC";
2527                    } else if (sort == ESortType.desc) {
2528                        out.direction = "DESC";
2529                    } else {
2530                        out.direction = "NONE";
2531                    }
2532                    wf.orderBy.add(out);
2533                }
2534            }
2535
2536            TWindowFrame frame = wdef != null ? wdef.getWindowFrame()
2537                    : wspec.getWindowFrame();
2538            if (frame != null) {
2539                String ft = frame.toString();
2540                if (ft != null && !ft.trim().isEmpty()) {
2541                    wf.frame = ft.trim();
2542                }
2543            }
2544
2545            functions.add(wf);
2546        }
2547    }
2548
2549    /**
2550     * AST visitor that collects scalar aggregate calls. Window / FILTER
2551     * forms are intentionally skipped (see Javadoc on
2552     * {@link #aggregateFunctionsFromAst}). The visitor uses
2553     * {@code preVisit(TFunctionCall)} which is the standard hook from
2554     * {@link TParseTreeVisitor}; recursion through nested arguments
2555     * happens via {@code acceptChildren} on the root expression, so a
2556     * call inside {@code COALESCE(SUM(a), AVG(b))} surfaces every
2557     * aggregate without manual traversal.
2558     */
2559    private static final class AggregateCallCollector extends TParseTreeVisitor {
2560        final List<String> functions = new ArrayList<String>();
2561
2562        @Override
2563        public void preVisit(TFunctionCall node) {
2564            if (node == null || node.getFunctionName() == null) {
2565                return;
2566            }
2567            // Skip window aggregates (OVER (...)) and FILTER (WHERE …)
2568            // forms — they're separate lineage shapes owned by Tier 2.
2569            if (node.getWindowSpecification() != null || node.getWindowDef() != null
2570                    || node.getFilterClause() != null) {
2571                return;
2572            }
2573            String raw = node.getFunctionName().toString();
2574            if (raw == null) {
2575                return;
2576            }
2577            // Closed-set membership against well-known SQL aggregate
2578            // function names. NOT an SQL identifier comparison in the
2579            // sqlguard/CLAUDE.md "Identifier comparison rule" sense:
2580            // COUNT/SUM/AVG/MIN/MAX are built-in function names whose
2581            // identity is case-insensitive in every supported dialect.
2582            // Plain .toLowerCase() is correct and equivalent to
2583            // areEqualStatic for any vendor on these specific tokens.
2584            String name = SqlGuardCatalogSamples.strip(raw).toLowerCase(Locale.ROOT);
2585            if (!AGGREGATE_FUNCTIONS.contains(name)) {
2586                return;
2587            }
2588            String upper = name.toUpperCase(Locale.ROOT);
2589            // EAggregateType: none, all, distinct, unique. `all` and
2590            // `unique` fold into the bare form — `ALL` is SQL's default
2591            // and `UNIQUE` is an Oracle synonym for DISTINCT in the
2592            // grammar but isn't semantically meaningful enough to be
2593            // worth a third surface form in the worker contract.
2594            EAggregateType agg = node.getAggregateType();
2595            String entry = agg == EAggregateType.distinct ? upper + "(DISTINCT)" : upper;
2596            if (!functions.contains(entry)) {
2597                functions.add(entry);
2598            }
2599        }
2600    }
2601
2602    private String identifierPattern() {
2603        return "([a-zA-Z_][\\w]*|\\\"[^\\\"]+\\\"|`[^`]+`)";
2604    }
2605
2606    /**
2607     * Last dot-segment of a table reference, with quoting stripped.
2608     * Used for the wire-form short name on {@code r.facts.tables[].name}
2609     * and {@code scope.tableNames} — both of which intentionally drop
2610     * the schema/catalog qualifier. Callers that need the qualifier
2611     * (catalog lookup) should use {@link #qualifiedTableRef} instead so
2612     * {@link SqlGuardCatalogSamples#findTable} can match against the
2613     * schema name.
2614     */
2615    private String simple(String s) {
2616        String qualified = qualifiedTableRef(s);
2617        if (qualified.isEmpty()) {
2618            return "";
2619        }
2620        if (qualified.indexOf('.') >= 0) {
2621            qualified = qualified.substring(qualified.lastIndexOf('.') + 1);
2622        }
2623        return SqlGuardCatalogSamples.strip(qualified);
2624    }
2625
2626    /**
2627     * Cleaned but still-qualified table reference (e.g.
2628     * {@code "public.sbcustomer"}). Strips outer quoting, trailing
2629     * punctuation, and any subquery / aliased fragments while preserving
2630     * the schema/catalog segments so
2631     * {@link SqlGuardCatalogSamples#findTable} can resolve the right
2632     * schema instead of guessing.
2633     */
2634    private String qualifiedTableRef(String s) {
2635        s = SqlGuardCatalogSamples.strip(s);
2636        if (s == null) {
2637            return "";
2638        }
2639        s = s.trim().replaceAll("[;,]$", "");
2640        if (s.indexOf('(') >= 0) {
2641            return "";
2642        }
2643        if (s.indexOf(' ') >= 0) {
2644            s = s.substring(0, s.indexOf(' '));
2645        }
2646        return s;
2647    }
2648
2649    private void addViolation(SqlGuardResponse r, int idx, String code, String sev, String msg, String ev) {
2650        for (SqlGuardResponse.Violation v : r.violations) {
2651            if (v.statementIndex == idx && v.code.equals(code) && String.valueOf(v.evidence).equals(ev)) {
2652                return;
2653            }
2654        }
2655        SqlGuardResponse.Violation v = new SqlGuardResponse.Violation();
2656        v.statementIndex = idx;
2657        v.code = code;
2658        v.severity = sev;
2659        v.message = msg;
2660        v.evidence = ev;
2661        r.violations.add(v);
2662        if (code.equals("SELECT_STAR_ON_SENSITIVE_TABLE")) {
2663            r.suggestions.add("Replace SELECT * with explicit non-sensitive columns.");
2664        }
2665    }
2666
2667    private void applyStatementDecision(SqlGuardResponse.StatementResult sr, List<SqlGuardResponse.Violation> vs, int idx) {
2668        int maxRisk = 0;
2669        for (SqlGuardResponse.Violation v : vs) {
2670            if (v.statementIndex == idx && "error".equals(v.severity)) {
2671                sr.decision = "DENY";
2672                sr.riskScore = 90;
2673                sr.summary = "SQL denied by policy.";
2674                return;
2675            }
2676            if (v.statementIndex == idx) {
2677                maxRisk = Math.max(maxRisk, "warn".equals(v.severity) ? 60 : 30);
2678            }
2679        }
2680        if (maxRisk > 0) {
2681            sr.decision = "WARN";
2682            sr.riskScore = maxRisk;
2683            sr.summary = "SQL has policy warnings.";
2684        }
2685    }
2686
2687    private void merge(SqlGuardResponse r) {
2688        for (SqlGuardResponse.StatementResult s : r.statements) {
2689            r.riskScore = Math.max(r.riskScore, s.riskScore);
2690            if ("DENY".equals(s.decision)) {
2691                r.decision = "DENY";
2692                r.summary = "One or more statements are denied.";
2693            } else if (!"DENY".equals(r.decision) && "WARN".equals(s.decision)) {
2694                r.decision = "WARN";
2695                r.summary = "One or more statements have warnings.";
2696            }
2697        }
2698    }
2699}