001package gudusoft.gsqlparser.dlineage; 002 003import gudusoft.gsqlparser.*; 004import gudusoft.gsqlparser.dlineage.dataflow.model.*; 005import gudusoft.gsqlparser.dlineage.util.DlineageUtil; 006import gudusoft.gsqlparser.nodes.*; 007import gudusoft.gsqlparser.stmt.*; 008import gudusoft.gsqlparser.stmt.oracle.*; 009import gudusoft.gsqlparser.util.Logger; 010import gudusoft.gsqlparser.util.LoggerFactory; 011import gudusoft.gsqlparser.util.SQLUtil; 012 013import java.util.*; 014 015/** 016 * Handles Oracle pipelined table function cross-boundary data lineage stitching. 017 * 018 * <p>Two-phase approach: 019 * <ol> 020 * <li>Index phase: indexes object types, collection types, and pipelined function signatures</li> 021 * <li>Stitch phase: at call sites, appends lineage from function body to caller columns</li> 022 * </ol> 023 */ 024public class PipelinedFunctionAnalyzer { 025 026 private static final Logger logger = LoggerFactory.getLogger(PipelinedFunctionAnalyzer.class); 027 028 private final ModelBindingManager modelManager; 029 private final ModelFactory modelFactory; 030 private final Option option; 031 032 public PipelinedFunctionAnalyzer(ModelBindingManager modelManager, ModelFactory modelFactory, Option option) { 033 this.modelManager = modelManager; 034 this.modelFactory = modelFactory; 035 this.option = option; 036 } 037 038 // ---- Phase 1: Index CREATE TYPE AS OBJECT ---- 039 040 public void indexObjectType(TPlsqlCreateType stmt) { 041 if (!option.isEnablePipelinedStitching()) return; 042 if (stmt.getTypeAttributes() == null || stmt.getTypeAttributes().size() == 0) return; 043 if (stmt.getTypeName() == null) return; 044 045 String typeName = normalizeTypeName(stmt.getTypeName().toString()); 046 List<String> fieldNames = new ArrayList<>(); 047 for (int i = 0; i < stmt.getTypeAttributes().size(); i++) { 048 TTypeAttribute attr = stmt.getTypeAttributes().getAttributeItem(i); 049 if (attr.getAttributeName() != null) { 050 
fieldNames.add(attr.getAttributeName().toString()); 051 } 052 } 053 modelManager.indexObjectType(typeName, fieldNames); 054 // Also index by bare name (no schema prefix) 055 String bareName = getBareName(typeName); 056 if (!bareName.equals(typeName)) { 057 modelManager.indexObjectType(bareName, fieldNames); 058 } 059 logger.trace("Indexed object type: " + typeName + " (bare=" + bareName + "), fields=" + fieldNames); 060 } 061 062 // ---- Phase 1: Index TABLE OF collection types ---- 063 064 public void indexCollectionType(TPlsqlTableTypeDefStmt stmt) { 065 if (!option.isEnablePipelinedStitching()) return; 066 if (stmt.getTypeName() == null || stmt.getElementDataType() == null) return; 067 068 String collectionName = normalizeTypeName(stmt.getTypeName().toString()); 069 String elementTypeName = normalizeTypeName(stmt.getElementDataType().toString()); 070 modelManager.indexCollectionType(collectionName, elementTypeName); 071 // Also index by bare name 072 String bareCollection = getBareName(collectionName); 073 if (!bareCollection.equals(collectionName)) { 074 modelManager.indexCollectionType(bareCollection, elementTypeName); 075 } 076 logger.trace("Indexed collection type: " + collectionName + " -> " + elementTypeName); 077 } 078 079 // ---- Phase 2: Detect pipelined function and build signature ---- 080 081 public void analyzePipelinedFunction(TPlsqlCreateFunction func) { 082 if (!option.isEnablePipelinedStitching()) return; 083 084 // Scan body for PIPE ROW statements using iterative traversal 085 boolean hasPipeRow = false; 086 List<TPlsqlPipeRowStmt> pipeRowStmts = new ArrayList<>(); 087 List<TAssignStmt> assignStmts = new ArrayList<>(); 088 List<TLoopStmt> cursorLoops = new ArrayList<>(); 089 090 Deque<TCustomSqlStatement> stack = new ArrayDeque<>(); 091 // Push body statements 092 for (int i = func.getBodyStatements().size() - 1; i >= 0; i--) { 093 stack.push(func.getBodyStatements().get(i)); 094 } 095 // Also check inner statements 096 for (int i = 
func.getStatements().size() - 1; i >= 0; i--) { 097 stack.push(func.getStatements().get(i)); 098 } 099 100 while (!stack.isEmpty()) { 101 TCustomSqlStatement s = stack.pop(); 102 if (s instanceof TPlsqlPipeRowStmt) { 103 pipeRowStmts.add((TPlsqlPipeRowStmt) s); 104 hasPipeRow = true; 105 } else if (s instanceof TAssignStmt) { 106 assignStmts.add((TAssignStmt) s); 107 } else if (s instanceof TLoopStmt) { 108 TLoopStmt loop = (TLoopStmt) s; 109 if (loop.getSubquery() != null || loop.getCursorName() != null) { 110 cursorLoops.add(loop); 111 } 112 // Push loop body statements 113 for (int i = loop.getBodyStatements().size() - 1; i >= 0; i--) { 114 stack.push(loop.getBodyStatements().get(i)); 115 } 116 } else if (s instanceof TIfStmt) { 117 TIfStmt ifStmt = (TIfStmt) s; 118 if (ifStmt.getThenStatements() != null) { 119 for (int i = ifStmt.getThenStatements().size() - 1; i >= 0; i--) { 120 stack.push(ifStmt.getThenStatements().get(i)); 121 } 122 } 123 if (ifStmt.getElseStatements() != null) { 124 for (int i = ifStmt.getElseStatements().size() - 1; i >= 0; i--) { 125 stack.push(ifStmt.getElseStatements().get(i)); 126 } 127 } 128 if (ifStmt.getElseifStatements() != null) { 129 for (int i = ifStmt.getElseifStatements().size() - 1; i >= 0; i--) { 130 stack.push(ifStmt.getElseifStatements().get(i)); 131 } 132 } 133 } else if (s instanceof TCommonBlock) { 134 TCommonBlock block = (TCommonBlock) s; 135 if (block.getBodyStatements() != null) { 136 for (int i = block.getBodyStatements().size() - 1; i >= 0; i--) { 137 stack.push(block.getBodyStatements().get(i)); 138 } 139 } 140 if (block.getStatements() != null) { 141 for (int i = block.getStatements().size() - 1; i >= 0; i--) { 142 stack.push(block.getStatements().get(i)); 143 } 144 } 145 } 146 // Fallback: try to push sub-statements for any other compound statement type 147 else { 148 if (s.getStatements() != null && s.getStatements().size() > 0) { 149 for (int i = s.getStatements().size() - 1; i >= 0; i--) { 150 
stack.push(s.getStatements().get(i)); 151 } 152 } 153 if (s instanceof TBlockSqlStatement) { 154 TBlockSqlStatement blockStmt = (TBlockSqlStatement) s; 155 if (blockStmt.getBodyStatements() != null && blockStmt.getBodyStatements().size() > 0) { 156 for (int i = blockStmt.getBodyStatements().size() - 1; i >= 0; i--) { 157 stack.push(blockStmt.getBodyStatements().get(i)); 158 } 159 } 160 } 161 } 162 } 163 164 if (!hasPipeRow) return; 165 166 // Build signature 167 PipelinedFunctionSignature sig = new PipelinedFunctionSignature(); 168 String funcName = func.getFunctionName() != null ? func.getFunctionName().toString() : ""; 169 170 // Build function key with package context 171 String packagePrefix = ""; 172 if (ModelBindingManager.getGlobalOraclePackage() != null) { 173 packagePrefix = ModelBindingManager.getGlobalOraclePackage().getName(); 174 if (packagePrefix != null && !packagePrefix.isEmpty()) { 175 packagePrefix = packagePrefix + "."; 176 } else { 177 packagePrefix = ""; 178 } 179 } 180 181 int argCount = func.getParameterDeclarations() != null ? func.getParameterDeclarations().size() : 0; 182 183 String fullKey = normalizeKey(packagePrefix + funcName); 184 String nakedKey = normalizeKey(getBareName(funcName)); 185 186 sig.setFunctionKey(fullKey); 187 sig.setNakedKey(nakedKey); 188 sig.setArgCount(argCount); 189 190 // Resolve return type → collection type → element/object type → output columns 191 TTypeName returnType = func.getReturnDataType(); 192 logger.trace("Pipelined func " + funcName + ": returnType=" + (returnType != null ? 
returnType.toString() : "null")); 193 if (returnType != null) { 194 String returnTypeName = normalizeTypeName(returnType.toString()); 195 logger.trace(" returnTypeName normalized=" + returnTypeName); 196 // Try as collection type first 197 String elementType = modelManager.getElementTypeName(returnTypeName); 198 if (elementType == null) { 199 elementType = modelManager.getElementTypeName(getBareName(returnTypeName)); 200 } 201 if (elementType != null) { 202 sig.setCollectionTypeKey(returnTypeName); 203 sig.setRowTypeKey(elementType); 204 } else { 205 // Maybe the return type IS the object type directly 206 sig.setRowTypeKey(returnTypeName); 207 } 208 209 // Resolve output columns from object type index 210 String rowTypeKey = sig.getRowTypeKey(); 211 List<String> fields = null; 212 if (rowTypeKey != null) { 213 fields = modelManager.getObjectTypeFields(rowTypeKey); 214 if (fields == null) { 215 fields = modelManager.getObjectTypeFields(getBareName(rowTypeKey)); 216 } 217 } 218 if (fields != null) { 219 sig.setOutputColumns(new ArrayList<>(fields)); 220 } 221 } 222 223 // Phase 3: Extract PIPE ROW lineage 224 if (sig.getOutputColumns() != null && !sig.getOutputColumns().isEmpty()) { 225 resolvePipeRowLineage(func, sig, pipeRowStmts, assignStmts, cursorLoops); 226 } else { 227 sig.setStatus(PipelinedFunctionSignature.ResolutionStatus.UNRESOLVED); 228 } 229 230 modelManager.addPipelinedSignature(sig); 231 logger.trace("Pipelined signature built: key=" + sig.getFunctionKey() 232 + ", nakedKey=" + sig.getNakedKey() 233 + ", resolved=" + sig.isResolved() 234 + ", status=" + sig.getStatus() 235 + ", outputCols=" + sig.getOutputColumns() 236 + ", lineageKeys=" + sig.getLineageByOutputColumn().keySet() 237 + ", lineageEntries=" + sig.getLineageByOutputColumn().size()); 238 for (Map.Entry<String, List<PipelinedSourceRef>> entry : sig.getLineageByOutputColumn().entrySet()) { 239 logger.trace(" " + entry.getKey() + " <- " + entry.getValue()); 240 } 241 } 242 243 // ---- Phase 
3: PIPE ROW lineage extraction ---- 244 245 private void resolvePipeRowLineage(TPlsqlCreateFunction func, PipelinedFunctionSignature sig, 246 List<TPlsqlPipeRowStmt> pipeRowStmts, 247 List<TAssignStmt> assignStmts, 248 List<TLoopStmt> cursorLoops) { 249 // Build cursor bindings from explicit CURSOR declarations 250 // Check both declare section (getStatements) and body (getBodyStatements) 251 Map<String, TSelectSqlStatement> cursorBindings = new LinkedHashMap<>(); 252 for (int i = 0; i < func.getStatements().size(); i++) { 253 TCustomSqlStatement s = func.getStatements().get(i); 254 if (s instanceof TCursorDeclStmt) { 255 TCursorDeclStmt cursorDecl = (TCursorDeclStmt) s; 256 if (cursorDecl.getCursorName() != null && cursorDecl.getSubquery() != null) { 257 cursorBindings.put(cursorDecl.getCursorName().toString().toLowerCase(), cursorDecl.getSubquery()); 258 } 259 } 260 } 261 for (int i = 0; i < func.getBodyStatements().size(); i++) { 262 TCustomSqlStatement s = func.getBodyStatements().get(i); 263 if (s instanceof TCursorDeclStmt) { 264 TCursorDeclStmt cursorDecl = (TCursorDeclStmt) s; 265 if (cursorDecl.getCursorName() != null && cursorDecl.getSubquery() != null) { 266 cursorBindings.put(cursorDecl.getCursorName().toString().toLowerCase(), cursorDecl.getSubquery()); 267 } 268 } 269 } 270 271 // Build loop variable bindings: loopVar -> cursor SELECT 272 Map<String, TSelectSqlStatement> loopVarBindings = new LinkedHashMap<>(); 273 for (TLoopStmt loop : cursorLoops) { 274 String loopVar = null; 275 if (loop.getRecordName() != null) { 276 loopVar = loop.getRecordName().toString().toLowerCase(); 277 } else if (loop.getIndexName() != null) { 278 loopVar = loop.getIndexName().toString().toLowerCase(); 279 } 280 if (loopVar == null) continue; 281 282 if (loop.getSubquery() != null) { 283 // Inline SELECT: FOR item IN (SELECT ...) 
LOOP 284 loopVarBindings.put(loopVar, loop.getSubquery()); 285 } else if (loop.getCursorName() != null) { 286 // Named cursor: FOR item IN cursorName LOOP 287 String cursorName = loop.getCursorName().toString().toLowerCase(); 288 TSelectSqlStatement cursorSelect = cursorBindings.get(cursorName); 289 if (cursorSelect != null) { 290 loopVarBindings.put(loopVar, cursorSelect); 291 } 292 logger.trace("Named cursor loop: var=" + loopVar + ", cursor=" + cursorName 293 + ", resolved=" + (cursorSelect != null)); 294 } 295 } 296 297 // Build assignment maps 298 // outRow := ROW_TYPE(...) --> variable -> list of constructor expressions (union from multiple branches) 299 Map<String, List<TExpression>> varConstructors = new LinkedHashMap<>(); 300 // outRow.field := expr --> variable -> field -> expression 301 Map<String, Map<String, TExpression>> varFieldAssignments = new LinkedHashMap<>(); 302 303 for (TAssignStmt assign : assignStmts) { 304 TExpression leftExpr = assign.getLeft(); 305 TExpression rightExpr = assign.getExpression(); 306 if (leftExpr == null || rightExpr == null) continue; 307 308 String leftText = leftExpr.toString().trim(); 309 int dotIdx = leftText.indexOf('.'); 310 if (dotIdx > 0) { 311 // outRow.field := expr 312 String varName = leftText.substring(0, dotIdx).toLowerCase(); 313 String fieldName = leftText.substring(dotIdx + 1); 314 varFieldAssignments.computeIfAbsent(varName, k -> new LinkedHashMap<>()) 315 .put(fieldName, rightExpr); 316 } else { 317 // outRow := ROW_TYPE(...) 
318 String varName = leftText.toLowerCase(); 319 varConstructors.computeIfAbsent(varName, k -> new ArrayList<>()) 320 .add(rightExpr); 321 } 322 } 323 324 // Check for cursor FOR loops using named cursors (FOR item IN cursorName LOOP) 325 for (TLoopStmt loop : cursorLoops) { 326 if (loop.getIndexName() != null && loop.getSubquery() == null) { 327 // Might reference a named cursor 328 String loopVar = loop.getIndexName().toString().toLowerCase(); 329 // Look for cursor name in the loop definition 330 // The cursor name would be in the loop's cursor expression 331 // This is handled differently - the subquery is already resolved 332 } 333 } 334 335 // Process each PIPE ROW statement 336 for (TPlsqlPipeRowStmt pipeRow : pipeRowStmts) { 337 TExpression expr = pipeRow.getExpression(); 338 if (expr == null) continue; 339 340 String exprText = expr.toString().trim(); 341 342 if (isObjectConstructor(expr)) { 343 // PIPE ROW(ROW_TYPE(arg1, arg2, ...)) 344 bindConstructorArgs(sig, expr, loopVarBindings, cursorBindings); 345 } else { 346 // PIPE ROW(outRow) - variable reference 347 String varName = exprText.toLowerCase(); 348 349 // Check for constructor assignments: outRow := ROW_TYPE(...) 
350 List<TExpression> constructors = varConstructors.get(varName); 351 if (constructors != null) { 352 for (TExpression constructor : constructors) { 353 if (isObjectConstructor(constructor)) { 354 bindConstructorArgs(sig, constructor, loopVarBindings, cursorBindings); 355 } 356 } 357 } 358 359 // Check for field assignments: outRow.field := expr 360 Map<String, TExpression> fieldAssigns = varFieldAssignments.get(varName); 361 if (fieldAssigns != null) { 362 overlayFieldAssignments(sig, fieldAssigns, loopVarBindings, cursorBindings); 363 } 364 } 365 } 366 367 sig.setResolved(true); 368 sig.setStatus(PipelinedFunctionSignature.ResolutionStatus.OK); 369 } 370 371 private boolean isObjectConstructor(TExpression expr) { 372 if (expr == null) return false; 373 // A constructor call looks like a function call: ROW_TYPE(arg1, arg2, ...) 374 if (expr.getExpressionType() == EExpressionType.function_t && expr.getFunctionCall() != null) { 375 return true; 376 } 377 // Also handle simple_object_expr_t 378 if (expr.getExpressionType() == EExpressionType.simple_object_name_t) { 379 return false; 380 } 381 // Check if it's a function call expression 382 String text = expr.toString().trim(); 383 if (text.contains("(") && text.endsWith(")")) { 384 return expr.getFunctionCall() != null; 385 } 386 return false; 387 } 388 389 private void bindConstructorArgs(PipelinedFunctionSignature sig, TExpression constructorExpr, 390 Map<String, TSelectSqlStatement> loopVarBindings, 391 Map<String, TSelectSqlStatement> cursorBindings) { 392 TFunctionCall funcCall = constructorExpr.getFunctionCall(); 393 if (funcCall == null || funcCall.getArgs() == null) return; 394 395 TExpressionList args = funcCall.getArgs(); 396 List<String> outputColumns = sig.getOutputColumns(); 397 398 for (int i = 0; i < args.size() && i < outputColumns.size(); i++) { 399 TExpression arg = args.getExpression(i); 400 String outputCol = outputColumns.get(i).toLowerCase(); 401 resolveArgLineage(sig, outputCol, arg, 
loopVarBindings, cursorBindings); 402 } 403 } 404 405 private void resolveArgLineage(PipelinedFunctionSignature sig, String outputCol, 406 TExpression arg, 407 Map<String, TSelectSqlStatement> loopVarBindings, 408 Map<String, TSelectSqlStatement> cursorBindings) { 409 if (arg == null) return; 410 411 String argText = arg.toString().trim(); 412 413 // Check if it's a wrapper function like util.TO_NUMBER(item.xxx) - extract inner argument 414 TExpression innerArg = unwrapFunctionCalls(arg); 415 if (innerArg != arg) { 416 argText = innerArg.toString().trim(); 417 } 418 419 // Check for loop variable reference: item.COLUMN_NAME 420 int dotIdx = argText.indexOf('.'); 421 if (dotIdx > 0) { 422 String prefix = argText.substring(0, dotIdx).toLowerCase(); 423 String fieldName = argText.substring(dotIdx + 1); 424 425 // Check loop variable bindings 426 TSelectSqlStatement cursorSelect = loopVarBindings.get(prefix); 427 if (cursorSelect == null) { 428 cursorSelect = cursorBindings.get(prefix); 429 } 430 if (cursorSelect != null) { 431 // Resolve the field from the cursor's select list 432 resolveFromCursorSelect(sig, outputCol, fieldName, cursorSelect); 433 return; 434 } 435 } 436 437 // Check for simple column reference (no prefix) 438 if (dotIdx < 0 && !argText.contains("(")) { 439 // Could be a local variable or parameter - mark as unresolved for now 440 sig.addSourceRef(outputCol, new PipelinedSourceRef( 441 "unknown", argText, PipelinedSourceRef.SourceKind.UNRESOLVED)); 442 return; 443 } 444 445 // Check for literal/constant 446 if (argText.equals("null") || argText.startsWith("'") || argText.matches("-?\\d+.*")) { 447 // Constant - empty source, just preserve column structure 448 return; 449 } 450 451 // For qualified names like table.column that aren't loop variables 452 if (dotIdx > 0) { 453 String tablePart = argText.substring(0, dotIdx); 454 String colPart = argText.substring(dotIdx + 1); 455 sig.addSourceRef(outputCol, new PipelinedSourceRef( 456 tablePart, 
colPart, PipelinedSourceRef.SourceKind.BASE_TABLE)); 457 } 458 } 459 460 private TExpression unwrapFunctionCalls(TExpression expr) { 461 // Unwrap wrapper functions like TO_NUMBER(x), TO_CHAR(x), TRIM(x), util.TO_NUMBER(x) 462 if (expr.getExpressionType() == EExpressionType.function_t && expr.getFunctionCall() != null) { 463 TFunctionCall func = expr.getFunctionCall(); 464 if (func.getArgs() != null && func.getArgs().size() == 1) { 465 String funcName = func.getFunctionName().toString().toUpperCase(); 466 // Common wrapper functions 467 if (funcName.endsWith("TO_NUMBER") || funcName.endsWith("TO_CHAR") 468 || funcName.endsWith("TO_DATE") || funcName.equals("TRIM") 469 || funcName.equals("NVL") || funcName.equals("COALESCE") 470 || funcName.equals("UPPER") || funcName.equals("LOWER") 471 || funcName.equals("CAST") || funcName.endsWith(".TO_NUMBER")) { 472 return unwrapFunctionCalls(func.getArgs().getExpression(0)); 473 } 474 } 475 } 476 return expr; 477 } 478 479 private void resolveFromCursorSelect(PipelinedFunctionSignature sig, String outputCol, 480 String fieldName, TSelectSqlStatement cursorSelect) { 481 // Find the column in the cursor's SELECT list that matches fieldName 482 if (cursorSelect.getResultColumnList() == null) return; 483 484 String fieldNameLower = fieldName.toLowerCase(); 485 // Strip quotes 486 fieldNameLower = SQLUtil.trimColumnStringQuote(fieldNameLower); 487 488 for (int i = 0; i < cursorSelect.getResultColumnList().size(); i++) { 489 TResultColumn rc = cursorSelect.getResultColumnList().getResultColumn(i); 490 String alias = null; 491 if (rc.getAliasClause() != null) { 492 alias = rc.getAliasClause().toString().toLowerCase(); 493 alias = SQLUtil.trimColumnStringQuote(alias); 494 } 495 String colName = null; 496 if (rc.getExpr() != null) { 497 colName = rc.getExpr().toString().toLowerCase(); 498 // Extract just the column part from table.column 499 int dot = colName.lastIndexOf('.'); 500 if (dot >= 0) { 501 colName = colName.substring(dot + 
1); 502 } 503 colName = SQLUtil.trimColumnStringQuote(colName); 504 } 505 506 if (fieldNameLower.equals(alias) || fieldNameLower.equals(colName)) { 507 // Found the matching column - trace its source 508 resolveResultColumnSources(sig, outputCol, rc, cursorSelect); 509 return; 510 } 511 } 512 513 // Didn't find by name, try position if same number of columns 514 sig.addSourceRef(outputCol, new PipelinedSourceRef( 515 "cursor", fieldName, PipelinedSourceRef.SourceKind.UNRESOLVED)); 516 } 517 518 private void resolveResultColumnSources(PipelinedFunctionSignature sig, String outputCol, 519 TResultColumn rc, TSelectSqlStatement select) { 520 if (rc.getExpr() == null) return; 521 522 TExpression expr = rc.getExpr(); 523 // Unwrap wrapper functions 524 expr = unwrapFunctionCalls(expr); 525 String exprText = expr.toString().trim(); 526 527 // Try to resolve to a base table column 528 int dotIdx = exprText.lastIndexOf('.'); 529 if (dotIdx > 0) { 530 String tableRef = exprText.substring(0, dotIdx); 531 String colRef = exprText.substring(dotIdx + 1); 532 533 // Resolve table reference against FROM clause 534 String resolvedTable = resolveTableRef(tableRef, select); 535 if (resolvedTable != null) { 536 sig.addSourceRef(outputCol, new PipelinedSourceRef( 537 resolvedTable, SQLUtil.trimColumnStringQuote(colRef), 538 PipelinedSourceRef.SourceKind.BASE_TABLE)); 539 return; 540 } 541 // Fallback: use the table ref as-is 542 sig.addSourceRef(outputCol, new PipelinedSourceRef( 543 tableRef, SQLUtil.trimColumnStringQuote(colRef), 544 PipelinedSourceRef.SourceKind.BASE_TABLE)); 545 } else if (exprText.equals("null") || exprText.startsWith("'") || exprText.matches("-?\\d+.*")) { 546 // Constant - just preserve column structure 547 sig.addSourceRef(outputCol, new PipelinedSourceRef( 548 "constant", exprText, PipelinedSourceRef.SourceKind.CONST)); 549 } else if (!exprText.contains("(") && !exprText.contains(" ")) { 550 // Simple column name without table prefix 551 // Try to resolve by 
checking sourceTable from the parser's column resolution 552 if (rc.getExpr().getObjectOperand() != null 553 && rc.getExpr().getObjectOperand().getSourceTable() != null) { 554 TTable sourceTable = rc.getExpr().getObjectOperand().getSourceTable(); 555 String tableName = getFullTableName(sourceTable); 556 sig.addSourceRef(outputCol, new PipelinedSourceRef( 557 tableName, SQLUtil.trimColumnStringQuote(exprText), 558 PipelinedSourceRef.SourceKind.BASE_TABLE)); 559 } else { 560 // Try to find which table it belongs to from the FROM clause 561 String resolvedTable = resolveColumnTable(exprText, select); 562 if (resolvedTable != null) { 563 sig.addSourceRef(outputCol, new PipelinedSourceRef( 564 resolvedTable, SQLUtil.trimColumnStringQuote(exprText), 565 PipelinedSourceRef.SourceKind.BASE_TABLE)); 566 } else { 567 // Fallback: use the SELECT itself as the source - the analyzer will trace further 568 // Use the cursor/select name as a proxy source 569 String selectName = "cursor_result"; 570 if (select.tables != null && select.tables.size() > 0) { 571 selectName = getFullTableName(select.tables.getTable(0)); 572 } 573 sig.addSourceRef(outputCol, new PipelinedSourceRef( 574 selectName, SQLUtil.trimColumnStringQuote(exprText), 575 PipelinedSourceRef.SourceKind.CTE)); 576 } 577 } 578 } else { 579 // Complex expression - try extracting column refs 580 extractColumnRefsFromExpr(sig, outputCol, expr, select); 581 } 582 } 583 584 private void extractColumnRefsFromExpr(PipelinedFunctionSignature sig, String outputCol, 585 TExpression expr, TSelectSqlStatement select) { 586 // Use iterative traversal to find object names in the expression 587 Deque<TExpression> exprStack = new ArrayDeque<>(); 588 exprStack.push(expr); 589 boolean found = false; 590 591 while (!exprStack.isEmpty()) { 592 TExpression current = exprStack.pop(); 593 if (current == null) continue; 594 595 if (current.getExpressionType() == EExpressionType.simple_object_name_t 596 && current.getObjectOperand() != null) { 
597 String objName = current.getObjectOperand().toString(); 598 int dot = objName.lastIndexOf('.'); 599 if (dot > 0) { 600 String tableRef = objName.substring(0, dot); 601 String colRef = objName.substring(dot + 1); 602 String resolvedTable = resolveTableRef(tableRef, select); 603 sig.addSourceRef(outputCol, new PipelinedSourceRef( 604 resolvedTable != null ? resolvedTable : tableRef, 605 SQLUtil.trimColumnStringQuote(colRef), 606 PipelinedSourceRef.SourceKind.BASE_TABLE)); 607 found = true; 608 } 609 } 610 611 // Push children 612 if (current.getLeftOperand() != null) exprStack.push(current.getLeftOperand()); 613 if (current.getRightOperand() != null) exprStack.push(current.getRightOperand()); 614 if (current.getFunctionCall() != null && current.getFunctionCall().getArgs() != null) { 615 for (int i = 0; i < current.getFunctionCall().getArgs().size(); i++) { 616 exprStack.push(current.getFunctionCall().getArgs().getExpression(i)); 617 } 618 } 619 } 620 621 if (!found) { 622 sig.addSourceRef(outputCol, new PipelinedSourceRef( 623 "expression", expr.toString(), PipelinedSourceRef.SourceKind.UNRESOLVED)); 624 } 625 } 626 627 private String resolveTableRef(String tableRef, TSelectSqlStatement select) { 628 if (select == null || select.tables == null) return null; 629 String tableRefLower = tableRef.toLowerCase(); 630 String tableRefNorm = SQLUtil.trimColumnStringQuote(tableRefLower); 631 632 for (int i = 0; i < select.tables.size(); i++) { 633 TTable t = select.tables.getTable(i); 634 // Check alias 635 if (t.getAliasClause() != null) { 636 String alias = t.getAliasClause().toString().toLowerCase(); 637 alias = SQLUtil.trimColumnStringQuote(alias); 638 if (alias.equals(tableRefNorm)) { 639 return getFullTableName(t); 640 } 641 } 642 // Check table name 643 String tName = t.getFullName() != null ? 
t.getFullName().toLowerCase() : ""; 644 String bareTableName = getBareName(tName); 645 if (tableRefNorm.equals(bareTableName) || tableRefNorm.equals(tName)) { 646 return getFullTableName(t); 647 } 648 } 649 650 // Check CTEs 651 if (select.getCteList() != null) { 652 for (int i = 0; i < select.getCteList().size(); i++) { 653 TCTE cte = select.getCteList().getCTE(i); 654 if (cte.getTableName() != null) { 655 String cteName = cte.getTableName().toString().toLowerCase(); 656 if (tableRefNorm.equals(cteName)) { 657 return cteName; 658 } 659 } 660 } 661 } 662 663 // If this is a subquery's select, walk up to find CTEs 664 TCustomSqlStatement parent = select.getParentStmt(); 665 if (parent instanceof TSelectSqlStatement) { 666 return resolveTableRef(tableRef, (TSelectSqlStatement) parent); 667 } 668 669 return null; 670 } 671 672 private String resolveColumnTable(String colName, TSelectSqlStatement select) { 673 if (select == null || select.tables == null) return null; 674 // Simplified: just return null to indicate we can't determine the table 675 return null; 676 } 677 678 private String getFullTableName(TTable table) { 679 if (table.getFullName() != null) { 680 return table.getFullName(); 681 } 682 return table.getName(); 683 } 684 685 private void overlayFieldAssignments(PipelinedFunctionSignature sig, 686 Map<String, TExpression> fieldAssigns, 687 Map<String, TSelectSqlStatement> loopVarBindings, 688 Map<String, TSelectSqlStatement> cursorBindings) { 689 for (Map.Entry<String, TExpression> entry : fieldAssigns.entrySet()) { 690 String fieldName = entry.getKey(); 691 TExpression expr = entry.getValue(); 692 693 // Find matching output column 694 String outputCol = null; 695 for (String col : sig.getOutputColumns()) { 696 if (col.equalsIgnoreCase(fieldName)) { 697 outputCol = col.toLowerCase(); 698 break; 699 } 700 } 701 if (outputCol == null) continue; 702 703 resolveArgLineage(sig, outputCol, expr, loopVarBindings, cursorBindings); 704 } 705 } 706 707 // ---- Phase 
4: Call-site stitching ---- 708 709 /** 710 * Handle TABLE(func(...)) which is parsed as tableExpr with funcCall=null. 711 * The table name is the function name. Always defers to finalize because 712 * table columns are not yet populated during from-clause processing. 713 */ 714 public void tryStitchTableExpr(TTable table) { 715 if (!option.isEnablePipelinedStitching()) return; 716 if (table.getName() == null) return; 717 718 String tableName = table.getName(); 719 List<String> candidates = new ArrayList<>(); 720 candidates.add(normalizeKey(tableName)); 721 String bareName = getBareName(tableName); 722 if (!bareName.equals(normalizeKey(tableName))) { 723 candidates.add(bareName); 724 } 725 726 // Always defer to finalize - columns aren't populated yet during FROM processing 727 modelManager.addPendingPipelinedCallSite( 728 new PendingPipelinedCallSite(candidates, -1, table, null)); 729 } 730 731 /** 732 * Called when TABLE(func(...)) is encountered in a FROM clause. 733 * Generates candidate keys and tries to stitch immediately. 734 * If the signature isn't available yet, adds to pending list. 735 */ 736 public boolean tryStitchCallSite(TTable table, TFunctionCall funcCall) { 737 if (!option.isEnablePipelinedStitching()) return false; 738 739 List<String> candidates = generateFunctionKeyCandidates(funcCall); 740 int argCount = funcCall.getArgs() != null ? 
funcCall.getArgs().size() : 0; 741 742 PipelinedFunctionSignature sig = modelManager.findPipelinedSignature(candidates, argCount); 743 if (sig == null) { 744 sig = modelManager.findPipelinedSignatureAny(candidates); 745 } 746 747 if (sig != null && sig.isResolved() 748 && sig.getStatus() == PipelinedFunctionSignature.ResolutionStatus.OK) { 749 logger.trace("Pipelined stitch: found signature " + sig.getFunctionKey() 750 + " for call " + funcCall.getFunctionName() + ", cols=" + sig.getOutputColumns().size() 751 + ", lineage keys=" + sig.getLineageByOutputColumn().keySet()); 752 stitchOneCallSite(table, sig); 753 return true; 754 } 755 756 // Add to pending for finalize 757 logger.trace("Pipelined stitch: no signature found for candidates=" + candidates 758 + ", adding to pending"); 759 modelManager.addPendingPipelinedCallSite( 760 new PendingPipelinedCallSite(candidates, argCount, table, funcCall)); 761 return false; 762 } 763 764 /** 765 * Stitch a single pending call site. Called from DataFlowAnalyzer with 766 * the proper statement stack context. 767 */ 768 public void stitchOnePending(PendingPipelinedCallSite cs) { 769 if (!option.isEnablePipelinedStitching()) return; 770 771 PipelinedFunctionSignature sig = modelManager.findPipelinedSignature( 772 cs.getFunctionKeyCandidates(), cs.getArgCount()); 773 if (sig == null) { 774 sig = modelManager.findPipelinedSignatureAny(cs.getFunctionKeyCandidates()); 775 } 776 if (sig != null && sig.isResolved() 777 && sig.getStatus() == PipelinedFunctionSignature.ResolutionStatus.OK) { 778 stitchOneCallSite(cs.getTable(), sig); 779 } 780 } 781 782 /** 783 * Finalize hook: process all pending call sites. Called before XML emit. 
*/
    public void stitchPendingCallSites() {
        if (!option.isEnablePipelinedStitching()) {
            return;
        }

        for (PendingPipelinedCallSite cs : modelManager.getPendingPipelinedCallSites()) {
            stitchOnePending(cs);
        }
    }

    /**
     * Stitches the lineage recorded in {@code sig} onto the columns of the table
     * model that represents the function call {@code table}.
     *
     * <p>The set returned by {@code getResolvingPipelinedFunctions()} is used as a
     * guard: the call is a no-op when the signature's key is already in that set,
     * or when the set has reached {@code option.getMaxPipelinedExpansionDepth()}
     * entries. The key is always removed again on exit.
     *
     * <p>Target columns are chosen in this order:
     * <ol>
     *   <li>the model already bound to {@code table}, if it is a {@link Table} with columns;</li>
     *   <li>the table returned by {@code createTableFromCreateDDL}, if it has columns;</li>
     *   <li>otherwise, columns created on that table from the signature's output columns.</li>
     * </ol>
     *
     * <p>Any exception is logged and swallowed, so a failing call site does not
     * propagate to the caller.
     *
     * @param table the function-call table at the call site
     * @param sig   the resolved signature whose lineage is stitched
     */
    private void stitchOneCallSite(TTable table, PipelinedFunctionSignature sig) {
        Set<String> resolving = modelManager.getResolvingPipelinedFunctions();
        // Read the key once so that the value removed in finally is the value added.
        String functionKey = sig.getFunctionKey();
        if (resolving.contains(functionKey)
                || resolving.size() >= option.getMaxPipelinedExpansionDepth()) {
            return;
        }

        resolving.add(functionKey);
        try {
            // Get the table model bound to this function call table
            Object tableModel = modelManager.getModel(table);
            if (tableModel instanceof Table) {
                Table boundTable = (Table) tableModel;
                if (hasColumns(boundTable)) {
                    stitchToTable(boundTable, sig);
                    return;
                }
            }

            // For tableExpr tables (TABLE(func())), the table model is created
            // differently. Try createTableFromCreateDDL which returns existing if bound.
            Table functionTable = modelFactory.createTableFromCreateDDL(table, true);
            if (functionTable == null) {
                return;
            }
            if (hasColumns(functionTable)) {
                stitchToTable(functionTable, sig);
                return;
            }

            // Last resort: create output columns from the signature and stitch
            for (String outputCol : sig.getOutputColumns()) {
                TObjectName colName = new TObjectName();
                colName.setString(outputCol);
                // The column is created even when no lineage is recorded for it.
                TableColumn column = modelFactory.createTableColumn(functionTable, colName, true);
                linkSourcesToColumn(column,
                        sig.getLineageByOutputColumn().get(outputCol.toLowerCase()));
            }
        } catch (Exception e) {
            logger.error("Error stitching pipelined call site for " + functionKey, e);
        } finally {
            resolving.remove(functionKey);
        }
    }

    /**
     * Stitches lineage onto every existing column of {@code functionTable}.
     *
     * <p>Each column with a non-null name is mapped to a lineage key via
     * {@link #mapByNameThenPosition}; columns that cannot be mapped, or whose key
     * has no recorded lineage, are left untouched.
     *
     * @param functionTable the table model whose columns receive lineage; ignored
     *                      when {@code null} or when it has no column list
     * @param sig           the signature providing output columns and lineage
     */
    private void stitchToTable(Table functionTable, PipelinedFunctionSignature sig) {
        if (functionTable == null || functionTable.getColumns() == null) {
            return;
        }

        for (TableColumn callerCol : functionTable.getColumns()) {
            String callerColName = callerCol.getName();
            if (callerColName == null) {
                continue;
            }

            String mappedKey = mapByNameThenPosition(callerColName, callerCol, functionTable, sig);
            if (mappedKey == null) {
                continue;
            }

            linkSourcesToColumn(callerCol, sig.getLineageByOutputColumn().get(mappedKey));
        }
    }

    /**
     * Creates one {@code select} data-flow relationship per usable source
     * reference, from the source column to {@code targetColumn}.
     *
     * <p>References of kind {@code UNRESOLVED} or {@code CONST} are skipped, a
     * source is linked at most once per target (keyed by
     * {@code parentName + "." + columnName}), and no more than
     * {@code option.getMaxStitchedSourcesPerColumn()} relationships are created.
     *
     * @param targetColumn the column that receives the lineage
     * @param refs         the source references; {@code null} or empty means nothing to do
     */
    private void linkSourcesToColumn(TableColumn targetColumn, List<PipelinedSourceRef> refs) {
        if (refs == null || refs.isEmpty()) {
            return;
        }

        Set<String> addedSources = new HashSet<>();
        int added = 0;
        for (PipelinedSourceRef ref : refs) {
            if (added >= option.getMaxStitchedSourcesPerColumn()) {
                break;
            }
            if (ref.getSourceKind() == PipelinedSourceRef.SourceKind.UNRESOLVED
                    || ref.getSourceKind() == PipelinedSourceRef.SourceKind.CONST) {
                continue;
            }

            // Dedup: skip if already added
            String sourceKey = ref.getParentName() + "." + ref.getColumnName();
            if (!addedSources.add(sourceKey)) {
                continue;
            }

            // Create the stitched relationship
            Table sourceTable = modelFactory.createTableByName(ref.getParentName(), false);
            TObjectName sourceColName = new TObjectName();
            sourceColName.setString(ref.getColumnName());
            TableColumn sourceCol = modelFactory.createTableColumn(sourceTable, sourceColName, true);

            DataFlowRelationship relation = modelFactory.createDataFlowRelation();
            relation.setEffectType(EffectType.select);
            relation.addSource(new TableColumnRelationshipElement(sourceCol));
            relation.setTarget(new TableColumnRelationshipElement(targetColumn));

            added++;
        }
    }

    /** Returns {@code true} when {@code table} has a non-null, non-empty column list. */
    private static boolean hasColumns(Table table) {
        return table.getColumns() != null && !table.getColumns().isEmpty();
    }

    /**
     * Maps a caller column to the key used to look up its lineage in
     * {@code sig.getLineageByOutputColumn()}.
     *
     * <p>Matching is attempted in three steps: exact name, then case-insensitive
     * name with quotes stripped, then position (only when the table and the
     * signature have the same number of columns).
     *
     * @param callerColName the caller column's name, must not be {@code null}
     * @param callerCol     the caller column, used for the positional fallback
     * @param functionTable the table owning {@code callerCol}
     * @param sig           the signature to match against
     * @return the lineage key, or {@code null} when no output column matches
     */
    private String mapByNameThenPosition(String callerColName, TableColumn callerCol,
                                         Table functionTable, PipelinedFunctionSignature sig) {
        List<String> outputColumns = sig.getOutputColumns();
        if (outputColumns == null || outputColumns.isEmpty()) {
            return null;
        }

        // 1. Exact name match
        for (String out : outputColumns) {
            if (out.equals(callerColName)) {
                return out.toLowerCase();
            }
        }

        // 2. Normalized name match (case-insensitive, strip quotes)
        String normalizedCaller = SQLUtil.trimColumnStringQuote(callerColName.toLowerCase());
        for (String out : outputColumns) {
            String loweredOut = out.toLowerCase();
            String normalizedOut = SQLUtil.trimColumnStringQuote(loweredOut);
            if (!normalizedOut.equals(normalizedCaller)) {
                continue;
            }
            // Steps 1 and 3, and the last-resort lookup in stitchOneCallSite, key
            // the lineage map by the lower-cased output name without stripping
            // quotes. Keep returning the stripped key when the map knows it;
            // otherwise use the lower-cased key so a quoted output column still
            // resolves.
            if (!normalizedOut.equals(loweredOut)
                    && !sig.getLineageByOutputColumn().containsKey(normalizedOut)) {
                return loweredOut;
            }
            return normalizedOut;
        }

        // 3. Position fallback (strict prerequisites)
        if (functionTable.getColumns() != null
                && functionTable.getColumns().size() == outputColumns.size()) {
            int idx = functionTable.getColumns().indexOf(callerCol);
            if (idx >= 0 && idx < outputColumns.size()) {
                return outputColumns.get(idx).toLowerCase();
            }
        }

        return null;
    }

    // ---- Utility methods ----

    /**
     * Builds the ordered list of lookup keys to try for a function call: the name
     * as written, the name prefixed with the current global Oracle package (when
     * one is set and has a non-empty name), and the bare name without any
     * schema/package prefix. All keys are normalized and duplicates are dropped.
     *
     * @param funcCall the function call at the call site
     * @return the candidate keys, most specific lookup first, without duplicates
     */
    private List<String> generateFunctionKeyCandidates(TFunctionCall funcCall) {
        List<String> candidates = new ArrayList<>();
        String funcName = funcCall.getFunctionName().toString();

        // Full name as-is
        addCandidate(candidates, normalizeKey(funcName));

        // With package prefix from current context
        if (ModelBindingManager.getGlobalOraclePackage() != null) {
            String pkgName = ModelBindingManager.getGlobalOraclePackage().getName();
            if (pkgName != null && !pkgName.isEmpty()) {
                addCandidate(candidates, normalizeKey(pkgName + "." + funcName));
            }
        }

        // Bare name (strip schema/package prefix). Compared in normalized form, so
        // an unqualified name is not listed twice.
        addCandidate(candidates, normalizeKey(getBareName(funcName)));

        return candidates;
    }

    /** Appends {@code key} to {@code candidates} unless it is already present. */
    private static void addCandidate(List<String> candidates, String key) {
        if (!candidates.contains(key)) {
            candidates.add(key);
        }
    }

    /**
     * Normalizes a type name by trimming it and passing it through
     * {@code DlineageUtil.getIdentifierNormalTableName}.
     *
     * @param name the raw type name, may be {@code null}
     * @return the normalized name, or an empty string for {@code null}
     */
    private String normalizeTypeName(String name) {
        if (name == null) {
            return "";
        }
        return DlineageUtil.getIdentifierNormalTableName(name.trim());
    }

    /**
     * Normalizes a function key through
     * {@code DlineageUtil.getIdentifierNormalTableName} (no trimming).
     *
     * @param name the raw key, may be {@code null}
     * @return the normalized key, or an empty string for {@code null}
     */
    private String normalizeKey(String name) {
        if (name == null) {
            return "";
        }
        return DlineageUtil.getIdentifierNormalTableName(name);
    }

    /**
     * Returns the part of the normalized name after its last {@code '.'}, or the
     * whole normalized name when it contains no dot.
     *
     * @param name the raw, possibly qualified name, may be {@code null}
     * @return the unqualified normalized name, or an empty string for {@code null}
     */
    private String getBareName(String name) {
        if (name == null) {
            return "";
        }
        String normalized = DlineageUtil.getIdentifierNormalTableName(name);
        int lastDot = normalized.lastIndexOf('.');
        return lastDot >= 0 ? normalized.substring(lastDot + 1) : normalized;
    }
}