diff --git a/pipekit/engine/runner.py b/pipekit/engine/runner.py
index e957772..b21562c 100644
--- a/pipekit/engine/runner.py
+++ b/pipekit/engine/runner.py
@@ -91,16 +91,19 @@ def run_module(module_id: int, *, group_run_id: int | None = None,
status = "success"
return RunOutcome(run_id, status, None, None, resolved_sql, merge_sql)
- # 4. ensure staging table exists on dest. Mirror the real dest schema
- # so jrunner's auto-DELETE and the subsequent merge INSERT both find
- # a table to work on. Idempotent — no-op after first run.
+ # 4. (re)create staging from dest. DROP+CREATE (not IF NOT EXISTS) so
+ # any drift — dest columns added since staging was last made — is
+ # self-healing. Staging is ephemeral per SPEC; nothing of value lives
+ # between runs.
staging_schema, _, _ = module["staging_table"].partition(".")
if staging_schema and staging_schema != module["staging_table"]:
jrunner.run_dest_sql(
dest_conn, f"CREATE SCHEMA IF NOT EXISTS {staging_schema};")
+ jrunner.run_dest_sql(
+ dest_conn, f"DROP TABLE IF EXISTS {module['staging_table']};")
jrunner.run_dest_sql(
dest_conn,
- f"CREATE TABLE IF NOT EXISTS {module['staging_table']} "
+ f"CREATE TABLE {module['staging_table']} "
f"(LIKE {module['dest_table']} INCLUDING ALL);",
)
diff --git a/pipekit/jrunner.py b/pipekit/jrunner.py
index 2fc9d3e..4bcdb89 100644
--- a/pipekit/jrunner.py
+++ b/pipekit/jrunner.py
@@ -216,6 +216,14 @@ _STACK_FRAME_RE = re.compile(r"^\s*at [\w.$<>]+\([^)\n]*\.java:\d+\)", re.M)
_EXCEPTION_HEADER_RE = re.compile(
r"^(?:[\w.$]+\.)*[\w$]+(?:Exception|Error)(?::[^\n]*)?$", re.M)
+# jrunner runs query-mode SQL with `executeQuery`, which requires the
+# statement to produce a ResultSet. DDL/DML (CREATE, TRUNCATE, INSERT)
+# still executes, but PG then throws "No results were returned by the
+# query." The statement succeeded — ignore the trace.
+_BENIGN_EXCEPTION_SUBSTRINGS = (
+ "No results were returned by the query",
+)
+
def _detect_silent_failure(stdout: str, stderr: str) -> str | None:
"""Return a short error summary if jrunner exited 0 but logged a failure."""
@@ -223,9 +231,10 @@ def _detect_silent_failure(stdout: str, stderr: str) -> str | None:
if not _STACK_FRAME_RE.search(combined):
return None
m = _EXCEPTION_HEADER_RE.search(combined)
- if m:
- return m.group(0).strip()
- return "jrunner logged a Java stack trace but exited 0"
+ header = m.group(0).strip() if m else "jrunner logged a Java stack trace but exited 0"
+ if any(s in header for s in _BENIGN_EXCEPTION_SUBSTRINGS):
+ return None
+ return header
class JrunnerError(RuntimeError):
diff --git a/pipekit/web/app.py b/pipekit/web/app.py
index e789284..e6a4905 100644
--- a/pipekit/web/app.py
+++ b/pipekit/web/app.py
@@ -162,17 +162,36 @@ async def module_update(request: Request, module_id: int):
if merge_strategy not in ("full", "incremental", "append"):
raise HTTPException(400, f"invalid merge_strategy: {merge_strategy!r}")
+ new_staging = form["staging_table"].strip()
+ new_description = (form.get("dest_description") or "").strip() or None
+
+ # DDL snap: drop the staging table under its old name (runner recreates
+ # fresh from dest on next run). Re-apply dest COMMENT if it changed.
+ dest_conn = repo.get_connection(module["dest_connection_id"])
+ if dest_conn is not None:
+ try:
+ jrunner.run_dest_sql(
+ dest_conn, f"DROP TABLE IF EXISTS {module['staging_table']};")
+ if new_description != module["dest_description"]:
+ jrunner.run_dest_sql(
+ dest_conn,
+ f"COMMENT ON TABLE {module['dest_table']} IS "
+ f"{_sql_str(new_description or '')};",
+ )
+ except jrunner.JrunnerError as e:
+ raise HTTPException(500, f"dest DDL snap failed: {e}")
+
repo.update_module(
module_id,
name=new_name,
source_connection_id=int(form["source_connection_id"]),
dest_connection_id=int(form["dest_connection_id"]),
dest_table=form["dest_table"].strip(),
- staging_table=form["staging_table"].strip(),
+ staging_table=new_staging,
source_query=form["source_query"],
merge_strategy=merge_strategy,
merge_key=(form.get("merge_key") or "").strip() or None,
- dest_description=(form.get("dest_description") or "").strip() or None,
+ dest_description=new_description,
enabled=1 if form.get("enabled") == "1" else 0,
)
return RedirectResponse(url=f"/modules/{module_id}", status_code=303)
@@ -414,6 +433,9 @@ async def wizard_create(request: Request):
create_table_sql = dest_drv.build_create_table_sql(qualified_dest, chosen)
except NotImplementedError as e:
raise HTTPException(400, str(e))
+ effective_staging = staging_table or f"pipekit_staging.{module_name}"
+ staging_schema, _, _ = effective_staging.partition(".")
+
try:
jrunner.run_dest_sql(
dest_conn,
@@ -424,6 +446,19 @@ async def wizard_create(request: Request):
dest_description, chosen)
if comment_sql:
jrunner.run_dest_sql(dest_conn, comment_sql)
+ # Pre-align staging to dest so first run doesn't surprise us.
+ if staging_schema and staging_schema != effective_staging:
+ jrunner.run_dest_sql(
+ dest_conn,
+ f"CREATE SCHEMA IF NOT EXISTS {dest_drv.quote_identifier(staging_schema)};",
+ )
+ jrunner.run_dest_sql(
+ dest_conn, f"DROP TABLE IF EXISTS {effective_staging};")
+ jrunner.run_dest_sql(
+ dest_conn,
+ f"CREATE TABLE {effective_staging} "
+ f"(LIKE {qualified_dest} INCLUDING ALL);",
+ )
except jrunner.JrunnerError as e:
raise HTTPException(500, f"dest provisioning failed: {e}")
diff --git a/pipekit/web/templates/module_form.html b/pipekit/web/templates/module_form.html
index c67dec9..89c517a 100644
--- a/pipekit/web/templates/module_form.html
+++ b/pipekit/web/templates/module_form.html
@@ -64,7 +64,7 @@
@@ -95,7 +95,7 @@