diff --git a/pipekit/engine/runner.py b/pipekit/engine/runner.py index a226709..e957772 100644 --- a/pipekit/engine/runner.py +++ b/pipekit/engine/runner.py @@ -128,6 +128,9 @@ def run_module(module_id: int, *, group_run_id: int | None = None, except Exception as e: # noqa: BLE001 error = f"{type(e).__name__}: {e}\n{traceback.format_exc()}" + if isinstance(e, jrunner.JrunnerError): + repo.log_run_output(run_id, jrunner_stdout=e.stdout, + jrunner_stderr=e.stderr) # Failure-path hooks, if any. Never let these mask the real error. try: hook_log = _run_hooks(module_id, fail_fast=False, diff --git a/pipekit/jrunner.py b/pipekit/jrunner.py index adfa9b1..2fc9d3e 100644 --- a/pipekit/jrunner.py +++ b/pipekit/jrunner.py @@ -125,6 +125,9 @@ def query( if r.returncode != 0: raise JrunnerError(r.stderr.strip() or r.stdout.strip(), stdout=r.stdout, stderr=r.stderr) + silent = _detect_silent_failure(r.stdout, r.stderr) + if silent: + raise JrunnerError(silent, stdout=r.stdout, stderr=r.stderr) reader = csv.reader(io.StringIO(r.stdout)) header = next(reader, []) @@ -169,6 +172,9 @@ def migrate( if r.returncode != 0: raise JrunnerError(r.stderr.strip() or r.stdout.strip(), stdout=r.stdout, stderr=r.stderr) + silent = _detect_silent_failure(r.stdout, r.stderr) + if silent: + raise JrunnerError(silent, stdout=r.stdout, stderr=r.stderr) return MigrateResult( row_count=_parse_row_count(r.stdout + "\n" + r.stderr), @@ -202,6 +208,26 @@ def _parse_row_count(text: str) -> int | None: return None +# jrunner catches SQLException, prints the stack trace, then exits 0 at +# nearly every failure site (see jrunner.java). Detect those by scanning +# for a Java stack-trace signature so callers don't treat silent failures +# as success. +_STACK_FRAME_RE = re.compile(r"^\s*at [\w.$<>]+\([^)\n]*\.java:\d+\)", re.M) +_EXCEPTION_HEADER_RE = re.compile( + r"^(?:[\w.$]+\.)*[\w$]+(?:Exception|Error)(?::[^\n]*)?$", re.M) + + +def _detect_silent_failure(stdout: str, stderr: str) -> str | None: + """Return a short error summary if jrunner exited 0 but logged a failure.""" + combined = (stderr or "") + "\n" + (stdout or "") + if not _STACK_FRAME_RE.search(combined): + return None + m = _EXCEPTION_HEADER_RE.search(combined) + if m: + return m.group(0).strip() + return "jrunner logged a Java stack trace but exited 0" + + class JrunnerError(RuntimeError): def __init__(self, message: str, *, stdout: str = "", stderr: str = ""): super().__init__(message) diff --git a/pipekit/repo.py b/pipekit/repo.py index 138f37b..72a7c51 100644 --- a/pipekit/repo.py +++ b/pipekit/repo.py @@ -177,6 +177,40 @@ def list_modules() -> list[dict]: return [dict(r) for r in c.execute("SELECT * FROM module ORDER BY name")] +def update_module(module_id: int, *, name: str | None = None, + source_connection_id: int | None = None, + dest_connection_id: int | None = None, + dest_table: str | None = None, + staging_table: str | None = None, + source_query: str | None = None, + merge_strategy: str | None = None, + merge_key: str | None = None, + dest_description: str | None = None, + enabled: int | None = None) -> dict | None: + fields: list[str] = [] + values: list = [] + for col, val in (("name", name), + ("source_connection_id", source_connection_id), + ("dest_connection_id", dest_connection_id), + ("dest_table", dest_table), + ("staging_table", staging_table), + ("source_query", source_query), + ("merge_strategy", merge_strategy), + ("merge_key", merge_key), + ("dest_description", dest_description), + ("enabled", enabled)): + if val is not None: + fields.append(f"{col}=?") + values.append(val) + if not fields: + return get_module(module_id) + fields.append("updated_at=datetime('now')") + values.append(module_id) + with db.connect() as c: + c.execute(f"UPDATE module SET {', '.join(fields)} WHERE id=?", values) + return get_module(module_id) + + class ModuleRunning(RuntimeError): """Raised by delete_module when the module is currently running.""" diff --git a/pipekit/web/app.py b/pipekit/web/app.py index f252dfb..e789284 100644 --- a/pipekit/web/app.py +++ b/pipekit/web/app.py @@ -131,6 +131,53 @@ def module_detail(request: Request, module_id: int): ) +@_router.get("/modules/{module_id}/edit", response_class=HTMLResponse) +def module_edit(request: Request, module_id: int): + module = repo.get_module(module_id) + if module is None: + raise HTTPException(404, f"module id={module_id} not found") + return _templates.TemplateResponse( + request, + "module_form.html", + _ctx(module=module, connections=repo.list_connections(), + form_action=f"/modules/{module_id}", + cancel_url=f"/modules/{module_id}"), + ) + + +@_router.post("/modules/{module_id}") +async def module_update(request: Request, module_id: int): + module = repo.get_module(module_id) + if module is None: + raise HTTPException(404, f"module id={module_id} not found") + form = await request.form() + new_name = form["name"].strip() + if new_name != module["name"]: + existing = repo.get_module_by_name(new_name) + if existing is not None: + raise HTTPException( + 409, f"module name {new_name!r} already exists — pick another") + + merge_strategy = form.get("merge_strategy", "full") + if merge_strategy not in ("full", "incremental", "append"): + raise HTTPException(400, f"invalid merge_strategy: {merge_strategy!r}") + + repo.update_module( + module_id, + name=new_name, + source_connection_id=int(form["source_connection_id"]), + dest_connection_id=int(form["dest_connection_id"]), + dest_table=form["dest_table"].strip(), + staging_table=form["staging_table"].strip(), + source_query=form["source_query"], + merge_strategy=merge_strategy, + merge_key=(form.get("merge_key") or "").strip() or None, + dest_description=(form.get("dest_description") or "").strip() or None, + enabled=1 if form.get("enabled") == "1" else 0, + ) + return RedirectResponse(url=f"/modules/{module_id}", status_code=303) + + @_router.post("/modules/{module_id}/delete") def module_delete(module_id: int): if repo.get_module(module_id) is None: diff --git a/pipekit/web/templates/module_detail.html b/pipekit/web/templates/module_detail.html index b039af8..29021b0 100644 --- a/pipekit/web/templates/module_detail.html +++ b/pipekit/web/templates/module_detail.html @@ -19,6 +19,7 @@ + Edit