Add module edit page + detect jrunner silent failures.
Modules get a full edit form (name, connections, tables, source query, merge config, description, enabled); reachable via Edit button on the detail page and the source-query panel. jrunner catches SQLException and calls System.exit(0) at every failure site, so pipekit was marking runs success when the migrate phase had actually errored. query() and migrate() now scan stdout+stderr for a Java stack-trace signature and raise JrunnerError. runner.py also captures the failed jrunner output onto run_log so the stack trace is visible on the run detail page. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
d952b48a4e
commit
2ef68d766c
@ -128,6 +128,9 @@ def run_module(module_id: int, *, group_run_id: int | None = None,
|
|||||||
|
|
||||||
except Exception as e: # noqa: BLE001
|
except Exception as e: # noqa: BLE001
|
||||||
error = f"{type(e).__name__}: {e}\n{traceback.format_exc()}"
|
error = f"{type(e).__name__}: {e}\n{traceback.format_exc()}"
|
||||||
|
if isinstance(e, jrunner.JrunnerError):
|
||||||
|
repo.log_run_output(run_id, jrunner_stdout=e.stdout,
|
||||||
|
jrunner_stderr=e.stderr)
|
||||||
# Failure-path hooks, if any. Never let these mask the real error.
|
# Failure-path hooks, if any. Never let these mask the real error.
|
||||||
try:
|
try:
|
||||||
hook_log = _run_hooks(module_id, fail_fast=False,
|
hook_log = _run_hooks(module_id, fail_fast=False,
|
||||||
|
|||||||
@ -125,6 +125,9 @@ def query(
|
|||||||
if r.returncode != 0:
|
if r.returncode != 0:
|
||||||
raise JrunnerError(r.stderr.strip() or r.stdout.strip(),
|
raise JrunnerError(r.stderr.strip() or r.stdout.strip(),
|
||||||
stdout=r.stdout, stderr=r.stderr)
|
stdout=r.stdout, stderr=r.stderr)
|
||||||
|
silent = _detect_silent_failure(r.stdout, r.stderr)
|
||||||
|
if silent:
|
||||||
|
raise JrunnerError(silent, stdout=r.stdout, stderr=r.stderr)
|
||||||
|
|
||||||
reader = csv.reader(io.StringIO(r.stdout))
|
reader = csv.reader(io.StringIO(r.stdout))
|
||||||
header = next(reader, [])
|
header = next(reader, [])
|
||||||
@ -169,6 +172,9 @@ def migrate(
|
|||||||
if r.returncode != 0:
|
if r.returncode != 0:
|
||||||
raise JrunnerError(r.stderr.strip() or r.stdout.strip(),
|
raise JrunnerError(r.stderr.strip() or r.stdout.strip(),
|
||||||
stdout=r.stdout, stderr=r.stderr)
|
stdout=r.stdout, stderr=r.stderr)
|
||||||
|
silent = _detect_silent_failure(r.stdout, r.stderr)
|
||||||
|
if silent:
|
||||||
|
raise JrunnerError(silent, stdout=r.stdout, stderr=r.stderr)
|
||||||
|
|
||||||
return MigrateResult(
|
return MigrateResult(
|
||||||
row_count=_parse_row_count(r.stdout + "\n" + r.stderr),
|
row_count=_parse_row_count(r.stdout + "\n" + r.stderr),
|
||||||
@ -202,6 +208,26 @@ def _parse_row_count(text: str) -> int | None:
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
# jrunner catches SQLException, prints the stack trace, then exits 0 at
|
||||||
|
# nearly every failure site (see jrunner.java). Detect those by scanning
|
||||||
|
# for a Java stack-trace signature so callers don't treat silent failures
|
||||||
|
# as success.
|
||||||
|
_STACK_FRAME_RE = re.compile(r"^\s*at [\w.$<>]+\([^)\n]*\.java:\d+\)", re.M)
|
||||||
|
_EXCEPTION_HEADER_RE = re.compile(
|
||||||
|
r"^(?:[\w.$]+\.)*[\w$]+(?:Exception|Error)(?::[^\n]*)?$", re.M)
|
||||||
|
|
||||||
|
|
||||||
|
def _detect_silent_failure(stdout: str, stderr: str) -> str | None:
|
||||||
|
"""Return a short error summary if jrunner exited 0 but logged a failure."""
|
||||||
|
combined = (stderr or "") + "\n" + (stdout or "")
|
||||||
|
if not _STACK_FRAME_RE.search(combined):
|
||||||
|
return None
|
||||||
|
m = _EXCEPTION_HEADER_RE.search(combined)
|
||||||
|
if m:
|
||||||
|
return m.group(0).strip()
|
||||||
|
return "jrunner logged a Java stack trace but exited 0"
|
||||||
|
|
||||||
|
|
||||||
class JrunnerError(RuntimeError):
|
class JrunnerError(RuntimeError):
|
||||||
def __init__(self, message: str, *, stdout: str = "", stderr: str = ""):
|
def __init__(self, message: str, *, stdout: str = "", stderr: str = ""):
|
||||||
super().__init__(message)
|
super().__init__(message)
|
||||||
|
|||||||
@ -177,6 +177,40 @@ def list_modules() -> list[dict]:
|
|||||||
return [dict(r) for r in c.execute("SELECT * FROM module ORDER BY name")]
|
return [dict(r) for r in c.execute("SELECT * FROM module ORDER BY name")]
|
||||||
|
|
||||||
|
|
||||||
|
def update_module(module_id: int, *, name: str | None = None,
|
||||||
|
source_connection_id: int | None = None,
|
||||||
|
dest_connection_id: int | None = None,
|
||||||
|
dest_table: str | None = None,
|
||||||
|
staging_table: str | None = None,
|
||||||
|
source_query: str | None = None,
|
||||||
|
merge_strategy: str | None = None,
|
||||||
|
merge_key: str | None = None,
|
||||||
|
dest_description: str | None = None,
|
||||||
|
enabled: int | None = None) -> dict | None:
|
||||||
|
fields: list[str] = []
|
||||||
|
values: list = []
|
||||||
|
for col, val in (("name", name),
|
||||||
|
("source_connection_id", source_connection_id),
|
||||||
|
("dest_connection_id", dest_connection_id),
|
||||||
|
("dest_table", dest_table),
|
||||||
|
("staging_table", staging_table),
|
||||||
|
("source_query", source_query),
|
||||||
|
("merge_strategy", merge_strategy),
|
||||||
|
("merge_key", merge_key),
|
||||||
|
("dest_description", dest_description),
|
||||||
|
("enabled", enabled)):
|
||||||
|
if val is not None:
|
||||||
|
fields.append(f"{col}=?")
|
||||||
|
values.append(val)
|
||||||
|
if not fields:
|
||||||
|
return get_module(module_id)
|
||||||
|
fields.append("updated_at=datetime('now')")
|
||||||
|
values.append(module_id)
|
||||||
|
with db.connect() as c:
|
||||||
|
c.execute(f"UPDATE module SET {', '.join(fields)} WHERE id=?", values)
|
||||||
|
return get_module(module_id)
|
||||||
|
|
||||||
|
|
||||||
class ModuleRunning(RuntimeError):
|
class ModuleRunning(RuntimeError):
|
||||||
"""Raised by delete_module when the module is currently running."""
|
"""Raised by delete_module when the module is currently running."""
|
||||||
|
|
||||||
|
|||||||
@ -131,6 +131,53 @@ def module_detail(request: Request, module_id: int):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@_router.get("/modules/{module_id}/edit", response_class=HTMLResponse)
|
||||||
|
def module_edit(request: Request, module_id: int):
|
||||||
|
module = repo.get_module(module_id)
|
||||||
|
if module is None:
|
||||||
|
raise HTTPException(404, f"module id={module_id} not found")
|
||||||
|
return _templates.TemplateResponse(
|
||||||
|
request,
|
||||||
|
"module_form.html",
|
||||||
|
_ctx(module=module, connections=repo.list_connections(),
|
||||||
|
form_action=f"/modules/{module_id}",
|
||||||
|
cancel_url=f"/modules/{module_id}"),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@_router.post("/modules/{module_id}")
|
||||||
|
async def module_update(request: Request, module_id: int):
|
||||||
|
module = repo.get_module(module_id)
|
||||||
|
if module is None:
|
||||||
|
raise HTTPException(404, f"module id={module_id} not found")
|
||||||
|
form = await request.form()
|
||||||
|
new_name = form["name"].strip()
|
||||||
|
if new_name != module["name"]:
|
||||||
|
existing = repo.get_module_by_name(new_name)
|
||||||
|
if existing is not None:
|
||||||
|
raise HTTPException(
|
||||||
|
409, f"module name {new_name!r} already exists — pick another")
|
||||||
|
|
||||||
|
merge_strategy = form.get("merge_strategy", "full")
|
||||||
|
if merge_strategy not in ("full", "incremental", "append"):
|
||||||
|
raise HTTPException(400, f"invalid merge_strategy: {merge_strategy!r}")
|
||||||
|
|
||||||
|
repo.update_module(
|
||||||
|
module_id,
|
||||||
|
name=new_name,
|
||||||
|
source_connection_id=int(form["source_connection_id"]),
|
||||||
|
dest_connection_id=int(form["dest_connection_id"]),
|
||||||
|
dest_table=form["dest_table"].strip(),
|
||||||
|
staging_table=form["staging_table"].strip(),
|
||||||
|
source_query=form["source_query"],
|
||||||
|
merge_strategy=merge_strategy,
|
||||||
|
merge_key=(form.get("merge_key") or "").strip() or None,
|
||||||
|
dest_description=(form.get("dest_description") or "").strip() or None,
|
||||||
|
enabled=1 if form.get("enabled") == "1" else 0,
|
||||||
|
)
|
||||||
|
return RedirectResponse(url=f"/modules/{module_id}", status_code=303)
|
||||||
|
|
||||||
|
|
||||||
@_router.post("/modules/{module_id}/delete")
|
@_router.post("/modules/{module_id}/delete")
|
||||||
def module_delete(module_id: int):
|
def module_delete(module_id: int):
|
||||||
if repo.get_module(module_id) is None:
|
if repo.get_module(module_id) is None:
|
||||||
|
|||||||
@ -19,6 +19,7 @@
|
|||||||
<input type="hidden" name="dry_run" value="1">
|
<input type="hidden" name="dry_run" value="1">
|
||||||
<button type="submit">Dry run</button>
|
<button type="submit">Dry run</button>
|
||||||
</form>
|
</form>
|
||||||
|
<a class="btn" href="/modules/{{ module.id }}/edit">Edit</a>
|
||||||
<form class="inline" method="post" action="/modules/{{ module.id }}/delete"
|
<form class="inline" method="post" action="/modules/{{ module.id }}/delete"
|
||||||
onsubmit="return confirm('Delete module {{ module.name }}? This removes the module and its run history. The dest table is NOT dropped.')">
|
onsubmit="return confirm('Delete module {{ module.name }}? This removes the module and its run history. The dest table is NOT dropped.')">
|
||||||
<button type="submit" class="ghost" style="color:var(--danger)">Delete</button>
|
<button type="submit" class="ghost" style="color:var(--danger)">Delete</button>
|
||||||
@ -41,7 +42,8 @@
|
|||||||
<div>
|
<div>
|
||||||
<div class="panel">
|
<div class="panel">
|
||||||
<header>Source query
|
<header>Source query
|
||||||
<span class="subtitle">free text — edit opens in $EDITOR (TODO)</span>
|
<span class="subtitle">free text with <code>{watermark}</code> placeholders</span>
|
||||||
|
<span style="margin-left:auto"><a href="/modules/{{ module.id }}/edit">edit</a></span>
|
||||||
</header>
|
</header>
|
||||||
<div class="body"><pre class="sql">{{ module.source_query }}</pre></div>
|
<div class="body"><pre class="sql">{{ module.source_query }}</pre></div>
|
||||||
</div>
|
</div>
|
||||||
|
|||||||
115
pipekit/web/templates/module_form.html
Normal file
115
pipekit/web/templates/module_form.html
Normal file
@ -0,0 +1,115 @@
|
|||||||
|
{% extends "base.html" %}
|
||||||
|
{% set section = "modules" %}
|
||||||
|
{% block title %}Edit module · {{ module.name }} — Pipekit{% endblock %}
|
||||||
|
|
||||||
|
{% block content %}
|
||||||
|
<div class="panel">
|
||||||
|
<header>
|
||||||
|
Edit module · {{ module.name }}
|
||||||
|
<span class="subtitle">
|
||||||
|
module #{{ module.id }}
|
||||||
|
{% if module.running %}<span class="pill running">running</span>{% endif %}
|
||||||
|
</span>
|
||||||
|
<span style="margin-left:auto"><a href="{{ cancel_url }}">← back to module</a></span>
|
||||||
|
</header>
|
||||||
|
<div class="body">
|
||||||
|
{% if module.running %}
|
||||||
|
<div class="flash warning" style="margin-bottom:0.8rem">
|
||||||
|
This module is currently running. Saving changes now is allowed but may affect the in-flight run.
|
||||||
|
</div>
|
||||||
|
{% endif %}
|
||||||
|
|
||||||
|
<form method="post" action="{{ form_action }}">
|
||||||
|
<label class="field">
|
||||||
|
<span>name</span>
|
||||||
|
<input type="text" name="name" required value="{{ module.name }}">
|
||||||
|
<span class="help">must be unique; also used as the default staging table suffix</span>
|
||||||
|
</label>
|
||||||
|
|
||||||
|
<div class="two-col" style="gap:1rem">
|
||||||
|
<label class="field">
|
||||||
|
<span>source connection</span>
|
||||||
|
<select name="source_connection_id" required>
|
||||||
|
{% for c in connections %}
|
||||||
|
<option value="{{ c.id }}"
|
||||||
|
{% if c.id == module.source_connection_id %}selected{% endif %}>
|
||||||
|
{{ c.name }}
|
||||||
|
</option>
|
||||||
|
{% endfor %}
|
||||||
|
</select>
|
||||||
|
<span class="help">where <code>source_query</code> runs</span>
|
||||||
|
</label>
|
||||||
|
|
||||||
|
<label class="field">
|
||||||
|
<span>dest connection</span>
|
||||||
|
<select name="dest_connection_id" required>
|
||||||
|
{% for c in connections %}
|
||||||
|
<option value="{{ c.id }}"
|
||||||
|
{% if c.id == module.dest_connection_id %}selected{% endif %}>
|
||||||
|
{{ c.name }}
|
||||||
|
</option>
|
||||||
|
{% endfor %}
|
||||||
|
</select>
|
||||||
|
<span class="help">where staging + merge run</span>
|
||||||
|
</label>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
<div class="two-col" style="gap:1rem">
|
||||||
|
<label class="field">
|
||||||
|
<span>dest table</span>
|
||||||
|
<input type="text" name="dest_table" required value="{{ module.dest_table }}">
|
||||||
|
<span class="help"><code>schema.table</code> — editing here does NOT rename the actual table</span>
|
||||||
|
</label>
|
||||||
|
|
||||||
|
<label class="field">
|
||||||
|
<span>staging table</span>
|
||||||
|
<input type="text" name="staging_table" required value="{{ module.staging_table }}">
|
||||||
|
<span class="help">dropped + recreated each run</span>
|
||||||
|
</label>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
<label class="field">
|
||||||
|
<span>source query</span>
|
||||||
|
<textarea name="source_query" rows="14" required class="mono">{{ module.source_query }}</textarea>
|
||||||
|
<span class="help">free text; <code>{name}</code> placeholders resolved from watermarks at run time</span>
|
||||||
|
</label>
|
||||||
|
|
||||||
|
<div class="two-col" style="gap:1rem">
|
||||||
|
<label class="field">
|
||||||
|
<span>merge strategy</span>
|
||||||
|
<select name="merge_strategy" required>
|
||||||
|
{% for s in ("full", "incremental", "append") %}
|
||||||
|
<option value="{{ s }}" {% if s == module.merge_strategy %}selected{% endif %}>{{ s }}</option>
|
||||||
|
{% endfor %}
|
||||||
|
</select>
|
||||||
|
</label>
|
||||||
|
|
||||||
|
<label class="field">
|
||||||
|
<span>merge key</span>
|
||||||
|
<input type="text" name="merge_key" value="{{ module.merge_key or '' }}"
|
||||||
|
placeholder="id or (col_a, col_b)">
|
||||||
|
<span class="help">required for <code>incremental</code>; ignored otherwise</span>
|
||||||
|
</label>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
<label class="field">
|
||||||
|
<span>dest description</span>
|
||||||
|
<textarea name="dest_description" rows="2">{{ module.dest_description or '' }}</textarea>
|
||||||
|
<span class="help">COMMENT ON TABLE value; editing here does NOT re-apply it to the dest table</span>
|
||||||
|
</label>
|
||||||
|
|
||||||
|
<label class="field" style="flex-direction:row;align-items:center;gap:0.5rem">
|
||||||
|
<input type="checkbox" name="enabled" value="1"
|
||||||
|
{% if module.enabled %}checked{% endif %}>
|
||||||
|
<span>enabled</span>
|
||||||
|
<span class="help">disabled modules are skipped by group runs; ad-hoc runs still work</span>
|
||||||
|
</label>
|
||||||
|
|
||||||
|
<div class="actions" style="justify-content:flex-end;margin-top:0.8rem">
|
||||||
|
<a class="btn ghost" href="{{ cancel_url }}">cancel</a>
|
||||||
|
<button type="submit" class="primary">save changes</button>
|
||||||
|
</div>
|
||||||
|
</form>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
{% endblock %}
|
||||||
Loading…
Reference in New Issue
Block a user