Add live progress to module runs: async web POST + HTMX polling

Web POST /modules/{id}/run now returns immediately (BackgroundTasks)
instead of blocking until the run completes. jrunner.migrate() switches
from subprocess.run to Popen so stdout lines are read as they arrive and
appended to run_log.live_log via repo.append_run_live_log(). The run
detail page embeds an HTMX fragment that polls /runs/{id}/live every 2s
while status=running, showing current status, row count, and live output;
polling stops automatically once the run finishes.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Paul Trowbridge 2026-05-08 14:02:36 -04:00
parent 760d4e7fec
commit 99f75490c4
11 changed files with 160 additions and 30 deletions

View File

@ -43,7 +43,7 @@ Pipekit is a database sync tool. A **module** defines a source query → dest ta
- **module** — sync job (source/dest connection, source query, merge strategy, merge key, enabled, running lock) - **module** — sync job (source/dest connection, source query, merge strategy, merge key, enabled, running lock)
- **watermark** — named placeholder with resolver SQL; first column of first row used as opaque string; replaces `{watermark_name}` in source query - **watermark** — named placeholder with resolver SQL; first column of first row used as opaque string; replaces `{watermark_name}` in source query
- **hook** — post-merge SQL (run_order, run_on: success/failure/always) - **hook** — post-merge SQL (run_order, run_on: success/failure/always)
- **run_log** — immutable history record with resolved SQL, merge SQL, watermark values, stdout/stderr, timing - **run_log** — immutable history record with resolved SQL, merge SQL, watermark values, stdout/stderr, timing, live_log (streamed jrunner output written during the run)
## Engine Flow ## Engine Flow
@ -55,7 +55,7 @@ run_module(module_id)
→ build merge SQL (engine/merge.py: full=truncate+insert, incremental=delete by key+insert, append=insert) → build merge SQL (engine/merge.py: full=truncate+insert, incremental=delete by key+insert, append=insert)
→ if dry_run: write run_log (status='dry_run'), return early — no data movement → if dry_run: write run_log (status='dry_run'), return early — no data movement
→ DROP + CREATE staging table (pipekit_staging.{module_name}) # self-healing schema drift → DROP + CREATE staging table (pipekit_staging.{module_name}) # self-healing schema drift
→ jrunner migrate source → staging → jrunner migrate source → staging # uses Popen; each stdout line appended to run_log.live_log
→ run merge SQL via jrunner → run merge SQL via jrunner
→ run hooks in order → run hooks in order
→ write run_log entry → write run_log entry
@ -94,7 +94,8 @@ Recreated on every run as `pipekit_staging.{module_name}` (DROP + CREATE, not IF
- `/api/*` — JSON REST, HTTP Basic Auth, consumed by HTMX fragments and external callers - `/api/*` — JSON REST, HTTP Basic Auth, consumed by HTMX fragments and external callers
- `/` and other bare paths — full HTML pages (Jinja2), no auth currently - `/` and other bare paths — full HTML pages (Jinja2), no auth currently
- `POST /modules/{id}/run` returns `{run_id}` immediately; run is async; poll `/runs/{id}` for status - `POST /modules/{id}/run` returns `{run_id}` immediately (both API and web); run is async via BackgroundTasks
- `GET /runs/{id}/live` — HTML fragment endpoint; HTMX polls this every 2 s while status=running to show live_log + status
## Tech Stack ## Tech Stack

30
SPEC.md
View File

@ -236,7 +236,8 @@ run_log(
watermark_values_json, -- {prev_period: "'2610'", ...} watermark_values_json, -- {prev_period: "'2610'", ...}
jrunner_stdout, jrunner_stdout,
jrunner_stderr, jrunner_stderr,
hook_log hook_log,
live_log -- jrunner stdout lines appended in real time during migrate
) )
``` ```
@ -555,30 +556,35 @@ GET /settings
POST /settings/{key} POST /settings/{key}
``` ```
### Async runs + SSE ### Async runs + live progress
`POST /modules/{id}/run` does NOT block. It atomically acquires the `POST /modules/{id}/run` does NOT block. It atomically acquires the
module lock, kicks off the sync in a background task, and returns module lock, kicks off the sync in a background task, and returns
`{"run_id": 4892}` immediately. `{"run_id": 4892}` immediately (both the API and web POST).
Two ways to watch a run after that: Two ways to watch a run after that:
1. **Polling**`GET /runs/{id}` returns the run_log row; keep hitting 1. **Polling**`GET /runs/{id}` returns the run_log row; keep hitting
it until `status != running`. Simple, works anywhere. it until `status != running`. Simple, works anywhere.
2. **Streaming**`GET /runs/{id}/stream` opens a Server-Sent Events 2. **Live fragment polling** — the web run detail page (`/runs/{id}`)
connection. The server pushes event lines as things happen — log embeds an HTMX fragment that polls `GET /runs/{id}/live` every 2 s
lines, row-count updates, final status. The TUI uses this for the while `status == running`. The fragment shows current status,
run watch screen. curl supports it with `-N` (no buffering). row count, and the `live_log` field as it grows. Polling stops
automatically once status leaves `running`.
SSE is plain HTTP with a long-lived connection, not WebSockets. Simpler **Live log** — during `jrunner.migrate()` the engine uses `subprocess.Popen`
to implement, works in browsers natively (`EventSource` in JS), works in (not `subprocess.run`) so stdout can be read line-by-line as jrunner
curl for debugging. emits it. Each non-empty line is appended to `run_log.live_log` via
`repo.append_run_live_log()`. How frequently lines appear depends on
jrunner's output flushing behaviour; at minimum the final row-count
summary line appears when the transfer completes.
Splitting `start` from `watch` (two endpoints) means: Splitting `start` from `watch` means:
- Cron-triggered runs don't have to watch - Cron-triggered runs don't have to watch
- Curl scripting can fire-and-forget - Curl scripting can fire-and-forget
- TUI can reconnect to an already-running sync if it crashes mid-run - A browser navigating directly to `/runs/{id}` mid-run picks up the
live panel automatically
### Auth ### Auth

