001package gudusoft.gsqlparser.dlineage; 002 003import gudusoft.gsqlparser.*; 004import gudusoft.gsqlparser.dlineage.dataflow.model.*; 005import gudusoft.gsqlparser.dlineage.util.DlineageUtil; 006import gudusoft.gsqlparser.nodes.*; 007import gudusoft.gsqlparser.stmt.*; 008import gudusoft.gsqlparser.stmt.oracle.*; 009import gudusoft.gsqlparser.util.Logger; 010import gudusoft.gsqlparser.util.LoggerFactory; 011import gudusoft.gsqlparser.util.SQLUtil; 012 013import java.util.*; 014 015/** 016 * Handles Oracle pipelined table function cross-boundary data lineage stitching. 017 * 018 * <p>Two-phase approach: 019 * <ol> 020 * <li>Index phase: indexes object types, collection types, and pipelined function signatures</li> 021 * <li>Stitch phase: at call sites, appends lineage from function body to caller columns</li> 022 * </ol> 023 */ 024public class PipelinedFunctionAnalyzer { 025 026 private static final Logger logger = LoggerFactory.getLogger(PipelinedFunctionAnalyzer.class); 027 028 private final ModelBindingManager modelManager; 029 private final ModelFactory modelFactory; 030 private final Option option; 031 032 public PipelinedFunctionAnalyzer(ModelBindingManager modelManager, ModelFactory modelFactory, Option option) { 033 this.modelManager = modelManager; 034 this.modelFactory = modelFactory; 035 this.option = option; 036 } 037 038 // ---- Phase 1: Index CREATE TYPE AS OBJECT ---- 039 040 public void indexObjectType(TPlsqlCreateType stmt) { 041 if (!option.isEnablePipelinedStitching()) return; 042 if (stmt.getTypeAttributes() == null || stmt.getTypeAttributes().size() == 0) return; 043 if (stmt.getTypeName() == null) return; 044 045 String typeName = normalizeTypeName(stmt.getTypeName().toString()); 046 List<String> fieldNames = new ArrayList<>(); 047 for (int i = 0; i < stmt.getTypeAttributes().size(); i++) { 048 TTypeAttribute attr = stmt.getTypeAttributes().getAttributeItem(i); 049 if (attr.getAttributeName() != null) { 050 
fieldNames.add(attr.getAttributeName().toString()); 051 } 052 } 053 modelManager.indexObjectType(typeName, fieldNames); 054 // Also index by bare name (no schema prefix) 055 String bareName = getBareName(typeName); 056 if (!bareName.equals(typeName)) { 057 modelManager.indexObjectType(bareName, fieldNames); 058 } 059 logger.trace("Indexed object type: " + typeName + " (bare=" + bareName + "), fields=" + fieldNames); 060 } 061 062 // ---- Phase 1: Index TABLE OF collection types ---- 063 064 public void indexCollectionType(TPlsqlTableTypeDefStmt stmt) { 065 if (!option.isEnablePipelinedStitching()) return; 066 if (stmt.getTypeName() == null || stmt.getElementDataType() == null) return; 067 068 String collectionName = normalizeTypeName(stmt.getTypeName().toString()); 069 String elementTypeName = normalizeTypeName(stmt.getElementDataType().toString()); 070 modelManager.indexCollectionType(collectionName, elementTypeName); 071 // Also index by bare name 072 String bareCollection = getBareName(collectionName); 073 if (!bareCollection.equals(collectionName)) { 074 modelManager.indexCollectionType(bareCollection, elementTypeName); 075 } 076 logger.trace("Indexed collection type: " + collectionName + " -> " + elementTypeName); 077 } 078 079 // ---- Phase 2: Detect pipelined function and build signature ---- 080 081 public void analyzePipelinedFunction(TPlsqlCreateFunction func) { 082 if (!option.isEnablePipelinedStitching()) return; 083 084 // Scan body for PIPE ROW statements using iterative traversal 085 boolean hasPipeRow = false; 086 List<TPlsqlPipeRowStmt> pipeRowStmts = new ArrayList<>(); 087 List<TAssignStmt> assignStmts = new ArrayList<>(); 088 List<TLoopStmt> cursorLoops = new ArrayList<>(); 089 090 Deque<TCustomSqlStatement> stack = new ArrayDeque<>(); 091 // Push body statements 092 for (int i = func.getBodyStatements().size() - 1; i >= 0; i--) { 093 stack.push(func.getBodyStatements().get(i)); 094 } 095 // Also check inner statements 096 for (int i = 
func.getStatements().size() - 1; i >= 0; i--) { 097 stack.push(func.getStatements().get(i)); 098 } 099 100 while (!stack.isEmpty()) { 101 TCustomSqlStatement s = stack.pop(); 102 if (s instanceof TPlsqlPipeRowStmt) { 103 pipeRowStmts.add((TPlsqlPipeRowStmt) s); 104 hasPipeRow = true; 105 } else if (s instanceof TAssignStmt) { 106 assignStmts.add((TAssignStmt) s); 107 } else if (s instanceof TLoopStmt) { 108 TLoopStmt loop = (TLoopStmt) s; 109 if (loop.getSubquery() != null || loop.getCursorName() != null) { 110 cursorLoops.add(loop); 111 } 112 // Push loop body statements 113 for (int i = loop.getBodyStatements().size() - 1; i >= 0; i--) { 114 stack.push(loop.getBodyStatements().get(i)); 115 } 116 } else if (s instanceof TIfStmt) { 117 TIfStmt ifStmt = (TIfStmt) s; 118 if (ifStmt.getThenStatements() != null) { 119 for (int i = ifStmt.getThenStatements().size() - 1; i >= 0; i--) { 120 stack.push(ifStmt.getThenStatements().get(i)); 121 } 122 } 123 if (ifStmt.getElseStatements() != null) { 124 for (int i = ifStmt.getElseStatements().size() - 1; i >= 0; i--) { 125 stack.push(ifStmt.getElseStatements().get(i)); 126 } 127 } 128 if (ifStmt.getElseifStatements() != null) { 129 for (int i = ifStmt.getElseifStatements().size() - 1; i >= 0; i--) { 130 stack.push(ifStmt.getElseifStatements().get(i)); 131 } 132 } 133 } else if (s instanceof TCommonBlock) { 134 TCommonBlock block = (TCommonBlock) s; 135 if (block.getBodyStatements() != null) { 136 for (int i = block.getBodyStatements().size() - 1; i >= 0; i--) { 137 stack.push(block.getBodyStatements().get(i)); 138 } 139 } 140 if (block.getStatements() != null) { 141 for (int i = block.getStatements().size() - 1; i >= 0; i--) { 142 stack.push(block.getStatements().get(i)); 143 } 144 } 145 } 146 // Fallback: try to push sub-statements for any other compound statement type 147 else { 148 if (s.getStatements() != null && s.getStatements().size() > 0) { 149 for (int i = s.getStatements().size() - 1; i >= 0; i--) { 150 
stack.push(s.getStatements().get(i)); 151 } 152 } 153 if (s instanceof TBlockSqlStatement) { 154 TBlockSqlStatement blockStmt = (TBlockSqlStatement) s; 155 if (blockStmt.getBodyStatements() != null && blockStmt.getBodyStatements().size() > 0) { 156 for (int i = blockStmt.getBodyStatements().size() - 1; i >= 0; i--) { 157 stack.push(blockStmt.getBodyStatements().get(i)); 158 } 159 } 160 } 161 } 162 } 163 164 if (!hasPipeRow) return; 165 166 // Build signature 167 PipelinedFunctionSignature sig = new PipelinedFunctionSignature(); 168 String funcName = func.getFunctionName() != null ? func.getFunctionName().toString() : ""; 169 170 // Build function key with package context 171 String packagePrefix = ""; 172 if (ModelBindingManager.getGlobalOraclePackage() != null) { 173 packagePrefix = ModelBindingManager.getGlobalOraclePackage().getName(); 174 if (packagePrefix != null && !packagePrefix.isEmpty()) { 175 packagePrefix = packagePrefix + "."; 176 } else { 177 packagePrefix = ""; 178 } 179 } 180 181 int argCount = func.getParameterDeclarations() != null ? func.getParameterDeclarations().size() : 0; 182 183 String fullKey = normalizeKey(packagePrefix + funcName); 184 String nakedKey = normalizeKey(getBareName(funcName)); 185 186 sig.setFunctionKey(fullKey); 187 sig.setNakedKey(nakedKey); 188 sig.setArgCount(argCount); 189 190 // Resolve return type → collection type → element/object type → output columns 191 TTypeName returnType = func.getReturnDataType(); 192 logger.trace("Pipelined func " + funcName + ": returnType=" + (returnType != null ? 
returnType.toString() : "null")); 193 if (returnType != null) { 194 String returnTypeName = normalizeTypeName(returnType.toString()); 195 logger.trace(" returnTypeName normalized=" + returnTypeName); 196 // Try as collection type first 197 String elementType = modelManager.getElementTypeName(returnTypeName); 198 if (elementType == null) { 199 elementType = modelManager.getElementTypeName(getBareName(returnTypeName)); 200 } 201 if (elementType != null) { 202 sig.setCollectionTypeKey(returnTypeName); 203 sig.setRowTypeKey(elementType); 204 } else { 205 // Maybe the return type IS the object type directly 206 sig.setRowTypeKey(returnTypeName); 207 } 208 209 // Resolve output columns from object type index 210 String rowTypeKey = sig.getRowTypeKey(); 211 List<String> fields = null; 212 if (rowTypeKey != null) { 213 fields = modelManager.getObjectTypeFields(rowTypeKey); 214 if (fields == null) { 215 fields = modelManager.getObjectTypeFields(getBareName(rowTypeKey)); 216 } 217 } 218 if (fields != null) { 219 sig.setOutputColumns(new ArrayList<>(fields)); 220 } 221 } 222 223 // Phase 3: Extract PIPE ROW lineage 224 if (sig.getOutputColumns() != null && !sig.getOutputColumns().isEmpty()) { 225 resolvePipeRowLineage(func, sig, pipeRowStmts, assignStmts, cursorLoops); 226 } else { 227 sig.setStatus(PipelinedFunctionSignature.ResolutionStatus.UNRESOLVED); 228 } 229 230 modelManager.addPipelinedSignature(sig); 231 logger.trace("Pipelined signature built: key=" + sig.getFunctionKey() 232 + ", nakedKey=" + sig.getNakedKey() 233 + ", resolved=" + sig.isResolved() 234 + ", status=" + sig.getStatus() 235 + ", outputCols=" + sig.getOutputColumns() 236 + ", lineageKeys=" + sig.getLineageByOutputColumn().keySet() 237 + ", lineageEntries=" + sig.getLineageByOutputColumn().size()); 238 for (Map.Entry<String, List<PipelinedSourceRef>> entry : sig.getLineageByOutputColumn().entrySet()) { 239 logger.trace(" " + entry.getKey() + " <- " + entry.getValue()); 240 } 241 } 242 243 // ---- Phase 
3: PIPE ROW lineage extraction ---- 244 245 private void resolvePipeRowLineage(TPlsqlCreateFunction func, PipelinedFunctionSignature sig, 246 List<TPlsqlPipeRowStmt> pipeRowStmts, 247 List<TAssignStmt> assignStmts, 248 List<TLoopStmt> cursorLoops) { 249 // Build cursor bindings from explicit CURSOR declarations 250 // Check both declare section (getStatements) and body (getBodyStatements) 251 Map<String, TSelectSqlStatement> cursorBindings = new LinkedHashMap<>(); 252 for (int i = 0; i < func.getStatements().size(); i++) { 253 TCustomSqlStatement s = func.getStatements().get(i); 254 if (s instanceof TCursorDeclStmt) { 255 TCursorDeclStmt cursorDecl = (TCursorDeclStmt) s; 256 if (cursorDecl.getCursorName() != null && cursorDecl.getSubquery() != null) { 257 cursorBindings.put(cursorDecl.getCursorName().toString().toLowerCase(), cursorDecl.getSubquery()); 258 } 259 } 260 } 261 for (int i = 0; i < func.getBodyStatements().size(); i++) { 262 TCustomSqlStatement s = func.getBodyStatements().get(i); 263 if (s instanceof TCursorDeclStmt) { 264 TCursorDeclStmt cursorDecl = (TCursorDeclStmt) s; 265 if (cursorDecl.getCursorName() != null && cursorDecl.getSubquery() != null) { 266 cursorBindings.put(cursorDecl.getCursorName().toString().toLowerCase(), cursorDecl.getSubquery()); 267 } 268 } 269 } 270 271 // Build loop variable bindings: loopVar -> cursor SELECT 272 Map<String, TSelectSqlStatement> loopVarBindings = new LinkedHashMap<>(); 273 for (TLoopStmt loop : cursorLoops) { 274 String loopVar = null; 275 if (loop.getRecordName() != null) { 276 loopVar = loop.getRecordName().toString().toLowerCase(); 277 } else if (loop.getIndexName() != null) { 278 loopVar = loop.getIndexName().toString().toLowerCase(); 279 } 280 if (loopVar == null) continue; 281 282 if (loop.getSubquery() != null) { 283 // Inline SELECT: FOR item IN (SELECT ...) 
LOOP 284 loopVarBindings.put(loopVar, loop.getSubquery()); 285 } else if (loop.getCursorName() != null) { 286 // Named cursor: FOR item IN cursorName LOOP 287 String cursorName = loop.getCursorName().toString().toLowerCase(); 288 TSelectSqlStatement cursorSelect = cursorBindings.get(cursorName); 289 if (cursorSelect != null) { 290 loopVarBindings.put(loopVar, cursorSelect); 291 } 292 logger.trace("Named cursor loop: var=" + loopVar + ", cursor=" + cursorName 293 + ", resolved=" + (cursorSelect != null)); 294 } 295 } 296 297 // Build assignment maps 298 // outRow := ROW_TYPE(...) --> variable -> list of constructor expressions (union from multiple branches) 299 Map<String, List<TExpression>> varConstructors = new LinkedHashMap<>(); 300 // outRow.field := expr --> variable -> field -> expression 301 Map<String, Map<String, TExpression>> varFieldAssignments = new LinkedHashMap<>(); 302 303 for (TAssignStmt assign : assignStmts) { 304 TExpression leftExpr = assign.getLeft(); 305 TExpression rightExpr = assign.getExpression(); 306 if (leftExpr == null || rightExpr == null) continue; 307 308 String leftText = leftExpr.toString().trim(); 309 int dotIdx = leftText.indexOf('.'); 310 if (dotIdx > 0) { 311 // outRow.field := expr 312 String varName = leftText.substring(0, dotIdx).toLowerCase(); 313 String fieldName = leftText.substring(dotIdx + 1); 314 varFieldAssignments.computeIfAbsent(varName, k -> new LinkedHashMap<>()) 315 .put(fieldName, rightExpr); 316 } else { 317 // outRow := ROW_TYPE(...) 
318 String varName = leftText.toLowerCase(); 319 varConstructors.computeIfAbsent(varName, k -> new ArrayList<>()) 320 .add(rightExpr); 321 } 322 } 323 324 // Check for cursor FOR loops using named cursors (FOR item IN cursorName LOOP) 325 for (TLoopStmt loop : cursorLoops) { 326 if (loop.getIndexName() != null && loop.getSubquery() == null) { 327 // Might reference a named cursor 328 String loopVar = loop.getIndexName().toString().toLowerCase(); 329 // Look for cursor name in the loop definition 330 // The cursor name would be in the loop's cursor expression 331 // This is handled differently - the subquery is already resolved 332 } 333 } 334 335 // Process each PIPE ROW statement 336 for (TPlsqlPipeRowStmt pipeRow : pipeRowStmts) { 337 TExpression expr = pipeRow.getExpression(); 338 if (expr == null) continue; 339 340 String exprText = expr.toString().trim(); 341 342 if (isObjectConstructor(expr)) { 343 // PIPE ROW(ROW_TYPE(arg1, arg2, ...)) 344 bindConstructorArgs(sig, expr, loopVarBindings, cursorBindings); 345 } else { 346 // PIPE ROW(outRow) - variable reference 347 String varName = exprText.toLowerCase(); 348 349 // Check for constructor assignments: outRow := ROW_TYPE(...) 
350 List<TExpression> constructors = varConstructors.get(varName); 351 if (constructors != null) { 352 for (TExpression constructor : constructors) { 353 if (isObjectConstructor(constructor)) { 354 bindConstructorArgs(sig, constructor, loopVarBindings, cursorBindings); 355 } 356 } 357 } 358 359 // Check for field assignments: outRow.field := expr 360 Map<String, TExpression> fieldAssigns = varFieldAssignments.get(varName); 361 if (fieldAssigns != null) { 362 overlayFieldAssignments(sig, fieldAssigns, loopVarBindings, cursorBindings); 363 } 364 } 365 } 366 367 sig.setResolved(true); 368 sig.setStatus(PipelinedFunctionSignature.ResolutionStatus.OK); 369 } 370 371 private boolean isObjectConstructor(TExpression expr) { 372 if (expr == null) return false; 373 // A constructor call looks like a function call: ROW_TYPE(arg1, arg2, ...) 374 if (expr.getExpressionType() == EExpressionType.function_t && expr.getFunctionCall() != null) { 375 return true; 376 } 377 // Also handle simple_object_expr_t 378 if (expr.getExpressionType() == EExpressionType.simple_object_name_t) { 379 return false; 380 } 381 // Check if it's a function call expression 382 String text = expr.toString().trim(); 383 if (text.contains("(") && text.endsWith(")")) { 384 return expr.getFunctionCall() != null; 385 } 386 return false; 387 } 388 389 private void bindConstructorArgs(PipelinedFunctionSignature sig, TExpression constructorExpr, 390 Map<String, TSelectSqlStatement> loopVarBindings, 391 Map<String, TSelectSqlStatement> cursorBindings) { 392 TFunctionCall funcCall = constructorExpr.getFunctionCall(); 393 if (funcCall == null || funcCall.getArgs() == null) return; 394 395 TExpressionList args = funcCall.getArgs(); 396 List<String> outputColumns = sig.getOutputColumns(); 397 398 for (int i = 0; i < args.size() && i < outputColumns.size(); i++) { 399 TExpression arg = args.getExpression(i); 400 String outputCol = outputColumns.get(i).toLowerCase(); 401 resolveArgLineage(sig, outputCol, arg, 
loopVarBindings, cursorBindings); 402 } 403 } 404 405 private void resolveArgLineage(PipelinedFunctionSignature sig, String outputCol, 406 TExpression arg, 407 Map<String, TSelectSqlStatement> loopVarBindings, 408 Map<String, TSelectSqlStatement> cursorBindings) { 409 if (arg == null) return; 410 411 String argText = arg.toString().trim(); 412 413 // Check if it's a wrapper function like util.TO_NUMBER(item.xxx) - extract inner argument 414 TExpression innerArg = unwrapFunctionCalls(arg); 415 if (innerArg != arg) { 416 argText = innerArg.toString().trim(); 417 } 418 419 // Check for loop variable reference: item.COLUMN_NAME 420 int dotIdx = argText.indexOf('.'); 421 if (dotIdx > 0) { 422 String prefix = argText.substring(0, dotIdx).toLowerCase(); 423 String fieldName = argText.substring(dotIdx + 1); 424 425 // Check loop variable bindings 426 TSelectSqlStatement cursorSelect = loopVarBindings.get(prefix); 427 if (cursorSelect == null) { 428 cursorSelect = cursorBindings.get(prefix); 429 } 430 if (cursorSelect != null) { 431 // Resolve the field from the cursor's select list 432 resolveFromCursorSelect(sig, outputCol, fieldName, cursorSelect); 433 return; 434 } 435 } 436 437 // Check for simple column reference (no prefix) 438 if (dotIdx < 0 && !argText.contains("(")) { 439 // Could be a local variable or parameter - mark as unresolved for now 440 sig.addSourceRef(outputCol, new PipelinedSourceRef( 441 "unknown", argText, PipelinedSourceRef.SourceKind.UNRESOLVED)); 442 return; 443 } 444 445 // Check for literal/constant 446 if (argText.equals("null") || argText.startsWith("'") || argText.matches("-?\\d+.*")) { 447 // Constant - empty source, just preserve column structure 448 return; 449 } 450 451 // For qualified names like table.column that aren't loop variables 452 if (dotIdx > 0) { 453 String tablePart = argText.substring(0, dotIdx); 454 String colPart = argText.substring(dotIdx + 1); 455 sig.addSourceRef(outputCol, new PipelinedSourceRef( 456 tablePart, 
colPart, PipelinedSourceRef.SourceKind.BASE_TABLE)); 457 } 458 } 459 460 private TExpression unwrapFunctionCalls(TExpression expr) { 461 // Unwrap wrapper functions like TO_NUMBER(x), TO_CHAR(x), TRIM(x), util.TO_NUMBER(x) 462 if (expr.getExpressionType() == EExpressionType.function_t && expr.getFunctionCall() != null) { 463 TFunctionCall func = expr.getFunctionCall(); 464 if (func.getArgs() != null && func.getArgs().size() == 1) { 465 String funcName = func.getFunctionName().toString().toUpperCase(); 466 // Common wrapper functions 467 if (funcName.endsWith("TO_NUMBER") || funcName.endsWith("TO_CHAR") 468 || funcName.endsWith("TO_DATE") || funcName.equals("TRIM") 469 || funcName.equals("NVL") || funcName.equals("COALESCE") 470 || funcName.equals("UPPER") || funcName.equals("LOWER") 471 || funcName.equals("CAST") || funcName.endsWith(".TO_NUMBER")) { 472 return unwrapFunctionCalls(func.getArgs().getExpression(0)); 473 } 474 } 475 } 476 return expr; 477 } 478 479 private void resolveFromCursorSelect(PipelinedFunctionSignature sig, String outputCol, 480 String fieldName, TSelectSqlStatement cursorSelect) { 481 // Find the column in the cursor's SELECT list that matches fieldName 482 if (cursorSelect.getResultColumnList() == null) return; 483 484 String fieldNameLower = fieldName.toLowerCase(); 485 // Strip quotes 486 fieldNameLower = SQLUtil.trimColumnStringQuote(fieldNameLower); 487 488 for (int i = 0; i < cursorSelect.getResultColumnList().size(); i++) { 489 TResultColumn rc = cursorSelect.getResultColumnList().getResultColumn(i); 490 String alias = null; 491 if (rc.getAliasClause() != null) { 492 alias = rc.getAliasClause().toString().toLowerCase(); 493 alias = SQLUtil.trimColumnStringQuote(alias); 494 } 495 String colName = null; 496 if (rc.getExpr() != null) { 497 colName = rc.getExpr().toString().toLowerCase(); 498 // Extract just the column part from table.column 499 int dot = colName.lastIndexOf('.'); 500 if (dot >= 0) { 501 colName = colName.substring(dot + 
1); 502 } 503 colName = SQLUtil.trimColumnStringQuote(colName); 504 } 505 506 if (fieldNameLower.equals(alias) || fieldNameLower.equals(colName)) { 507 // Found the matching column - trace its source 508 resolveResultColumnSources(sig, outputCol, rc, cursorSelect); 509 return; 510 } 511 } 512 513 // Didn't find by name, try position if same number of columns 514 sig.addSourceRef(outputCol, new PipelinedSourceRef( 515 "cursor", fieldName, PipelinedSourceRef.SourceKind.UNRESOLVED)); 516 } 517 518 private void resolveResultColumnSources(PipelinedFunctionSignature sig, String outputCol, 519 TResultColumn rc, TSelectSqlStatement select) { 520 resolveResultColumnSourcesWithDepth(sig, outputCol, rc, select, 0); 521 } 522 523 private static final int MAX_CTE_TRACE_DEPTH = 10; 524 525 private void resolveResultColumnSourcesWithDepth(PipelinedFunctionSignature sig, String outputCol, 526 TResultColumn rc, TSelectSqlStatement select, int depth) { 527 if (rc.getExpr() == null) return; 528 if (depth > MAX_CTE_TRACE_DEPTH) return; 529 530 TExpression expr = rc.getExpr(); 531 // Unwrap wrapper functions 532 expr = unwrapFunctionCalls(expr); 533 String exprText = expr.toString().trim(); 534 535 // Try to resolve to a base table column 536 int dotIdx = exprText.lastIndexOf('.'); 537 if (dotIdx > 0) { 538 String tableRef = exprText.substring(0, dotIdx); 539 String colRef = exprText.substring(dotIdx + 1); 540 541 // Check if the table reference points to a CTE - if so, trace through it 542 TTable sourceTable = findTableByRefInSelect(tableRef, select); 543 if (sourceTable != null && sourceTable.isCTEName()) { 544 // Use the actual table name (not the alias) to find the CTE definition 545 String cteName = getFullTableName(sourceTable); 546 if (traceThroughCTE(sig, outputCol, colRef, cteName, select, depth)) { 547 return; 548 } 549 } 550 551 // Resolve table reference against FROM clause 552 String resolvedTable = resolveTableRef(tableRef, select); 553 if (resolvedTable != null) { 554 
sig.addSourceRef(outputCol, new PipelinedSourceRef( 555 resolvedTable, SQLUtil.trimColumnStringQuote(colRef), 556 PipelinedSourceRef.SourceKind.BASE_TABLE)); 557 return; 558 } 559 // Fallback: use the table ref as-is 560 sig.addSourceRef(outputCol, new PipelinedSourceRef( 561 tableRef, SQLUtil.trimColumnStringQuote(colRef), 562 PipelinedSourceRef.SourceKind.BASE_TABLE)); 563 } else if (exprText.equals("null") || exprText.startsWith("'") || exprText.matches("-?\\d+.*")) { 564 // Constant - just preserve column structure 565 sig.addSourceRef(outputCol, new PipelinedSourceRef( 566 "constant", exprText, PipelinedSourceRef.SourceKind.CONST)); 567 } else if (!exprText.contains("(") && !exprText.contains(" ")) { 568 // Simple column name without table prefix 569 // Try to resolve by checking sourceTable from the parser's column resolution 570 if (rc.getExpr().getObjectOperand() != null 571 && rc.getExpr().getObjectOperand().getSourceTable() != null) { 572 TTable sourceTable = rc.getExpr().getObjectOperand().getSourceTable(); 573 // If the source table is a CTE, trace through it to find actual base tables 574 if (sourceTable.isCTEName()) { 575 String cteName = getFullTableName(sourceTable); 576 if (traceThroughCTE(sig, outputCol, exprText, cteName, select, depth)) { 577 return; 578 } 579 } 580 String tableName = getFullTableName(sourceTable); 581 sig.addSourceRef(outputCol, new PipelinedSourceRef( 582 tableName, SQLUtil.trimColumnStringQuote(exprText), 583 PipelinedSourceRef.SourceKind.BASE_TABLE)); 584 } else { 585 // Try to find which table it belongs to from the FROM clause 586 String resolvedTable = resolveColumnTable(exprText, select); 587 if (resolvedTable != null) { 588 sig.addSourceRef(outputCol, new PipelinedSourceRef( 589 resolvedTable, SQLUtil.trimColumnStringQuote(exprText), 590 PipelinedSourceRef.SourceKind.BASE_TABLE)); 591 } else { 592 // Fallback: use the SELECT itself as the source - the analyzer will trace further 593 // Use the cursor/select name as a 
proxy source 594 String selectName = "cursor_result"; 595 if (select.tables != null && select.tables.size() > 0) { 596 selectName = getFullTableName(select.tables.getTable(0)); 597 } 598 sig.addSourceRef(outputCol, new PipelinedSourceRef( 599 selectName, SQLUtil.trimColumnStringQuote(exprText), 600 PipelinedSourceRef.SourceKind.CTE)); 601 } 602 } 603 } else { 604 // Complex expression - try extracting column refs 605 extractColumnRefsFromExpr(sig, outputCol, expr, select); 606 } 607 } 608 609 /** 610 * Traces a column reference through a CTE to find actual base tables. 611 * Returns true if at least one base table source was found. 612 */ 613 private boolean traceThroughCTE(PipelinedFunctionSignature sig, String outputCol, 614 String colName, String cteRef, TSelectSqlStatement select, int depth) { 615 if (depth > MAX_CTE_TRACE_DEPTH) return false; 616 617 // Find the CTE definition matching cteRef 618 TCTE cte = findCTE(cteRef, select); 619 if (cte == null || cte.getSubquery() == null) return false; 620 621 TSelectSqlStatement cteSelect = cte.getSubquery(); 622 // Handle the outermost SELECT of the CTE - for UNION, use leftStmt chain 623 if (cteSelect.getSetOperatorType() != ESetOperatorType.none) { 624 // For UNION queries, trace through the first branch 625 Deque<TSelectSqlStatement> unionStack = new ArrayDeque<>(); 626 unionStack.push(cteSelect); 627 while (!unionStack.isEmpty()) { 628 TSelectSqlStatement current = unionStack.pop(); 629 if (current.getSetOperatorType() != ESetOperatorType.none) { 630 if (current.getRightStmt() != null) unionStack.push(current.getRightStmt()); 631 if (current.getLeftStmt() != null) unionStack.push(current.getLeftStmt()); 632 } else { 633 // Process this leaf SELECT 634 traceCTESelectColumn(sig, outputCol, colName, current, select, depth); 635 return true; // Just use the first branch for lineage 636 } 637 } 638 return false; 639 } 640 641 return traceCTESelectColumn(sig, outputCol, colName, cteSelect, select, depth); 642 } 643 
644 /** 645 * Traces a column through a CTE's SELECT statement to find its source. 646 */ 647 private boolean traceCTESelectColumn(PipelinedFunctionSignature sig, String outputCol, 648 String colName, TSelectSqlStatement cteSelect, 649 TSelectSqlStatement outerSelect, int depth) { 650 if (cteSelect.getResultColumnList() == null) return false; 651 652 String colNameLower = SQLUtil.trimColumnStringQuote(colName.toLowerCase()); 653 654 // Find the matching result column in the CTE's SELECT list 655 for (int i = 0; i < cteSelect.getResultColumnList().size(); i++) { 656 TResultColumn rc = cteSelect.getResultColumnList().getResultColumn(i); 657 String alias = null; 658 if (rc.getAliasClause() != null) { 659 alias = rc.getAliasClause().toString().toLowerCase(); 660 alias = SQLUtil.trimColumnStringQuote(alias); 661 } 662 String rcColName = null; 663 if (rc.getExpr() != null) { 664 rcColName = rc.getExpr().toString().toLowerCase(); 665 int dot = rcColName.lastIndexOf('.'); 666 if (dot >= 0) { 667 rcColName = rcColName.substring(dot + 1); 668 } 669 rcColName = SQLUtil.trimColumnStringQuote(rcColName); 670 } 671 672 if (colNameLower.equals(alias) || colNameLower.equals(rcColName)) { 673 // Found the column - recursively resolve its source 674 resolveResultColumnSourcesWithDepth(sig, outputCol, rc, cteSelect, depth + 1); 675 return true; 676 } 677 } 678 return false; 679 } 680 681 /** 682 * Finds a CTE by name in the given SELECT or its parent statements. 
683 */ 684 private TCTE findCTE(String cteRef, TSelectSqlStatement select) { 685 String cteRefLower = SQLUtil.trimColumnStringQuote(cteRef.toLowerCase()); 686 // Also try bare name 687 String cteRefBare = getBareName(cteRefLower); 688 689 TCustomSqlStatement current = select; 690 while (current != null) { 691 if (current instanceof TSelectSqlStatement) { 692 TSelectSqlStatement sel = (TSelectSqlStatement) current; 693 if (sel.getCteList() != null) { 694 for (int i = 0; i < sel.getCteList().size(); i++) { 695 TCTE cte = sel.getCteList().getCTE(i); 696 if (cte.getTableName() != null) { 697 String cteName = cte.getTableName().toString().toLowerCase(); 698 if (cteRefLower.equals(cteName) || cteRefBare.equals(cteName)) { 699 return cte; 700 } 701 } 702 } 703 } 704 } 705 current = current.getParentStmt(); 706 } 707 return null; 708 } 709 710 /** 711 * Finds a TTable in the SELECT's FROM clause by reference (name or alias). 712 */ 713 private TTable findTableByRefInSelect(String tableRef, TSelectSqlStatement select) { 714 if (select == null || select.tables == null) return null; 715 String refLower = SQLUtil.trimColumnStringQuote(tableRef.toLowerCase()); 716 717 for (int i = 0; i < select.tables.size(); i++) { 718 TTable t = select.tables.getTable(i); 719 if (t.getAliasClause() != null) { 720 String alias = SQLUtil.trimColumnStringQuote(t.getAliasClause().toString().toLowerCase()); 721 if (alias.equals(refLower)) return t; 722 } 723 String tName = t.getFullName() != null ? 
t.getFullName().toLowerCase() : ""; 724 String bareTableName = getBareName(tName); 725 if (refLower.equals(bareTableName) || refLower.equals(tName)) return t; 726 } 727 return null; 728 } 729 730 private void extractColumnRefsFromExpr(PipelinedFunctionSignature sig, String outputCol, 731 TExpression expr, TSelectSqlStatement select) { 732 // Use iterative traversal to find object names in the expression 733 Deque<TExpression> exprStack = new ArrayDeque<>(); 734 exprStack.push(expr); 735 boolean found = false; 736 737 while (!exprStack.isEmpty()) { 738 TExpression current = exprStack.pop(); 739 if (current == null) continue; 740 741 if (current.getExpressionType() == EExpressionType.simple_object_name_t 742 && current.getObjectOperand() != null) { 743 String objName = current.getObjectOperand().toString(); 744 int dot = objName.lastIndexOf('.'); 745 if (dot > 0) { 746 String tableRef = objName.substring(0, dot); 747 String colRef = objName.substring(dot + 1); 748 String resolvedTable = resolveTableRef(tableRef, select); 749 sig.addSourceRef(outputCol, new PipelinedSourceRef( 750 resolvedTable != null ? 
resolvedTable : tableRef, 751 SQLUtil.trimColumnStringQuote(colRef), 752 PipelinedSourceRef.SourceKind.BASE_TABLE)); 753 found = true; 754 } 755 } 756 757 // Push children 758 if (current.getLeftOperand() != null) exprStack.push(current.getLeftOperand()); 759 if (current.getRightOperand() != null) exprStack.push(current.getRightOperand()); 760 if (current.getFunctionCall() != null && current.getFunctionCall().getArgs() != null) { 761 for (int i = 0; i < current.getFunctionCall().getArgs().size(); i++) { 762 exprStack.push(current.getFunctionCall().getArgs().getExpression(i)); 763 } 764 } 765 } 766 767 if (!found) { 768 sig.addSourceRef(outputCol, new PipelinedSourceRef( 769 "expression", expr.toString(), PipelinedSourceRef.SourceKind.UNRESOLVED)); 770 } 771 } 772 773 private String resolveTableRef(String tableRef, TSelectSqlStatement select) { 774 if (select == null || select.tables == null) return null; 775 String tableRefLower = tableRef.toLowerCase(); 776 String tableRefNorm = SQLUtil.trimColumnStringQuote(tableRefLower); 777 778 for (int i = 0; i < select.tables.size(); i++) { 779 TTable t = select.tables.getTable(i); 780 // Check alias 781 if (t.getAliasClause() != null) { 782 String alias = t.getAliasClause().toString().toLowerCase(); 783 alias = SQLUtil.trimColumnStringQuote(alias); 784 if (alias.equals(tableRefNorm)) { 785 return getFullTableName(t); 786 } 787 } 788 // Check table name 789 String tName = t.getFullName() != null ? 
t.getFullName().toLowerCase() : ""; 790 String bareTableName = getBareName(tName); 791 if (tableRefNorm.equals(bareTableName) || tableRefNorm.equals(tName)) { 792 return getFullTableName(t); 793 } 794 } 795 796 // Check CTEs 797 if (select.getCteList() != null) { 798 for (int i = 0; i < select.getCteList().size(); i++) { 799 TCTE cte = select.getCteList().getCTE(i); 800 if (cte.getTableName() != null) { 801 String cteName = cte.getTableName().toString().toLowerCase(); 802 if (tableRefNorm.equals(cteName)) { 803 return cteName; 804 } 805 } 806 } 807 } 808 809 // If this is a subquery's select, walk up to find CTEs 810 TCustomSqlStatement parent = select.getParentStmt(); 811 if (parent instanceof TSelectSqlStatement) { 812 return resolveTableRef(tableRef, (TSelectSqlStatement) parent); 813 } 814 815 return null; 816 } 817 818 private String resolveColumnTable(String colName, TSelectSqlStatement select) { 819 if (select == null || select.tables == null) return null; 820 // Simplified: just return null to indicate we can't determine the table 821 return null; 822 } 823 824 private String getFullTableName(TTable table) { 825 if (table.getFullName() != null) { 826 return table.getFullName(); 827 } 828 return table.getName(); 829 } 830 831 private void overlayFieldAssignments(PipelinedFunctionSignature sig, 832 Map<String, TExpression> fieldAssigns, 833 Map<String, TSelectSqlStatement> loopVarBindings, 834 Map<String, TSelectSqlStatement> cursorBindings) { 835 for (Map.Entry<String, TExpression> entry : fieldAssigns.entrySet()) { 836 String fieldName = entry.getKey(); 837 TExpression expr = entry.getValue(); 838 839 // Find matching output column 840 String outputCol = null; 841 for (String col : sig.getOutputColumns()) { 842 if (col.equalsIgnoreCase(fieldName)) { 843 outputCol = col.toLowerCase(); 844 break; 845 } 846 } 847 if (outputCol == null) continue; 848 849 resolveArgLineage(sig, outputCol, expr, loopVarBindings, cursorBindings); 850 } 851 } 852 853 // ---- Phase 
4: Call-site stitching ---- 854 855 /** 856 * Handle TABLE(func(...)) which is parsed as tableExpr with funcCall=null. 857 * The table name is the function name. Always defers to finalize because 858 * table columns are not yet populated during from-clause processing. 859 */ 860 public void tryStitchTableExpr(TTable table) { 861 if (!option.isEnablePipelinedStitching()) return; 862 if (table.getName() == null) return; 863 864 String tableName = table.getName(); 865 List<String> candidates = new ArrayList<>(); 866 candidates.add(normalizeKey(tableName)); 867 String bareName = getBareName(tableName); 868 if (!bareName.equals(normalizeKey(tableName))) { 869 candidates.add(bareName); 870 } 871 872 // Always defer to finalize - columns aren't populated yet during FROM processing 873 modelManager.addPendingPipelinedCallSite( 874 new PendingPipelinedCallSite(candidates, -1, table, null)); 875 } 876 877 /** 878 * Called when TABLE(func(...)) is encountered in a FROM clause. 879 * Generates candidate keys and tries to stitch immediately. 880 * If the signature isn't available yet, adds to pending list. 881 */ 882 public boolean tryStitchCallSite(TTable table, TFunctionCall funcCall) { 883 if (!option.isEnablePipelinedStitching()) return false; 884 885 List<String> candidates = generateFunctionKeyCandidates(funcCall); 886 int argCount = funcCall.getArgs() != null ? 
funcCall.getArgs().size() : 0; 887 888 PipelinedFunctionSignature sig = modelManager.findPipelinedSignature(candidates, argCount); 889 if (sig == null) { 890 sig = modelManager.findPipelinedSignatureAny(candidates); 891 } 892 893 if (sig != null && sig.isResolved() 894 && sig.getStatus() == PipelinedFunctionSignature.ResolutionStatus.OK) { 895 logger.trace("Pipelined stitch: found signature " + sig.getFunctionKey() 896 + " for call " + funcCall.getFunctionName() + ", cols=" + sig.getOutputColumns().size() 897 + ", lineage keys=" + sig.getLineageByOutputColumn().keySet()); 898 stitchOneCallSite(table, sig); 899 return true; 900 } 901 902 // Add to pending for finalize 903 logger.trace("Pipelined stitch: no signature found for candidates=" + candidates 904 + ", adding to pending"); 905 modelManager.addPendingPipelinedCallSite( 906 new PendingPipelinedCallSite(candidates, argCount, table, funcCall)); 907 return false; 908 } 909 910 /** 911 * Stitch a single pending call site. Called from DataFlowAnalyzer with 912 * the proper statement stack context. 913 */ 914 public void stitchOnePending(PendingPipelinedCallSite cs) { 915 if (!option.isEnablePipelinedStitching()) return; 916 917 PipelinedFunctionSignature sig = modelManager.findPipelinedSignature( 918 cs.getFunctionKeyCandidates(), cs.getArgCount()); 919 if (sig == null) { 920 sig = modelManager.findPipelinedSignatureAny(cs.getFunctionKeyCandidates()); 921 } 922 if (sig != null && sig.isResolved() 923 && sig.getStatus() == PipelinedFunctionSignature.ResolutionStatus.OK) { 924 stitchOneCallSite(cs.getTable(), sig); 925 } 926 } 927 928 /** 929 * Finalize hook: process all pending call sites. Called before XML emit. 
*/
    public void stitchPendingCallSites() {
        if (!option.isEnablePipelinedStitching()) return;

        for (PendingPipelinedCallSite cs : modelManager.getPendingPipelinedCallSites()) {
            stitchOnePending(cs);
        }
    }

    /**
     * Stitches the lineage recorded in {@code sig} onto the table model bound to
     * one call site.
     *
     * <p>The function key is held in the manager's "resolving" set for the duration
     * of the call. The call is skipped when the key is already in that set, or when
     * the set has reached {@code option.getMaxPipelinedExpansionDepth()} entries.
     * Any exception is logged and swallowed, so one failing call site does not
     * abort the caller.
     *
     * @param table the FROM-clause table bound to the function call
     * @param sig   the resolved signature of the called function
     */
    private void stitchOneCallSite(TTable table, PipelinedFunctionSignature sig) {
        Set<String> resolving = modelManager.getResolvingPipelinedFunctions();
        String functionKey = sig.getFunctionKey();
        if (resolving.contains(functionKey)
                || resolving.size() >= option.getMaxPipelinedExpansionDepth()) {
            return;
        }

        resolving.add(functionKey);
        try {
            // Get the table model bound to this function call table
            Object tableModel = modelManager.getModel(table);
            if (tableModel instanceof Table) {
                Table bound = (Table) tableModel;
                if (bound.getColumns() != null && !bound.getColumns().isEmpty()) {
                    stitchToTable(bound, sig);
                    return;
                }
            }

            // For tableExpr tables (TABLE(func())), the table model is created
            // differently. Try createTableFromCreateDDL which returns existing if bound.
            Table functionTable = modelFactory.createTableFromCreateDDL(table, true);
            if (functionTable == null) {
                return;
            }
            if (functionTable.getColumns() != null && !functionTable.getColumns().isEmpty()) {
                stitchToTable(functionTable, sig);
                return;
            }

            // Last resort: create output columns from the signature and stitch.
            // The column is created even when the signature has no lineage for it.
            for (String outputCol : sig.getOutputColumns()) {
                TObjectName colName = new TObjectName();
                colName.setString(outputCol);
                TableColumn column = modelFactory.createTableColumn(functionTable, colName, true);

                linkStitchedSources(column, sig.getLineageByOutputColumn().get(outputCol.toLowerCase()));
            }
        } catch (Exception e) {
            logger.error("Error stitching pipelined call site for " + functionKey, e);
        } finally {
            resolving.remove(functionKey);
        }
    }

    /**
     * Stitches signature lineage onto every column of an already populated table.
     *
     * <p>Each caller column is mapped to a signature output column by
     * {@link #mapByNameThenPosition}; columns with no mapping or no recorded
     * lineage are left untouched.
     *
     * @param functionTable table model whose columns receive the lineage
     * @param sig           the resolved signature of the called function
     */
    private void stitchToTable(Table functionTable, PipelinedFunctionSignature sig) {
        if (functionTable == null || functionTable.getColumns() == null) return;

        for (TableColumn callerCol : functionTable.getColumns()) {
            String callerColName = callerCol.getName();
            if (callerColName == null) continue;

            String mappedKey = mapByNameThenPosition(callerColName, callerCol, functionTable, sig);
            if (mappedKey == null) continue;

            linkStitchedSources(callerCol, sig.getLineageByOutputColumn().get(mappedKey));
        }
    }

    /**
     * Creates one select-effect data flow relation per distinct source reference,
     * from the source column to {@code targetColumn}.
     *
     * <p>UNRESOLVED and CONST references are skipped, duplicates (same parent and
     * column name) are added once, and at most
     * {@code option.getMaxStitchedSourcesPerColumn()} relations are created.
     *
     * @param targetColumn column that receives the lineage
     * @param refs         source references for that column; may be null or empty
     */
    private void linkStitchedSources(TableColumn targetColumn, List<PipelinedSourceRef> refs) {
        if (refs == null || refs.isEmpty()) return;

        Set<String> addedSources = new HashSet<>();
        int added = 0;
        for (PipelinedSourceRef ref : refs) {
            if (added >= option.getMaxStitchedSourcesPerColumn()) break;
            if (ref.getSourceKind() == PipelinedSourceRef.SourceKind.UNRESOLVED
                    || ref.getSourceKind() == PipelinedSourceRef.SourceKind.CONST) {
                continue;
            }

            // Dedup: skip if already added
            String sourceKey = ref.getParentName() + "." + ref.getColumnName();
            if (!addedSources.add(sourceKey)) continue;

            // Create the stitched relationship
            Table sourceTable = modelFactory.createTableByName(ref.getParentName(), false);
            TObjectName sourceColName = new TObjectName();
            sourceColName.setString(ref.getColumnName());
            TableColumn sourceCol = modelFactory.createTableColumn(sourceTable, sourceColName, true);

            DataFlowRelationship relation = modelFactory.createDataFlowRelation();
            relation.setEffectType(EffectType.select);
            relation.addSource(new TableColumnRelationshipElement(sourceCol));
            relation.setTarget(new TableColumnRelationshipElement(targetColumn));

            added++;
        }
    }

    /**
     * Maps a caller column to the lineage-map key of a signature output column.
     *
     * <p>Tried in order: exact name match, normalized match (case-insensitive,
     * quotes stripped), then position, the latter only when the table and the
     * signature have the same number of columns.
     *
     * <p>Every branch returns the lower-cased output column name. That is the key
     * form the other lookups in this class use, so the normalized match must not
     * return the quote-stripped form.
     *
     * @param callerColName name of the caller column
     * @param callerCol     the caller column, used for the position fallback
     * @param functionTable table that owns {@code callerCol}
     * @param sig           signature providing the output columns
     * @return the lower-cased output column name, or {@code null} if nothing matches
     */
    private String mapByNameThenPosition(String callerColName, TableColumn callerCol,
                                         Table functionTable, PipelinedFunctionSignature sig) {
        List<String> outputColumns = sig.getOutputColumns();
        if (outputColumns == null || outputColumns.isEmpty()) return null;

        // 1. Exact name match
        for (String out : outputColumns) {
            if (out.equals(callerColName)) return out.toLowerCase();
        }

        // 2. Normalized name match (case-insensitive, strip quotes)
        String normalizedCaller = SQLUtil.trimColumnStringQuote(callerColName.toLowerCase());
        for (String out : outputColumns) {
            String normalizedOut = SQLUtil.trimColumnStringQuote(out.toLowerCase());
            // Compare on the stripped form, but return the key form used by the lineage map.
            if (normalizedOut.equals(normalizedCaller)) return out.toLowerCase();
        }

        // 3. Position fallback (strict prerequisites)
        if (functionTable.getColumns() != null
                && functionTable.getColumns().size() == outputColumns.size()) {
            int idx = functionTable.getColumns().indexOf(callerCol);
            if (idx >= 0 && idx < outputColumns.size()) {
                return outputColumns.get(idx).toLowerCase();
            }
        }

        return null;
    }

    // ---- Utility methods ----

    /**
     * Builds the ordered, duplicate-free list of lookup keys for a function call:
     * the normalized name as written, the name prefixed with the current global
     * Oracle package (if any), and the bare name when the call is qualified.
     *
     * @param funcCall the parsed function call
     * @return candidate keys in lookup order; empty when the call or its name is null
     */
    private List<String> generateFunctionKeyCandidates(TFunctionCall funcCall) {
        if (funcCall == null || funcCall.getFunctionName() == null) {
            return new ArrayList<>();
        }
        String funcName = funcCall.getFunctionName().toString();

        // LinkedHashSet keeps lookup order while dropping repeated keys.
        Set<String> candidates = new LinkedHashSet<>();

        // Full name as-is
        String fullKey = normalizeKey(funcName);
        candidates.add(fullKey);

        // With package prefix from current context
        if (ModelBindingManager.getGlobalOraclePackage() != null) {
            String pkgName = ModelBindingManager.getGlobalOraclePackage().getName();
            if (pkgName != null && !pkgName.isEmpty()) {
                candidates.add(normalizeKey(pkgName + "." + funcName));
            }
        }

        // Bare name (strip schema/package prefix). Compare against the normalized
        // full key, not a lower-cased raw name, so an unqualified call is not added twice.
        String bareName = getBareName(funcName);
        if (!bareName.equals(fullKey)) {
            candidates.add(normalizeKey(bareName));
        }

        return new ArrayList<>(candidates);
    }

    /**
     * Normalizes a type name: trims it and passes it through
     * {@code DlineageUtil.getIdentifierNormalTableName}.
     *
     * @param name raw type name; may be null
     * @return the normalized name, or an empty string for null input
     */
    private String normalizeTypeName(String name) {
        if (name == null) return "";
        return DlineageUtil.getIdentifierNormalTableName(name.trim());
    }

    /**
     * Normalizes a lookup key via {@code DlineageUtil.getIdentifierNormalTableName}
     * (no trimming).
     *
     * @param name raw name; may be null
     * @return the normalized key, or an empty string for null input
     */
    private String normalizeKey(String name) {
        if (name == null) return "";
        return DlineageUtil.getIdentifierNormalTableName(name);
    }

    /**
     * Returns the last dot-separated segment of the normalized name, i.e. the name
     * without any schema or package qualifier.
     *
     * @param name raw name; may be null
     * @return the text after the last '.', the whole normalized name when there is
     *         no dot, or an empty string for null input
     */
    private String getBareName(String name) {
        if (name == null) return "";
        String normalized = DlineageUtil.getIdentifierNormalTableName(name);
        int lastDot = normalized.lastIndexOf('.');
        if (lastDot >= 0) {
            return normalized.substring(lastDot + 1);
        }
        return normalized;
    }
}