From 2ef68d766cb6881411c7e40c5da5ca3f948695b8 Mon Sep 17 00:00:00 2001 From: Paul Trowbridge Date: Wed, 22 Apr 2026 11:02:45 -0400 Subject: [PATCH] Add module edit page + detect jrunner silent failures. Modules get a full edit form (name, connections, tables, source query, merge config, description, enabled); reachable via Edit button on the detail page and the source-query panel. jrunner catches SQLException and calls System.exit(0) at every failure site, so pipekit was marking runs success when the migrate phase had actually errored. query() and migrate() now scan stdout+stderr for a Java stack-trace signature and raise JrunnerError. runner.py also captures the failed jrunner output onto run_log so the stack trace is visible on the run detail page. Co-Authored-By: Claude Opus 4.7 --- pipekit/engine/runner.py | 3 + pipekit/jrunner.py | 26 +++++ pipekit/repo.py | 34 +++++++ pipekit/web/app.py | 47 +++++++++ pipekit/web/templates/module_detail.html | 4 +- pipekit/web/templates/module_form.html | 115 +++++++++++++++++++++++ 6 files changed, 228 insertions(+), 1 deletion(-) create mode 100644 pipekit/web/templates/module_form.html diff --git a/pipekit/engine/runner.py b/pipekit/engine/runner.py index a226709..e957772 100644 --- a/pipekit/engine/runner.py +++ b/pipekit/engine/runner.py @@ -128,6 +128,9 @@ def run_module(module_id: int, *, group_run_id: int | None = None, except Exception as e: # noqa: BLE001 error = f"{type(e).__name__}: {e}\n{traceback.format_exc()}" + if isinstance(e, jrunner.JrunnerError): + repo.log_run_output(run_id, jrunner_stdout=e.stdout, + jrunner_stderr=e.stderr) # Failure-path hooks, if any. Never let these mask the real error. try: hook_log = _run_hooks(module_id, fail_fast=False, diff --git a/pipekit/jrunner.py b/pipekit/jrunner.py index adfa9b1..2fc9d3e 100644 --- a/pipekit/jrunner.py +++ b/pipekit/jrunner.py @@ -125,6 +125,9 @@ def query( if r.returncode != 0: raise JrunnerError(r.stderr.strip() or r.stdout.strip(), stdout=r.stdout, stderr=r.stderr) + silent = _detect_silent_failure(r.stdout, r.stderr) + if silent: + raise JrunnerError(silent, stdout=r.stdout, stderr=r.stderr) reader = csv.reader(io.StringIO(r.stdout)) header = next(reader, []) @@ -169,6 +172,9 @@ def migrate( if r.returncode != 0: raise JrunnerError(r.stderr.strip() or r.stdout.strip(), stdout=r.stdout, stderr=r.stderr) + silent = _detect_silent_failure(r.stdout, r.stderr) + if silent: + raise JrunnerError(silent, stdout=r.stdout, stderr=r.stderr) return MigrateResult( row_count=_parse_row_count(r.stdout + "\n" + r.stderr), @@ -202,6 +208,26 @@ def _parse_row_count(text: str) -> int | None: return None +# jrunner catches SQLException, prints the stack trace, then exits 0 at +# nearly every failure site (see jrunner.java). Detect those by scanning +# for a Java stack-trace signature so callers don't treat silent failures +# as success. +_STACK_FRAME_RE = re.compile(r"^\s*at [\w.$<>]+\([^)\n]*\.java:\d+\)", re.M) +_EXCEPTION_HEADER_RE = re.compile( + r"^(?:[\w.$]+\.)*[\w$]+(?:Exception|Error)(?::[^\n]*)?$", re.M) + + +def _detect_silent_failure(stdout: str, stderr: str) -> str | None: + """Return a short error summary if jrunner exited 0 but logged a failure.""" + combined = (stderr or "") + "\n" + (stdout or "") + if not _STACK_FRAME_RE.search(combined): + return None + m = _EXCEPTION_HEADER_RE.search(combined) + if m: + return m.group(0).strip() + return "jrunner logged a Java stack trace but exited 0" + + class JrunnerError(RuntimeError): def __init__(self, message: str, *, stdout: str = "", stderr: str = ""): super().__init__(message) diff --git a/pipekit/repo.py b/pipekit/repo.py index 138f37b..72a7c51 100644 --- a/pipekit/repo.py +++ b/pipekit/repo.py @@ -177,6 +177,40 @@ def list_modules() -> list[dict]: return [dict(r) for r in c.execute("SELECT * FROM module ORDER BY name")] +def update_module(module_id: int, *, name: str | None = None, + source_connection_id: int | None = None, + dest_connection_id: int | None = None, + dest_table: str | None = None, + staging_table: str | None = None, + source_query: str | None = None, + merge_strategy: str | None = None, + merge_key: str | None = None, + dest_description: str | None = None, + enabled: int | None = None) -> dict | None: + fields: list[str] = [] + values: list = [] + for col, val in (("name", name), + ("source_connection_id", source_connection_id), + ("dest_connection_id", dest_connection_id), + ("dest_table", dest_table), + ("staging_table", staging_table), + ("source_query", source_query), + ("merge_strategy", merge_strategy), + ("merge_key", merge_key), + ("dest_description", dest_description), + ("enabled", enabled)): + if val is not None: + fields.append(f"{col}=?") + values.append(val) + if not fields: + return get_module(module_id) + fields.append("updated_at=datetime('now')") + values.append(module_id) + with db.connect() as c: + c.execute(f"UPDATE module SET {', '.join(fields)} WHERE id=?", values) + return get_module(module_id) + + class ModuleRunning(RuntimeError): """Raised by delete_module when the module is currently running.""" diff --git a/pipekit/web/app.py b/pipekit/web/app.py index f252dfb..e789284 100644 --- a/pipekit/web/app.py +++ b/pipekit/web/app.py @@ -131,6 +131,53 @@ def module_detail(request: Request, module_id: int): ) +@_router.get("/modules/{module_id}/edit", response_class=HTMLResponse) +def module_edit(request: Request, module_id: int): + module = repo.get_module(module_id) + if module is None: + raise HTTPException(404, f"module id={module_id} not found") + return _templates.TemplateResponse( + request, + "module_form.html", + _ctx(module=module, connections=repo.list_connections(), + form_action=f"/modules/{module_id}", + cancel_url=f"/modules/{module_id}"), + ) + + +@_router.post("/modules/{module_id}") +async def module_update(request: Request, module_id: int): + module = repo.get_module(module_id) + if module is None: + raise HTTPException(404, f"module id={module_id} not found") + form = await request.form() + new_name = form["name"].strip() + if new_name != module["name"]: + existing = repo.get_module_by_name(new_name) + if existing is not None: + raise HTTPException( + 409, f"module name {new_name!r} already exists — pick another") + + merge_strategy = form.get("merge_strategy", "full") + if merge_strategy not in ("full", "incremental", "append"): + raise HTTPException(400, f"invalid merge_strategy: {merge_strategy!r}") + + repo.update_module( + module_id, + name=new_name, + source_connection_id=int(form["source_connection_id"]), + dest_connection_id=int(form["dest_connection_id"]), + dest_table=form["dest_table"].strip(), + staging_table=form["staging_table"].strip(), + source_query=form["source_query"], + merge_strategy=merge_strategy, + merge_key=(form.get("merge_key") or "").strip() or None, + dest_description=(form.get("dest_description") or "").strip() or None, + enabled=1 if form.get("enabled") == "1" else 0, + ) + return RedirectResponse(url=f"/modules/{module_id}", status_code=303) + + @_router.post("/modules/{module_id}/delete") def module_delete(module_id: int): if repo.get_module(module_id) is None: diff --git a/pipekit/web/templates/module_detail.html b/pipekit/web/templates/module_detail.html index b039af8..29021b0 100644 --- a/pipekit/web/templates/module_detail.html +++ b/pipekit/web/templates/module_detail.html @@ -19,6 +19,7 @@ + Edit
@@ -41,7 +42,8 @@
Source query - free text — edit opens in $EDITOR (TODO) + free text with {watermark} placeholders + edit
{{ module.source_query }}
diff --git a/pipekit/web/templates/module_form.html b/pipekit/web/templates/module_form.html new file mode 100644 index 0000000..c67dec9 --- /dev/null +++ b/pipekit/web/templates/module_form.html @@ -0,0 +1,115 @@ +{% extends "base.html" %} +{% set section = "modules" %} +{% block title %}Edit module · {{ module.name }} — Pipekit{% endblock %} + +{% block content %} +
+
+ Edit module · {{ module.name }} + + module #{{ module.id }} + {% if module.running %}running{% endif %} + + ← back to module +
+
+ {% if module.running %} +
+ This module is currently running. Saving changes now is allowed but may affect the in-flight run. +
+ {% endif %} + + + + +
+ + + +
+ +
+ + + +
+ + + +
+ + + +
+ + + + + +
+ cancel + +
+ +
+
+{% endblock %}