View File

@ -38,6 +38,10 @@ def _apply_migrations(conn: sqlite3.Connection) -> None:
if "dest_description" not in cols: if "dest_description" not in cols:
conn.execute("ALTER TABLE module ADD COLUMN dest_description TEXT") conn.execute("ALTER TABLE module ADD COLUMN dest_description TEXT")
rl_cols = {r[1] for r in conn.execute("PRAGMA table_info(run_log)")}
if "live_log" not in rl_cols:
conn.execute("ALTER TABLE run_log ADD COLUMN live_log TEXT")
@contextmanager @contextmanager
def connect(db_path: Path | None = None): def connect(db_path: Path | None = None):

View File

@ -113,6 +113,7 @@ def run_module(module_id: int, *, group_run_id: int | None = None,
source_conn=source_conn, dest_conn=dest_conn, source_conn=source_conn, dest_conn=dest_conn,
sql=resolved_sql, dest_table=module["staging_table"], sql=resolved_sql, dest_table=module["staging_table"],
clear=False, clear=False,
progress_cb=lambda line: repo.append_run_live_log(run_id, line),
) )
row_count = migrate_result.row_count row_count = migrate_result.row_count
repo.log_run_output(run_id, jrunner_stdout=migrate_result.stdout, repo.log_run_output(run_id, jrunner_stdout=migrate_result.stdout,

View File

@ -14,6 +14,7 @@ argv or in the database.
from __future__ import annotations from __future__ import annotations
from collections.abc import Callable
import csv import csv
import io import io
import os import os
@ -21,6 +22,7 @@ import re
import shutil import shutil
import subprocess import subprocess
import tempfile import tempfile
import threading
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
@ -144,8 +146,13 @@ def migrate(
clear: bool = False, clear: bool = False,
trim: bool = True, trim: bool = True,
timeout: int = 3600, timeout: int = 3600,
progress_cb: "Callable[[str], None] | None" = None,
) -> MigrateResult: ) -> MigrateResult:
"""Stream `sql` results from source into `dest_table` via jrunner migration mode.""" """Stream `sql` results from source into `dest_table` via jrunner migration mode.
If ``progress_cb`` is provided it is called with each non-empty stdout line
as jrunner produces it, enabling live progress reporting.
"""
path = jrunner_path() path = jrunner_path()
with tempfile.NamedTemporaryFile("w", suffix=".sql", delete=False) as f: with tempfile.NamedTemporaryFile("w", suffix=".sql", delete=False) as f:
f.write(sql) f.write(sql)
@ -164,21 +171,50 @@ def migrate(
argv.append("-t") argv.append("-t")
if clear: if clear:
argv.append("-c") argv.append("-c")
r = subprocess.run(argv, capture_output=True, text=True,
timeout=timeout, env=_subprocess_env()) proc = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
text=True, env=_subprocess_env())
stderr_buf: list[str] = []
def _drain_stderr() -> None:
for line in proc.stderr: # type: ignore[union-attr]
stderr_buf.append(line)
t = threading.Thread(target=_drain_stderr, daemon=True)
t.start()
stdout_lines: list[str] = []
for line in proc.stdout: # type: ignore[union-attr]
stdout_lines.append(line)
if progress_cb:
stripped = line.rstrip()
if stripped:
progress_cb(stripped)
try:
proc.wait(timeout=timeout)
except subprocess.TimeoutExpired:
proc.kill()
proc.wait()
raise
t.join()
stdout = "".join(stdout_lines)
stderr = "".join(stderr_buf)
finally: finally:
os.unlink(sql_path) os.unlink(sql_path)
if r.returncode != 0: if proc.returncode != 0:
raise JrunnerError(r.stderr.strip() or r.stdout.strip(), raise JrunnerError(stderr.strip() or stdout.strip(),
stdout=r.stdout, stderr=r.stderr) stdout=stdout, stderr=stderr)
silent = _detect_silent_failure(r.stdout, r.stderr) silent = _detect_silent_failure(stdout, stderr)
if silent: if silent:
raise JrunnerError(silent, stdout=r.stdout, stderr=r.stderr) raise JrunnerError(silent, stdout=stdout, stderr=stderr)
return MigrateResult( return MigrateResult(
row_count=_parse_row_count(r.stdout + "\n" + r.stderr), row_count=_parse_row_count(stdout + "\n" + stderr),
stdout=r.stdout, stderr=r.stderr, stdout=stdout, stderr=stderr,
) )

View File

@ -442,6 +442,14 @@ def log_run_output(run_id: int, *, jrunner_stdout: str | None = None,
c.execute(f"UPDATE run_log SET {', '.join(sets)} WHERE id=?", vals + [run_id]) c.execute(f"UPDATE run_log SET {', '.join(sets)} WHERE id=?", vals + [run_id])
def append_run_live_log(run_id: int, text: str) -> None:
with db.connect() as c:
c.execute(
"UPDATE run_log SET live_log = COALESCE(live_log, '') || ? WHERE id=?",
(text + "\n", run_id),
)
def finish_run(run_id: int, *, status: str, row_count: int | None = None, def finish_run(run_id: int, *, status: str, row_count: int | None = None,
error: str | None = None) -> None: error: str | None = None) -> None:
with db.connect() as c: with db.connect() as c:

View File

@ -108,7 +108,8 @@ CREATE TABLE IF NOT EXISTS run_log (
watermark_values_json TEXT, watermark_values_json TEXT,
jrunner_stdout TEXT, jrunner_stdout TEXT,
jrunner_stderr TEXT, jrunner_stderr TEXT,
hook_log TEXT hook_log TEXT,
live_log TEXT
); );
CREATE INDEX IF NOT EXISTS idx_run_log_module ON run_log(module_id, id DESC); CREATE INDEX IF NOT EXISTS idx_run_log_module ON run_log(module_id, id DESC);

View File

@ -15,7 +15,7 @@ from __future__ import annotations
from pathlib import Path from pathlib import Path
from urllib.parse import urlencode from urllib.parse import urlencode
from fastapi import APIRouter, FastAPI, HTTPException, Query, Request from fastapi import APIRouter, BackgroundTasks, FastAPI, HTTPException, Query, Request
from fastapi.responses import HTMLResponse, RedirectResponse from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates from fastapi.templating import Jinja2Templates
@ -210,17 +210,22 @@ def module_delete(module_id: int):
@_router.post("/modules/{module_id}/run") @_router.post("/modules/{module_id}/run")
async def module_run_action(module_id: int, request: Request): async def module_run_action(module_id: int, request: Request,
background: BackgroundTasks):
form = await request.form() form = await request.form()
dry = form.get("dry_run") == "1" dry = form.get("dry_run") == "1"
if repo.get_module(module_id) is None: if repo.get_module(module_id) is None:
raise HTTPException(404, f"module id={module_id} not found") raise HTTPException(404, f"module id={module_id} not found")
run_id = repo.create_run(module_id) run_id = repo.create_run(module_id)
background.add_task(_run_in_background, module_id, run_id, dry)
return RedirectResponse(url=f"/runs/{run_id}", status_code=303)
def _run_in_background(module_id: int, run_id: int, dry_run: bool) -> None:
try: try:
engine.run_module(module_id, run_id=run_id, dry_run=dry) engine.run_module(module_id, run_id=run_id, dry_run=dry_run)
except engine.LockBusy as e: except engine.LockBusy as e:
repo.finish_run(run_id, status="error", error=str(e)) repo.finish_run(run_id, status="error", error=str(e))
return RedirectResponse(url=f"/runs/{run_id}", status_code=303)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -716,6 +721,20 @@ def run_detail(request: Request, run_id: int):
) )
@_router.get("/runs/{run_id}/live", response_class=HTMLResponse)
def run_live_fragment(request: Request, run_id: int):
run = repo.get_run(run_id)
if run is None:
raise HTTPException(404, f"run id={run_id} not found")
module = repo.get_module(run["module_id"])
run["module_name"] = module["name"] if module else "?"
return _templates.TemplateResponse(
request,
"_run_live.html",
_ctx(run=run),
)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Watermarks — add/edit/delete forms on module detail # Watermarks — add/edit/delete forms on module detail
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------

