001package gudusoft.gsqlparser.catalog.input.readers;
002
003import gudusoft.gsqlparser.EDbVendor;
004import gudusoft.gsqlparser.catalog.input.CatalogInputException;
005import gudusoft.gsqlparser.catalog.input.CatalogInputKind;
006import gudusoft.gsqlparser.catalog.input.CatalogInputReader;
007import gudusoft.gsqlparser.catalog.input.CatalogInputReaderFactory;
008import gudusoft.gsqlparser.catalog.input.CatalogInputSource;
009import gudusoft.gsqlparser.catalog.input.CatalogLoadOptions;
010import gudusoft.gsqlparser.catalog.input.model.CatalogModel;
011import gudusoft.gsqlparser.catalog.input.model.CatalogSourceInfo;
012import gudusoft.gsqlparser.catalog.input.model.ColumnModel;
013import gudusoft.gsqlparser.catalog.input.model.DefaultsConfig;
014import gudusoft.gsqlparser.catalog.input.model.RoutineModel;
015import gudusoft.gsqlparser.catalog.input.model.SchemaModel;
016import gudusoft.gsqlparser.catalog.input.model.TableModel;
017import gudusoft.gsqlparser.catalog.input.model.UnifiedCatalogModel;
018import gudusoft.gsqlparser.catalog.input.model.ViewModel;
019import gudusoft.gsqlparser.catalog.runtime.CatalogObjectKind;
020import gudusoft.gsqlparser.util.json.JSON;
021
022import java.io.BufferedReader;
023import java.io.IOException;
024import java.io.InputStream;
025import java.io.InputStreamReader;
026import java.io.Reader;
027import java.nio.charset.StandardCharsets;
028import java.nio.file.Files;
029import java.util.List;
030import java.util.Map;
031
032/**
033 * Reader for {@link CatalogInputKind#SQLFLOW_JSON} sources — the JSON shape the legacy
034 * {@code SqlflowSQLEnv} consumes (plan §11 T2.A.3 / second Phase 2.A static-file adapter).
035 *
036 * <p>Per plan §6 / §11.2 this reader funnels the SqlFlow export shape into the same
037 * {@link UnifiedCatalogModel} the JSON-manifest and SQLDep readers use; the eager bridge
038 * then materializes a {@code TSQLEnv} byte-for-byte equivalent (for the SELECT smoke
039 * surface) to what the legacy {@code SqlflowSQLEnv} would build (parity test in
040 * {@code SqlflowLegacyParityTest}). The legacy path remains untouched per plan §8.5.</p>
041 *
042 * <h3>SqlFlow export schema</h3>
043 *
044 * <p>The root JSON is the SqlFlow "server" object. Three layouts depending on dialect:</p>
045 *
046 * <h4>Layout A — catalogs and schemas (Oracle / MSSQL / PostgreSQL / BigQuery / …)</h4>
047 * <pre>{@code
048 * {
049 *   "name": "<server label>",
050 *   "dbVendor": "dbvoracle",                  // optional vendor override (informational)
051 *   "databases": [
052 *     {
053 *       "name": "ORCL",
054 *       "schemas": [
055 *         {
056 *           "name": "HR",
057 *           "tables":     [{"name": "EMPLOYEES", "type": "TABLE",
058 *                            "columns": [{"name": "EMPLOYEE_ID"}, ...]}, ...],
059 *           "views":      [{"name": "EMP_DETAILS_VIEW", "type": "VIEW",
060 *                            "columns": [...]}, ...],
061 *           "packages":   [{"name": "HR_PKG",
062 *                            "procedures": [...], "functions": [...], "triggers": [...]}],
063 *           "procedures": [{"name": "ADD_EMP"}],
064 *           "functions":  [{"name": "GET_SAL"}],
065 *           "triggers":   [{"name": "AUDIT_TRG"}]
066 *         }
067 *       ]
068 *     }
069 *   ]
070 * }
071 * }</pre>
072 *
073 * <h4>Layout B — catalogs only (MySQL / Teradata / Hive / Impala — {@code supportSchema=false})</h4>
074 * <pre>{@code
075 * {
076 *   "databases": [
077 *     {
078 *       "name": "mydb",
079 *       "tables":   [...],
080 *       "views":    [...],
081 *       "packages": [...],
082 *       "procedures": [...],
083 *       "functions": [...],
084 *       "triggers": [...]
085 *     }
086 *   ]
087 * }
088 * }</pre>
089 *
090 * <h4>Layout C — schemas only (no catalogs)</h4>
091 *
092 * <p>The legacy {@code SqlflowSQLEnv} guards Layout C with {@code supportCatalog(vendor)},
093 * which currently returns {@code true} for every dialect. This branch is therefore dead
094 * in production, but a SqlFlow export pinned to that shape is still tolerated here for
095 * defensive reasons — the model is built under the {@code TSQLEnv.DEFAULT_DB_NAME}
096 * catalog the way the legacy code would.</p>
097 *
098 * <h3>Mapping rules</h3>
099 *
100 * <ul>
101 *   <li>{@code tables[]} and {@code views[]} are walked in <strong>insertion order</strong>
102 *       (tables first, views second), matching the legacy {@code appendTables} method.
103 *       For each entry the {@code type} field is examined: if it contains the substring
104 *       {@code "view"} (case-insensitive ASCII) the entry is recorded as a
105 *       {@link ViewModel}; otherwise as a {@link TableModel}. Array membership is
106 *       informational only — the {@code type} field wins.</li>
107 *   <li>{@code procedures[]} → {@link RoutineModel} with {@link CatalogObjectKind#PROCEDURE}.</li>
108 *   <li>{@code functions[]} → {@link RoutineModel} with {@link CatalogObjectKind#FUNCTION}.</li>
109 *   <li>{@code packages[]} → {@link RoutineModel} with {@link CatalogObjectKind#PACKAGE}.
110 *       The package's nested {@code procedures[]} / {@code functions[]} / {@code triggers[]}
111 *       have no Phase 1 SPI representation and are dropped (the legacy loader binds them
112 *       inside the {@code TSQLEnv}'s package node — that nesting belongs in a follow-up
113 *       extension to {@link RoutineModel}). The empty package shell is still present so
114 *       callers that resolve {@code SCHEMA.PKG} succeed.</li>
115 *   <li>{@code triggers[]} (top-level or package-nested) — there is no Phase 1
116 *       {@code RoutineModel.kind} mapping for triggers and the {@code SQLEnvCatalogLoader}
117 *       does not synthesize them. They are dropped silently. SELECT-only smoke tests are
118 *       unaffected (triggers do not participate in {@code TSQLResolver2} formatter
119 *       output for {@code SELECT}/{@code JOIN} queries); future Phase 2 work that needs
120 *       trigger metadata should extend the SPI.</li>
121 *   <li>The {@code dbVendor} JSON field is informational. The reader honors
122 *       {@link CatalogLoadOptions#vendor()} as the single source of truth (defaulting to
123 *       {@link EDbVendor#dbvoracle} when the option is unset, matching SqlFlow's historical
124 *       Oracle-leaning default). A mismatch between the JSON's {@code dbVendor} and the
125 *       option's vendor is not flagged here — {@code CatalogModelValidator} catches the
126 *       harmful identifier-bypass cases through its normalize round-trip.</li>
127 * </ul>
128 *
129 * <h3>Dispatch</h3>
130 *
 * <p>The reader is dispatched by {@link CatalogInputKind#SQLFLOW_JSON} only — callers must
 * tag the source explicitly. There is no auto-claim heuristic on JSON structure: the
 * SqlFlow shape and the SQLDep shape are both top-level objects with a {@code databases}
 * array, so a structural heuristic could not tell them apart. The explicit kind tag is
 * the contract.</p>
135 *
136 * <h3>Dotted-identifier defense</h3>
137 *
138 * <p>Identifier names with embedded dots are wrapped with the vendor's quote characters
139 * before storage. Without the wrap, {@code ModelBackedCatalogProvider} later concatenates
140 * segments with {@code .} into qualified strings (e.g. {@code catalog.schema.A.B}) that
141 * {@code CatalogIdentifierPolicy.parse} would split into four segments instead of three,
142 * and the runtime would never resolve the SQL reference {@code "A.B"}. Mirrors the
143 * {@code SQLDepSQLEnv} defense and the same wrapping policy as {@link SqldepCatalogInputReader}.
144 * The vendor → delimiter-pair table is duplicated from {@code TSQLEnv.delimitedChar} to
145 * keep {@code catalog/**} free of a compile-time dependency on the {@code sqlenv} runtime
146 * layer.</p>
147 */
148public final class SqlflowCatalogInputReader implements CatalogInputReader {
149
150    public SqlflowCatalogInputReader() {
151    }
152
153    @Override
154    public CatalogInputKind kind() {
155        return CatalogInputKind.SQLFLOW_JSON;
156    }
157
158    @Override
159    public boolean supports(CatalogInputSource source, CatalogLoadOptions options) {
160        if (source == null || source.inMemoryModel() != null) {
161            return false;
162        }
163        return source.declaredKind() == CatalogInputKind.SQLFLOW_JSON;
164    }
165
166    @Override
167    public UnifiedCatalogModel read(CatalogInputSource source, CatalogLoadOptions options)
168            throws CatalogInputException {
169        if (source == null) {
170            throw new CatalogInputException("SqlflowCatalogInputReader: source is required");
171        }
172        final EDbVendor vendor = (options != null && options.vendor() != null)
173            ? options.vendor()
174            : EDbVendor.dbvoracle;  // SqlFlow exports historically default to Oracle.
175
176        long start = System.currentTimeMillis();
177        String text = readAll(source);
178        Object parsed;
179        try {
180            parsed = JSON.parseObject(text);
181        } catch (RuntimeException ex) {
182            throw new CatalogInputException(
183                "Failed to parse SqlFlow JSON from " + source.name() + ": " + ex.getMessage(),
184                ex);
185        }
186        if (!(parsed instanceof Map)) {
187            throw new CatalogInputException(
188                "SqlFlow root must be an object (got " + typeOf(parsed) + ")");
189        }
190        @SuppressWarnings("unchecked")
191        Map<Object, Object> root = (Map<Object, Object>) parsed;
192
193        try {
194            UnifiedCatalogModel.Builder mb = UnifiedCatalogModel.builder().vendor(vendor);
195            mb.sourceInfo(buildSourceInfo(source, start));
196
197            // Track defaults in one place so we can layer (options → JSON server name)
198            // without reading the model builder back. Writes to mb.defaults(...) happen
199            // exactly once at the end.
200            DefaultsConfig.Builder defaultsBuilder = DefaultsConfig.builder();
201            boolean anyDefault = seedOptionDefaults(defaultsBuilder, options);
202            boolean serverNameSeeded = options != null && options.defaultServer() != null
203                && !options.defaultServer().isEmpty();
204
205            // The canonical SqlFlow ingester export wraps one or more server objects in
206            // a {createdBy, servers:[…]} envelope (see TJSONSQLEnvParser.getJSONSQLEnv).
207            // Each entry of `servers[]` is exactly the shape SqlflowSQLEnv directly
208            // accepts. When the envelope is present, walk every server and merge its
209            // catalogs into a single UnifiedCatalogModel — the legacy parser produces
210            // one TSQLEnv per server, but the SPI's single-model contract folds them
211            // together. The bare server-object-as-root form (what SqlflowSQLEnv was
212            // historically constructed with directly) is also tolerated so callers that
213            // already unwrapped the envelope themselves keep working.
214            //
215            // Catalog-name dedup (codex P2 round 2): two servers that ship a database
216            // with the same name produce duplicate top-level CatalogModels, which the
217            // CatalogModelValidator flags as an ERROR. Track seen names and reject the
218            // ambiguous case explicitly with a clear message — the legacy parser builds
219            // one TSQLEnv per server so the conflict is naturally avoided; here it must
220            // be surfaced for the single-model contract.
221            java.util.HashSet<String> catalogNames = new java.util.HashSet<String>();
222            Object servers = root.get("servers");
223            if (servers != null) {
224                if (!(servers instanceof List)) {
225                    throw new CatalogInputException(
226                        "SqlFlow field 'servers' must be an array (got "
227                            + typeOf(servers) + ")");
228                }
229                for (Object sobj : (List<?>) servers) {
230                    if (!(sobj instanceof Map)) {
231                        throw new CatalogInputException(
232                            "SqlFlow servers[i] must be an object (got "
233                                + typeOf(sobj) + ")");
234                    }
235                    @SuppressWarnings("unchecked")
236                    Map<Object, Object> server = (Map<Object, Object>) sobj;
237                    // Codex P2: preserve the server's `name` as DefaultsConfig.defaultServer
238                    // when the caller did not supply one, mirroring the legacy
239                    // SqlflowSQLEnv.initSQLEnv() call to setDefaultServerName(server.get("name")).
240                    // First server wins to avoid clobbering when multiple servers ship in
241                    // one envelope (the legacy code makes one TSQLEnv per server, so the
242                    // analogue here is "first server's name represents this model").
243                    if (!serverNameSeeded) {
244                        String serverName = asString(server, "name", false);
245                        if (serverName != null && !serverName.isEmpty()) {
246                            defaultsBuilder.defaultServer(serverName);
247                            anyDefault = true;
248                            serverNameSeeded = true;
249                        }
250                    }
251                    // Codex P2 round 2: layout decision is per-server. The legacy parser
252                    // constructs each SqlflowSQLEnv with the server's own dbVendor; if the
253                    // JSON has dbVendor=dbvmysql but options.vendor=dbvoracle, the legacy
254                    // walks tables from databases[] directly while our previous single
255                    // precomputed layout would look for schemas[] and silently drop tables.
256                    // Honor the per-server vendor for supportsSchema only — the model's own
257                    // vendor stays options.vendor (the SPI single-model contract precludes
258                    // mixed-vendor models).
259                    EDbVendor serverVendor = readServerVendor(server, vendor);
260                    boolean serverSupportsSchema = supportSchema(serverVendor);
261                    parseServerBody(server, vendor, serverSupportsSchema, mb, catalogNames);
262                }
263            } else {
264                // No envelope — root IS the server body (legacy direct-construction shape).
265                // Try the same bare-root server-name capture as the envelope path so a
266                // SqlFlow JSON that was hand-edited down to a single server also preserves
267                // its `name` field.
268                if (!serverNameSeeded) {
269                    String serverName = asString(root, "name", false);
270                    if (serverName != null && !serverName.isEmpty()) {
271                        defaultsBuilder.defaultServer(serverName);
272                        anyDefault = true;
273                    }
274                }
275                EDbVendor serverVendor = readServerVendor(root, vendor);
276                boolean serverSupportsSchema = supportSchema(serverVendor);
277                parseServerBody(root, vendor, serverSupportsSchema, mb, catalogNames);
278            }
279
280            if (anyDefault) {
281                mb.defaults(defaultsBuilder.build());
282            }
283            return mb.build();
284        } catch (IllegalArgumentException ex) {
285            // Model builders enforce structural invariants (non-empty names, etc.) by
286            // throwing IllegalArgumentException. Surface those through the reader's
287            // checked-exception channel so callers using try/catch on
288            // CatalogInputException don't get caught off guard. Same posture as the
289            // SQLDep reader.
290            throw new CatalogInputException(
291                "Malformed SqlFlow export from " + source.name() + ": " + ex.getMessage(), ex);
292        }
293    }
294
295    /**
296     * Walk one SqlFlow server body — the JSON object that {@code SqlflowSQLEnv} would be
297     * constructed with directly. Same shape regardless of whether it came from the outer
298     * {@code servers[]} envelope or from a bare server-as-root JSON.
299     *
300     * <p>{@code seenCatalogNames} is the running set across server walks so a multi-server
301     * envelope can reject the ambiguous case (two servers ship a database with the same
302     * name) with a clear {@link CatalogInputException}, rather than producing a model
303     * that fails the validator's duplicate-catalog ERROR check downstream. Pass an empty
304     * set when called for a single-server body (validator still catches in-server
305     * duplicates).</p>
306     */
307    private void parseServerBody(Map<Object, Object> body, EDbVendor vendor,
308                                 boolean supportsSchema, UnifiedCatalogModel.Builder mb,
309                                 java.util.Set<String> seenCatalogNames)
310            throws CatalogInputException {
311        Object databases = body.get("databases");
312        if (databases != null) {
313            if (!(databases instanceof List)) {
314                throw new CatalogInputException(
315                    "SqlFlow field 'databases' must be an array (got "
316                        + typeOf(databases) + ")");
317            }
318            for (Object dobj : (List<?>) databases) {
319                if (!(dobj instanceof Map)) {
320                    throw new CatalogInputException(
321                        "SqlFlow databases[i] must be an object (got "
322                            + typeOf(dobj) + ")");
323                }
324                @SuppressWarnings("unchecked")
325                Map<Object, Object> dmap = (Map<Object, Object>) dobj;
326                CatalogModel catalog = parseDatabase(dmap, vendor, supportsSchema);
327                if (catalog != null) {
328                    if (!seenCatalogNames.add(catalog.name())) {
329                        throw new CatalogInputException(
330                            "SqlFlow envelope contains duplicate catalog name '"
331                                + catalog.name() + "' across servers; the SPI's single-model "
332                                + "contract cannot fold ambiguous catalogs. Split the export "
333                                + "into one file per server, or rename the duplicate "
334                                + "database before reading.");
335                    }
336                    mb.addCatalog(catalog);
337                }
338            }
339            return;
340        }
341        // Layout C — schemas-at-top. Defensive only; supportCatalog(vendor) returns
342        // true for every dialect today, so this branch is unreachable in production
343        // but the legacy SqlflowSQLEnv carries it.
344        Object schemas = body.get("schemas");
345        if (schemas != null) {
346            if (!(schemas instanceof List)) {
347                throw new CatalogInputException(
348                    "SqlFlow field 'schemas' must be an array (got "
349                        + typeOf(schemas) + ")");
350            }
351            CatalogModel catalog = parseSchemasAtTop((List<?>) schemas, vendor);
352            if (catalog != null) {
353                if (!seenCatalogNames.add(catalog.name())) {
354                    throw new CatalogInputException(
355                        "SqlFlow envelope contains duplicate catalog name '"
356                            + catalog.name() + "' across servers");
357                }
358                mb.addCatalog(catalog);
359            }
360        }
361    }
362
363    /**
364     * Resolve the per-server vendor used for the layout decision. The legacy parser
365     * constructs each {@code SqlflowSQLEnv} with the server's own {@code dbVendor} when
366     * present; we honor that for {@link #supportSchema} so a MySQL/Hive server in an
367     * envelope routed under an Oracle/PostgreSQL options.vendor doesn't silently drop
368     * tables that hang directly off {@code databases[i]}. Returns the supplied
369     * {@code defaultVendor} when the JSON omits or supplies an unparsable {@code dbVendor}.
370     */
371    private static EDbVendor readServerVendor(Map<Object, Object> server,
372                                              EDbVendor defaultVendor) {
373        Object v = server.get("dbVendor");
374        if (!(v instanceof String)) {
375            return defaultVendor;
376        }
377        String s = ((String) v).trim();
378        if (s.isEmpty()) {
379            return defaultVendor;
380        }
381        try {
382            return EDbVendor.valueOf(s);
383        } catch (IllegalArgumentException ex) {
384            // Unrecognized vendor name in the JSON — fall back to the option vendor;
385            // CatalogModelValidator's identifier-bypass check will surface any
386            // identifier-fold mismatch downstream. Don't throw: the legacy parser
387            // would have done the same EDbVendor.valueOf() and let the caller see the
388            // resulting IllegalArgumentException; we choose tolerance over failure.
389            return defaultVendor;
390        }
391    }
392
393
394    // ---------- catalog / schema / table parsing ----------
395
396    /**
397     * Walk one {@code databases[i]} entry into a {@link CatalogModel}. Two sub-shapes:
398     *
399     * <ul>
400     *   <li>{@code supportsSchema}: {@code databases[i].schemas[j]} arrays carry the
401     *       per-schema object listings.</li>
402     *   <li>{@code !supportsSchema}: object listings (tables / views / etc.) hang off
403     *       {@code databases[i]} directly; one synthetic schema named
404     *       {@link gudusoft.gsqlparser.sqlenv.TSQLEnv#DEFAULT_SCHEMA_NAME} is produced so
405     *       the loader's catalog-tree walk has somewhere to plant them. The legacy
406     *       {@code SqlflowSQLEnv} does the same routing in
407     *       {@code sqlCatalog.getSchema(TSQLEnv.DEFAULT_SCHEMA_NAME, true)}.</li>
408     * </ul>
409     */
410    private CatalogModel parseDatabase(Map<Object, Object> dmap, EDbVendor vendor,
411                                       boolean supportsSchema)
412            throws CatalogInputException {
413        String dbName = wrapDottedName(asString(dmap, "name", true), vendor);
414        CatalogModel.Builder cb = CatalogModel.builder().name(dbName);
415        if (supportsSchema) {
416            Object schemas = dmap.get("schemas");
417            if (schemas != null) {
418                if (!(schemas instanceof List)) {
419                    throw new CatalogInputException(
420                        "SqlFlow database '" + dbName + "' field 'schemas' must be an array (got "
421                            + typeOf(schemas) + ")");
422                }
423                for (Object sobj : (List<?>) schemas) {
424                    if (!(sobj instanceof Map)) {
425                        throw new CatalogInputException(
426                            "SqlFlow database '" + dbName + "' schemas[i] must be an object (got "
427                                + typeOf(sobj) + ")");
428                    }
429                    @SuppressWarnings("unchecked")
430                    Map<Object, Object> smap = (Map<Object, Object>) sobj;
431                    String schemaName = wrapDottedName(asString(smap, "name", true), vendor);
432                    cb.addSchema(parseSchemaBody(smap, schemaName, vendor));
433                }
434            }
435        } else {
436            // Catalogs-only dialect: synthesize one default-named schema bucket.
437            cb.addSchema(parseSchemaBody(dmap, "", vendor));
438        }
439        return cb.build();
440    }
441
442    /**
443     * Defensive Layout-C fallback. Builds a single synthetic catalog whose name is
444     * {@code TSQLEnv.DEFAULT_DB_NAME} ({@code "$_-_$"}) — matching the legacy
445     * {@code sqlCatalog = getSQLCatalog(TSQLEnv.DEFAULT_DB_NAME, true)} call. Each
446     * top-level schema becomes a {@link SchemaModel} under it.
447     */
448    private CatalogModel parseSchemasAtTop(List<?> schemas, EDbVendor vendor)
449            throws CatalogInputException {
450        CatalogModel.Builder cb = CatalogModel.builder()
451            .name(gudusoft.gsqlparser.sqlenv.TSQLEnv.DEFAULT_DB_NAME);
452        for (Object sobj : schemas) {
453            if (!(sobj instanceof Map)) {
454                throw new CatalogInputException(
455                    "SqlFlow top-level schemas[i] must be an object (got " + typeOf(sobj) + ")");
456            }
457            @SuppressWarnings("unchecked")
458            Map<Object, Object> smap = (Map<Object, Object>) sobj;
459            String schemaName = wrapDottedName(asString(smap, "name", true), vendor);
460            cb.addSchema(parseSchemaBody(smap, schemaName, vendor));
461        }
462        return cb.build();
463    }
464
465    /**
466     * Walk a schema-body map (either a {@code schemas[i]} entry, or a {@code databases[i]}
467     * entry in the schema-less case) into a {@link SchemaModel}. Handles tables, views,
468     * top-level packages, top-level procedures/functions, and silently skips triggers and
469     * package-nested children (see class Javadoc for the rationale).
470     *
471     * <p>The {@code schemaName} is wrapped upstream so the dotted-identifier defense covers
472     * the schema layer too.</p>
473     */
474    private SchemaModel parseSchemaBody(Map<Object, Object> body, String schemaName,
475                                        EDbVendor vendor)
476            throws CatalogInputException {
477        SchemaModel.Builder sb = SchemaModel.builder().name(schemaName);
478
479        // Tables and views are walked in legacy order: tables first, then views. The
480        // SqlflowSQLEnv code combines them into a single list via addAll(), so the
481        // produced TSQLEnv ordering is tables-then-views — preserve that here so the
482        // parity test sees the same insertion order.
483        Object tables = body.get("tables");
484        if (tables != null) {
485            if (!(tables instanceof List)) {
486                throw new CatalogInputException(
487                    "SqlFlow field 'tables' must be an array (got " + typeOf(tables) + ")");
488            }
489            for (Object tobj : (List<?>) tables) {
490                if (!(tobj instanceof Map)) {
491                    throw new CatalogInputException(
492                        "SqlFlow tables[i] must be an object (got " + typeOf(tobj) + ")");
493                }
494                @SuppressWarnings("unchecked")
495                Map<Object, Object> tmap = (Map<Object, Object>) tobj;
496                addTableOrView(sb, tmap, vendor);
497            }
498        }
499        Object views = body.get("views");
500        if (views != null) {
501            if (!(views instanceof List)) {
502                throw new CatalogInputException(
503                    "SqlFlow field 'views' must be an array (got " + typeOf(views) + ")");
504            }
505            for (Object vobj : (List<?>) views) {
506                if (!(vobj instanceof Map)) {
507                    throw new CatalogInputException(
508                        "SqlFlow views[i] must be an object (got " + typeOf(vobj) + ")");
509                }
510                @SuppressWarnings("unchecked")
511                Map<Object, Object> vmap = (Map<Object, Object>) vobj;
512                addTableOrView(sb, vmap, vendor);
513            }
514        }
515
516        // Top-level routines.
517        Object procedures = body.get("procedures");
518        if (procedures != null) {
519            for (Map<Object, Object> rmap : asMapList(procedures, "procedures")) {
520                String name = wrapDottedName(asString(rmap, "name", true), vendor);
521                sb.addRoutine(RoutineModel.builder()
522                    .name(name)
523                    .kind(CatalogObjectKind.PROCEDURE)
524                    .build());
525            }
526        }
527        Object functions = body.get("functions");
528        if (functions != null) {
529            for (Map<Object, Object> rmap : asMapList(functions, "functions")) {
530                String name = wrapDottedName(asString(rmap, "name", true), vendor);
531                sb.addRoutine(RoutineModel.builder()
532                    .name(name)
533                    .kind(CatalogObjectKind.FUNCTION)
534                    .build());
535            }
536        }
537        Object packages = body.get("packages");
538        if (packages != null) {
539            for (Map<Object, Object> pmap : asMapList(packages, "packages")) {
540                // Package-nested procedures / functions / triggers are intentionally
541                // dropped: RoutineModel does not yet model package-nested children, and
542                // the SQLEnvCatalogLoader has no path to bind them. The empty package
543                // shell still resolves SCHEMA.PKG references.
544                String name = wrapDottedName(asString(pmap, "name", true), vendor);
545                sb.addRoutine(RoutineModel.builder()
546                    .name(name)
547                    .kind(CatalogObjectKind.PACKAGE)
548                    .build());
549            }
550        }
551
552        // Top-level triggers — see class Javadoc; silently dropped because no Phase 1
553        // RoutineModel.kind exists for triggers and the loader does not synthesize them.
554        // Walk the array shape just to fail cleanly on a malformed export.
555        Object triggers = body.get("triggers");
556        if (triggers != null && !(triggers instanceof List)) {
557            throw new CatalogInputException(
558                "SqlFlow field 'triggers' must be an array (got " + typeOf(triggers) + ")");
559        }
560
561        return sb.build();
562    }
563
564    private void addTableOrView(SchemaModel.Builder sb, Map<Object, Object> entry,
565                                EDbVendor vendor)
566            throws CatalogInputException {
567        String objName = wrapDottedName(asString(entry, "name", true), vendor);
568        String type = asString(entry, "type", false);
569        boolean isView = typeIndicatesView(type);
570        // Column names are NOT wrapped: ModelBackedCatalogProvider builds the qualified
571        // column form as `<tableQ>.<column>` where <tableQ> already has any required
572        // quoting. A column name with an embedded dot would still be ambiguous downstream
573        // — but the legacy loader has the same gap, so parity is preserved without the
574        // extra wrapping here. (Same posture as SqldepCatalogInputReader.)
575        List<ColumnModel> cols = parseColumns(entry, objName);
576        if (isView) {
577            ViewModel.Builder vb = ViewModel.builder().name(objName);
578            for (ColumnModel c : cols) {
579                vb.addColumn(c);
580            }
581            sb.addView(vb.build());
582        } else {
583            TableModel.Builder tb = TableModel.builder().name(objName);
584            for (ColumnModel c : cols) {
585                tb.addColumn(c);
586            }
587            sb.addTable(tb.build());
588        }
589    }
590
591    /**
592     * Case-insensitive ASCII substring check for {@code "view"} in the supplied {@code type}
593     * field — matches the legacy {@code SqlflowSQLEnv.appendTables()}'s
594     * {@code type.toLowerCase().indexOf("view") != -1} test without using
595     * {@link String#toLowerCase()}, which the {@code forbidden-apis} plugin bans inside
596     * {@code catalog/**}. {@link Character#toLowerCase(char)} is locale-insensitive for
597     * ASCII characters and is permitted.
598     */
599    private static boolean typeIndicatesView(String type) {
600        if (type == null) {
601            return false;
602        }
603        final int n = type.length();
604        for (int i = 0; i + 4 <= n; i++) {
605            if (Character.toLowerCase(type.charAt(i)) == 'v'
606                && Character.toLowerCase(type.charAt(i + 1)) == 'i'
607                && Character.toLowerCase(type.charAt(i + 2)) == 'e'
608                && Character.toLowerCase(type.charAt(i + 3)) == 'w') {
609                return true;
610            }
611        }
612        return false;
613    }
614
615    private static List<ColumnModel> parseColumns(Map<Object, Object> tmap, String tableName)
616            throws CatalogInputException {
617        Object columns = tmap.get("columns");
618        java.util.ArrayList<ColumnModel> out = new java.util.ArrayList<ColumnModel>();
619        if (columns == null) {
620            return out;
621        }
622        if (!(columns instanceof List)) {
623            throw new CatalogInputException(
624                "SqlFlow object '" + tableName + "' field 'columns' must be an array (got "
625                    + typeOf(columns) + ")");
626        }
627        for (Object cobj : (List<?>) columns) {
628            if (!(cobj instanceof Map)) {
629                throw new CatalogInputException(
630                    "SqlFlow object '" + tableName + "' columns[i] must be an object (got "
631                        + typeOf(cobj) + ")");
632            }
633            @SuppressWarnings("unchecked")
634            Map<Object, Object> cmap = (Map<Object, Object>) cobj;
635            String colName = asString(cmap, "name", true);
636            ColumnModel.Builder cb = ColumnModel.builder().name(colName);
637            String dt = asString(cmap, "dataType", false);
638            if (dt == null) dt = asString(cmap, "type", false);
639            if (dt != null) cb.dataType(dt);
640            out.add(cb.build());
641        }
642        return out;
643    }
644
645    /**
646     * Cast and walk a JSON value as a list of object maps. Throws {@link CatalogInputException}
647     * with a labelled error when the structure is wrong. Used for {@code procedures} /
648     * {@code functions} / {@code packages}.
649     */
650    @SuppressWarnings("unchecked")
651    private static List<Map<Object, Object>> asMapList(Object value, String fieldLabel)
652            throws CatalogInputException {
653        if (!(value instanceof List)) {
654            throw new CatalogInputException(
655                "SqlFlow field '" + fieldLabel + "' must be an array (got " + typeOf(value) + ")");
656        }
657        List<?> raw = (List<?>) value;
658        java.util.ArrayList<Map<Object, Object>> out = new java.util.ArrayList<Map<Object, Object>>(raw.size());
659        for (Object o : raw) {
660            if (!(o instanceof Map)) {
661                throw new CatalogInputException(
662                    "SqlFlow field '" + fieldLabel + "[i]' must be an object (got "
663                        + typeOf(o) + ")");
664            }
665            out.add((Map<Object, Object>) o);
666        }
667        return out;
668    }
669
670    // ---------- dotted-identifier wrapping ----------
671
672    /**
673     * Wrap a raw identifier with the vendor's open/close quote chars when it contains a
674     * dot. Same defense as {@link SqldepCatalogInputReader#wrapDottedName} — see that
675     * method for the rationale. Vendor → delimiter pair table is duplicated here to keep
676     * {@code catalog/**} free of a compile-time dependency on {@code sqlenv/**}; if a
677     * dialect adds a new {@code TSQLEnv.delimitedChar} entry, mirror it in both places.
678     */
679    private static String wrapDottedName(String raw, EDbVendor vendor) {
680        if (raw == null || raw.isEmpty() || raw.indexOf('.') == -1) {
681            return raw;
682        }
683        char open = vendorOpenDelimiter(vendor);
684        char close = vendorCloseDelimiter(vendor);
685        if (raw.length() >= 2
686            && raw.charAt(0) == open
687            && raw.charAt(raw.length() - 1) == close) {
688            return raw;
689        }
690        return open + raw + close;
691    }
692
693    private static char vendorOpenDelimiter(EDbVendor v) {
694        if (v == null) return '"';
695        switch (v) {
696            case dbvmssql:
697            case dbvazuresql:
698                return '[';
699            case dbvathena:
700            case dbvmysql:
701            case dbvbigquery:
702            case dbvcouchbase:
703            case dbvhive:
704            case dbvimpala:
705            case dbvdatabricks:
706                return '`';
707            case dbvdax:
708                return '\'';
709            default:
710                return '"';
711        }
712    }
713
714    private static char vendorCloseDelimiter(EDbVendor v) {
715        if (v == null) return '"';
716        switch (v) {
717            case dbvmssql:
718            case dbvazuresql:
719                return ']';
720            default:
721                return vendorOpenDelimiter(v);
722        }
723    }
724
725    // ---------- vendor schema-support ----------
726
727    /**
728     * Mirror of {@link gudusoft.gsqlparser.sqlenv.TSQLEnv#supportSchema(EDbVendor)}. Inlined
729     * here so {@code catalog/**} does not import from {@code sqlenv/**} on the read path
730     * (the loader path already uses {@code TSQLEnv.supportSchema} for the symmetric
731     * decision). Kept in sync — if a new schema-less dialect lands in the legacy method
732     * (unlikely; the list has been stable for years), reflect it here too.
733     */
734    private static boolean supportSchema(EDbVendor vendor) {
735        return !(vendor == EDbVendor.dbvmysql
736            || vendor == EDbVendor.dbvteradata
737            || vendor == EDbVendor.dbvhive
738            || vendor == EDbVendor.dbvimpala);
739    }
740
741    // ---------- defaults / source info ----------
742
743    /**
744     * Seed the supplied {@link DefaultsConfig.Builder} from {@code options} non-empty
745     * fields. Returns {@code true} if at least one default was contributed, so the caller
746     * knows whether to call {@link UnifiedCatalogModel.Builder#defaults} at the end. The
747     * server-name capture from JSON layers on top of this seed in the read path.
748     */
749    private static boolean seedOptionDefaults(DefaultsConfig.Builder db,
750                                              CatalogLoadOptions options) {
751        if (options == null) return false;
752        boolean any = false;
753        if (options.defaultCatalog() != null && !options.defaultCatalog().isEmpty()) {
754            db.defaultCatalog(options.defaultCatalog());
755            any = true;
756        }
757        if (options.defaultSchema() != null && !options.defaultSchema().isEmpty()) {
758            db.defaultSchema(options.defaultSchema());
759            any = true;
760        }
761        if (options.defaultServer() != null && !options.defaultServer().isEmpty()) {
762            db.defaultServer(options.defaultServer());
763            any = true;
764        }
765        return any;
766    }
767
768    private static CatalogSourceInfo buildSourceInfo(CatalogInputSource source, long startMillis) {
769        return CatalogSourceInfo.builder()
770            .kind(CatalogInputKind.SQLFLOW_JSON)
771            .name(source.name() != null ? source.name() : "<sqlflow>")
772            .readMillis(System.currentTimeMillis() - startMillis)
773            .build();
774    }
775
776    // ---------- input → string ----------
777
778    private static String readAll(CatalogInputSource source) throws CatalogInputException {
779        try {
780            if (source.inMemoryModel() != null) {
781                throw new CatalogInputException(
782                    "SqlflowCatalogInputReader cannot read in-memory model sources");
783            }
784            if (source.path() != null) {
785                byte[] b = Files.readAllBytes(source.path());
786                return new String(b, StandardCharsets.UTF_8);
787            }
788            byte[] sourceBytes = source.bytes();
789            if (sourceBytes != null) {
790                return new String(sourceBytes, StandardCharsets.UTF_8);
791            }
792            if (source.url() != null) {
793                try (InputStream in = source.url().openStream();
794                     Reader r = new InputStreamReader(in, StandardCharsets.UTF_8)) {
795                    return drain(r);
796                }
797            }
798            if (source.reader() != null) {
799                return drain(source.reader());
800            }
801            throw new CatalogInputException(
802                "SqlflowCatalogInputReader: source has no readable backing");
803        } catch (IOException io) {
804            throw new CatalogInputException(
805                "Failed to read SqlFlow export from " + source.name() + ": " + io.getMessage(),
806                io);
807        }
808    }
809
810    private static String drain(Reader r) throws IOException {
811        BufferedReader br = (r instanceof BufferedReader) ? (BufferedReader) r : new BufferedReader(r);
812        StringBuilder sb = new StringBuilder();
813        char[] buf = new char[4096];
814        int n;
815        while ((n = br.read(buf)) > 0) {
816            sb.append(buf, 0, n);
817        }
818        return sb.toString();
819    }
820
821    // ---------- shared field accessors ----------
822
823    private static String asString(Map<Object, Object> obj, String key, boolean required)
824            throws CatalogInputException {
825        Object v = obj.get(key);
826        if (v == null) {
827            if (required) {
828                throw new CatalogInputException(
829                    "SqlFlow entry missing required field '" + key + "'");
830            }
831            return null;
832        }
833        return v instanceof String ? (String) v : v.toString();
834    }
835
836    private static String typeOf(Object o) {
837        return o == null ? "null" : o.getClass().getSimpleName();
838    }
839
840    /** ServiceLoader-discoverable factory. Plan §13.1. */
841    public static final class Factory implements CatalogInputReaderFactory {
842
843        public Factory() {
844            // Required no-arg constructor for ServiceLoader.
845        }
846
847        @Override
848        public CatalogInputKind kind() {
849            return CatalogInputKind.SQLFLOW_JSON;
850        }
851
852        @Override
853        public CatalogInputReader create() {
854            return new SqlflowCatalogInputReader();
855        }
856    }
857}