Snap staging DDL on module create/edit/run; allowlist benign jrunner exception.
Staging table drift caused silent data loss when dest grew columns but staging kept the old shape. Fix on three fronts: - Runner now DROP+CREATEs staging each run instead of CREATE IF NOT EXISTS, so any drift self-heals. - Wizard create drop+creates staging right after dest is provisioned, surfacing DDL errors at create time. - Module edit drops the (old-name) staging table and re-applies COMMENT ON TABLE when dest_description changed. jrunner's query mode uses executeQuery() which raises "No results were returned by the query" after DDL/DML succeeds; the stack-trace detector now allowlists that exception so normal CREATE/TRUNCATE/INSERT runs aren't flagged as failures. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
2ef68d766c
commit
01bcba78b4
@ -91,16 +91,19 @@ def run_module(module_id: int, *, group_run_id: int | None = None,
|
||||
status = "success"
|
||||
return RunOutcome(run_id, status, None, None, resolved_sql, merge_sql)
|
||||
|
||||
# 4. ensure staging table exists on dest. Mirror the real dest schema
|
||||
# so jrunner's auto-DELETE and the subsequent merge INSERT both find
|
||||
# a table to work on. Idempotent — no-op after first run.
|
||||
# 4. (re)create staging from dest. DROP+CREATE (not IF NOT EXISTS) so
|
||||
# any drift — dest columns added since staging was last made — is
|
||||
# self-healing. Staging is ephemeral per SPEC; nothing of value lives
|
||||
# between runs.
|
||||
staging_schema, _, _ = module["staging_table"].partition(".")
|
||||
if staging_schema and staging_schema != module["staging_table"]:
|
||||
jrunner.run_dest_sql(
|
||||
dest_conn, f"CREATE SCHEMA IF NOT EXISTS {staging_schema};")
|
||||
jrunner.run_dest_sql(
|
||||
dest_conn, f"DROP TABLE IF EXISTS {module['staging_table']};")
|
||||
jrunner.run_dest_sql(
|
||||
dest_conn,
|
||||
f"CREATE TABLE IF NOT EXISTS {module['staging_table']} "
|
||||
f"CREATE TABLE {module['staging_table']} "
|
||||
f"(LIKE {module['dest_table']} INCLUDING ALL);",
|
||||
)
|
||||
|
||||
|
||||
@ -216,6 +216,14 @@ _STACK_FRAME_RE = re.compile(r"^\s*at [\w.$<>]+\([^)\n]*\.java:\d+\)", re.M)
|
||||
_EXCEPTION_HEADER_RE = re.compile(
|
||||
r"^(?:[\w.$]+\.)*[\w$]+(?:Exception|Error)(?::[^\n]*)?$", re.M)
|
||||
|
||||
# jrunner runs query-mode SQL with `executeQuery`, which requires the
|
||||
# statement to produce a ResultSet. DDL/DML (CREATE, TRUNCATE, INSERT)
|
||||
# still executes, but PG then throws "No results were returned by the
|
||||
# query." The statement succeeded — ignore the trace.
|
||||
_BENIGN_EXCEPTION_SUBSTRINGS = (
|
||||
"No results were returned by the query",
|
||||
)
|
||||
|
||||
|
||||
def _detect_silent_failure(stdout: str, stderr: str) -> str | None:
|
||||
"""Return a short error summary if jrunner exited 0 but logged a failure."""
|
||||
@ -223,9 +231,10 @@ def _detect_silent_failure(stdout: str, stderr: str) -> str | None:
|
||||
if not _STACK_FRAME_RE.search(combined):
|
||||
return None
|
||||
m = _EXCEPTION_HEADER_RE.search(combined)
|
||||
if m:
|
||||
return m.group(0).strip()
|
||||
return "jrunner logged a Java stack trace but exited 0"
|
||||
header = m.group(0).strip() if m else "jrunner logged a Java stack trace but exited 0"
|
||||
if any(s in header for s in _BENIGN_EXCEPTION_SUBSTRINGS):
|
||||
return None
|
||||
return header
|
||||
|
||||
|
||||
class JrunnerError(RuntimeError):
|
||||
|
||||
@ -162,17 +162,36 @@ async def module_update(request: Request, module_id: int):
|
||||
if merge_strategy not in ("full", "incremental", "append"):
|
||||
raise HTTPException(400, f"invalid merge_strategy: {merge_strategy!r}")
|
||||
|
||||
new_staging = form["staging_table"].strip()
|
||||
new_description = (form.get("dest_description") or "").strip() or None
|
||||
|
||||
# DDL snap: drop the staging table under its old name (runner recreates
|
||||
# fresh from dest on next run). Re-apply dest COMMENT if it changed.
|
||||
dest_conn = repo.get_connection(module["dest_connection_id"])
|
||||
if dest_conn is not None:
|
||||
try:
|
||||
jrunner.run_dest_sql(
|
||||
dest_conn, f"DROP TABLE IF EXISTS {module['staging_table']};")
|
||||
if new_description != module["dest_description"]:
|
||||
jrunner.run_dest_sql(
|
||||
dest_conn,
|
||||
f"COMMENT ON TABLE {module['dest_table']} IS "
|
||||
f"{_sql_str(new_description or '')};",
|
||||
)
|
||||
except jrunner.JrunnerError as e:
|
||||
raise HTTPException(500, f"dest DDL snap failed: {e}")
|
||||
|
||||
repo.update_module(
|
||||
module_id,
|
||||
name=new_name,
|
||||
source_connection_id=int(form["source_connection_id"]),
|
||||
dest_connection_id=int(form["dest_connection_id"]),
|
||||
dest_table=form["dest_table"].strip(),
|
||||
staging_table=form["staging_table"].strip(),
|
||||
staging_table=new_staging,
|
||||
source_query=form["source_query"],
|
||||
merge_strategy=merge_strategy,
|
||||
merge_key=(form.get("merge_key") or "").strip() or None,
|
||||
dest_description=(form.get("dest_description") or "").strip() or None,
|
||||
dest_description=new_description,
|
||||
enabled=1 if form.get("enabled") == "1" else 0,
|
||||
)
|
||||
return RedirectResponse(url=f"/modules/{module_id}", status_code=303)
|
||||
@ -414,6 +433,9 @@ async def wizard_create(request: Request):
|
||||
create_table_sql = dest_drv.build_create_table_sql(qualified_dest, chosen)
|
||||
except NotImplementedError as e:
|
||||
raise HTTPException(400, str(e))
|
||||
effective_staging = staging_table or f"pipekit_staging.{module_name}"
|
||||
staging_schema, _, _ = effective_staging.partition(".")
|
||||
|
||||
try:
|
||||
jrunner.run_dest_sql(
|
||||
dest_conn,
|
||||
@ -424,6 +446,19 @@ async def wizard_create(request: Request):
|
||||
dest_description, chosen)
|
||||
if comment_sql:
|
||||
jrunner.run_dest_sql(dest_conn, comment_sql)
|
||||
# Pre-align staging to dest so first run doesn't surprise us.
|
||||
if staging_schema and staging_schema != effective_staging:
|
||||
jrunner.run_dest_sql(
|
||||
dest_conn,
|
||||
f"CREATE SCHEMA IF NOT EXISTS {dest_drv.quote_identifier(staging_schema)};",
|
||||
)
|
||||
jrunner.run_dest_sql(
|
||||
dest_conn, f"DROP TABLE IF EXISTS {effective_staging};")
|
||||
jrunner.run_dest_sql(
|
||||
dest_conn,
|
||||
f"CREATE TABLE {effective_staging} "
|
||||
f"(LIKE {qualified_dest} INCLUDING ALL);",
|
||||
)
|
||||
except jrunner.JrunnerError as e:
|
||||
raise HTTPException(500, f"dest provisioning failed: {e}")
|
||||
|
||||
|
||||
@ -64,7 +64,7 @@
|
||||
<label class="field">
|
||||
<span>staging table</span>
|
||||
<input type="text" name="staging_table" required value="{{ module.staging_table }}">
|
||||
<span class="help">dropped + recreated each run</span>
|
||||
<span class="help">dropped on save; recreated from dest on next run</span>
|
||||
</label>
|
||||
</div>
|
||||
|
||||
@ -95,7 +95,7 @@
|
||||
<label class="field">
|
||||
<span>dest description</span>
|
||||
<textarea name="dest_description" rows="2">{{ module.dest_description or '' }}</textarea>
|
||||
<span class="help">COMMENT ON TABLE value; editing here does NOT re-apply it to the dest table</span>
|
||||
<span class="help">COMMENT ON TABLE value; re-applied on save if changed</span>
|
||||
</label>
|
||||
|
||||
<label class="field" style="flex-direction:row;align-items:center;gap:0.5rem">
|
||||
|
||||
Loading…
Reference in New Issue
Block a user