View File

@ -0,0 +1,26 @@
<div id="run-live"
{% if run.status == 'running' %}
hx-get="/runs/{{ run.id }}/live"
hx-trigger="every 2s"
hx-swap="outerHTML"
{% endif %}
>
<div class="panel">
<header>
Progress
<span style="margin-left:auto">
<span class="pill {{ run.status }}">{{ run.status }}</span>
{% if run.row_count is not none %}&nbsp;· {{ run.row_count }} rows{% endif %}
</span>
</header>
<div class="body">
{% if run.live_log %}
<pre style="margin:0;white-space:pre-wrap">{{ run.live_log }}</pre>
{% elif run.status == 'running' %}
<span style="color:var(--text-muted)">Waiting for output…</span>
{% else %}
<span style="color:var(--text-muted)"></span>
{% endif %}
</div>
</div>
</div>

View File

@ -5,6 +5,7 @@
<title>{% block title %}Pipekit{% endblock %}</title> <title>{% block title %}Pipekit{% endblock %}</title>
<meta name="viewport" content="width=device-width, initial-scale=1"> <meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="stylesheet" href="/static/style.css"> <link rel="stylesheet" href="/static/style.css">
<script src="https://unpkg.com/htmx.org@1.9.12/dist/htmx.min.js" defer></script>
</head> </head>
<body> <body>
<header class="topbar"> <header class="topbar">

View File

@ -37,6 +37,33 @@
</div> </div>
{% endif %} {% endif %}
<div id="run-live"
{% if run.status == 'running' %}
hx-get="/runs/{{ run.id }}/live"
hx-trigger="load, every 2s"
hx-swap="outerHTML"
{% endif %}
>
<div class="panel">
<header>
Progress
<span style="margin-left:auto">
<span class="pill {{ run.status }}">{{ run.status }}</span>
{% if run.row_count is not none %}&nbsp;· {{ run.row_count }} rows{% endif %}
</span>
</header>
<div class="body">
{% if run.live_log %}
<pre style="margin:0;white-space:pre-wrap">{{ run.live_log }}</pre>
{% elif run.status == 'running' %}
<span style="color:var(--text-muted)">Waiting for output…</span>
{% else %}
<span style="color:var(--text-muted)"></span>
{% endif %}
</div>
</div>
</div>
{% if run.jrunner_stdout or run.jrunner_stderr %} {% if run.jrunner_stdout or run.jrunner_stderr %}
<div class="panel"> <div class="panel">
<header>jrunner output</header> <header>jrunner output</header>