From 01bcba78b41d4adb0f9ece6c2a6316411a156a86 Mon Sep 17 00:00:00 2001 From: Paul Trowbridge Date: Wed, 22 Apr 2026 20:10:36 -0400 Subject: [PATCH] Snap staging DDL on module create/edit/run; allowlist benign jrunner exception. Staging table drift caused silent data loss when dest grew columns but staging kept the old shape. Fix on three fronts: - Runner now DROP+CREATEs staging each run instead of CREATE IF NOT EXISTS, so any drift self-heals. - Wizard create drop+creates staging right after dest is provisioned, surfacing DDL errors at create time. - Module edit drops the (old-name) staging table and re-applies COMMENT ON TABLE when dest_description changed. jrunner's query mode uses executeQuery() which raises "No results were returned by the query" after DDL/DML succeeds; the stack-trace detector now allowlists that exception so normal CREATE/TRUNCATE/INSERT runs aren't flagged as failures. Co-Authored-By: Claude Opus 4.7 --- pipekit/engine/runner.py | 11 +++++--- pipekit/jrunner.py | 15 ++++++++-- pipekit/web/app.py | 39 ++++++++++++++++++++++++-- pipekit/web/templates/module_form.html | 4 +-- 4 files changed, 58 insertions(+), 11 deletions(-) diff --git a/pipekit/engine/runner.py b/pipekit/engine/runner.py index e957772..b21562c 100644 --- a/pipekit/engine/runner.py +++ b/pipekit/engine/runner.py @@ -91,16 +91,19 @@ def run_module(module_id: int, *, group_run_id: int | None = None, status = "success" return RunOutcome(run_id, status, None, None, resolved_sql, merge_sql) - # 4. ensure staging table exists on dest. Mirror the real dest schema - # so jrunner's auto-DELETE and the subsequent merge INSERT both find - # a table to work on. Idempotent — no-op after first run. + # 4. (re)create staging from dest. DROP+CREATE (not IF NOT EXISTS) so + # any drift — dest columns added since staging was last made — is + # self-healing. Staging is ephemeral per SPEC; nothing of value lives + # between runs. staging_schema, _, _ = module["staging_table"].partition(".") if staging_schema and staging_schema != module["staging_table"]: jrunner.run_dest_sql( dest_conn, f"CREATE SCHEMA IF NOT EXISTS {staging_schema};") + jrunner.run_dest_sql( + dest_conn, f"DROP TABLE IF EXISTS {module['staging_table']};") jrunner.run_dest_sql( dest_conn, - f"CREATE TABLE IF NOT EXISTS {module['staging_table']} " + f"CREATE TABLE {module['staging_table']} " f"(LIKE {module['dest_table']} INCLUDING ALL);", ) diff --git a/pipekit/jrunner.py b/pipekit/jrunner.py index 2fc9d3e..4bcdb89 100644 --- a/pipekit/jrunner.py +++ b/pipekit/jrunner.py @@ -216,6 +216,14 @@ _STACK_FRAME_RE = re.compile(r"^\s*at [\w.$<>]+\([^)\n]*\.java:\d+\)", re.M) _EXCEPTION_HEADER_RE = re.compile( r"^(?:[\w.$]+\.)*[\w$]+(?:Exception|Error)(?::[^\n]*)?$", re.M) +# jrunner runs query-mode SQL with `executeQuery`, which requires the +# statement to produce a ResultSet. DDL/DML (CREATE, TRUNCATE, INSERT) +# still executes, but PG then throws "No results were returned by the +# query." The statement succeeded — ignore the trace. +_BENIGN_EXCEPTION_SUBSTRINGS = ( + "No results were returned by the query", +) + def _detect_silent_failure(stdout: str, stderr: str) -> str | None: """Return a short error summary if jrunner exited 0 but logged a failure.""" @@ -223,9 +231,10 @@ def _detect_silent_failure(stdout: str, stderr: str) -> str | None: if not _STACK_FRAME_RE.search(combined): return None m = _EXCEPTION_HEADER_RE.search(combined) - if m: - return m.group(0).strip() - return "jrunner logged a Java stack trace but exited 0" + header = m.group(0).strip() if m else "jrunner logged a Java stack trace but exited 0" + if any(s in header for s in _BENIGN_EXCEPTION_SUBSTRINGS): + return None + return header class JrunnerError(RuntimeError): diff --git a/pipekit/web/app.py b/pipekit/web/app.py index e789284..e6a4905 100644 --- a/pipekit/web/app.py +++ b/pipekit/web/app.py @@ -162,17 +162,36 @@ async def module_update(request: Request, module_id: int): if merge_strategy not in ("full", "incremental", "append"): raise HTTPException(400, f"invalid merge_strategy: {merge_strategy!r}") + new_staging = form["staging_table"].strip() + new_description = (form.get("dest_description") or "").strip() or None + + # DDL snap: drop the staging table under its old name (runner recreates + # fresh from dest on next run). Re-apply dest COMMENT if it changed. + dest_conn = repo.get_connection(module["dest_connection_id"]) + if dest_conn is not None: + try: + jrunner.run_dest_sql( + dest_conn, f"DROP TABLE IF EXISTS {module['staging_table']};") + if new_description != module["dest_description"]: + jrunner.run_dest_sql( + dest_conn, + f"COMMENT ON TABLE {module['dest_table']} IS " + f"{_sql_str(new_description or '')};", + ) + except jrunner.JrunnerError as e: + raise HTTPException(500, f"dest DDL snap failed: {e}") + repo.update_module( module_id, name=new_name, source_connection_id=int(form["source_connection_id"]), dest_connection_id=int(form["dest_connection_id"]), dest_table=form["dest_table"].strip(), - staging_table=form["staging_table"].strip(), + staging_table=new_staging, source_query=form["source_query"], merge_strategy=merge_strategy, merge_key=(form.get("merge_key") or "").strip() or None, - dest_description=(form.get("dest_description") or "").strip() or None, + dest_description=new_description, enabled=1 if form.get("enabled") == "1" else 0, ) return RedirectResponse(url=f"/modules/{module_id}", status_code=303) @@ -414,6 +433,9 @@ async def wizard_create(request: Request): create_table_sql = dest_drv.build_create_table_sql(qualified_dest, chosen) except NotImplementedError as e: raise HTTPException(400, str(e)) + effective_staging = staging_table or f"pipekit_staging.{module_name}" + staging_schema, _, _ = effective_staging.partition(".") + try: jrunner.run_dest_sql( dest_conn, @@ -424,6 +446,19 @@ async def wizard_create(request: Request): dest_description, chosen) if comment_sql: jrunner.run_dest_sql(dest_conn, comment_sql) + # Pre-align staging to dest so first run doesn't surprise us. + if staging_schema and staging_schema != effective_staging: + jrunner.run_dest_sql( + dest_conn, + f"CREATE SCHEMA IF NOT EXISTS {dest_drv.quote_identifier(staging_schema)};", + ) + jrunner.run_dest_sql( + dest_conn, f"DROP TABLE IF EXISTS {effective_staging};") + jrunner.run_dest_sql( + dest_conn, + f"CREATE TABLE {effective_staging} " + f"(LIKE {qualified_dest} INCLUDING ALL);", + ) except jrunner.JrunnerError as e: raise HTTPException(500, f"dest provisioning failed: {e}") diff --git a/pipekit/web/templates/module_form.html b/pipekit/web/templates/module_form.html index c67dec9..89c517a 100644 --- a/pipekit/web/templates/module_form.html +++ b/pipekit/web/templates/module_form.html @@ -64,7 +64,7 @@ @@ -95,7 +95,7 @@