diff --git a/CLAUDE.md b/CLAUDE.md index e44ff35..c016259 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -43,7 +43,7 @@ Pipekit is a database sync tool. A **module** defines a source query → dest ta - **module** — sync job (source/dest connection, source query, merge strategy, merge key, enabled, running lock) - **watermark** — named placeholder with resolver SQL; first column of first row used as opaque string; replaces `{watermark_name}` in source query - **hook** — post-merge SQL (run_order, run_on: success/failure/always) -- **run_log** — immutable history record with resolved SQL, merge SQL, watermark values, stdout/stderr, timing +- **run_log** — immutable history record with resolved SQL, merge SQL, watermark values, stdout/stderr, timing, live_log (streamed jrunner output written during the run) ## Engine Flow @@ -55,7 +55,7 @@ run_module(module_id) → build merge SQL (engine/merge.py: full=truncate+insert, incremental=delete by key+insert, append=insert) → if dry_run: write run_log (status='dry_run'), return early — no data movement → DROP + CREATE staging table (pipekit_staging.{module_name}) # self-healing schema drift - → jrunner migrate source → staging + → jrunner migrate source → staging # uses Popen; each non-empty stdout line appended to run_log.live_log → run merge SQL via jrunner → run hooks in order → write run_log entry @@ -94,7 +94,8 @@ Recreated on every run as `pipekit_staging.{module_name}` (DROP + CREATE, not IF - `/api/*` — JSON REST, HTTP Basic Auth, consumed by HTMX fragments and external callers - `/` and other bare paths — full HTML pages (Jinja2), no auth currently -- `POST /modules/{id}/run` returns `{run_id}` immediately; run is async; poll `/runs/{id}` for status +- `POST /modules/{id}/run` returns immediately — the API responds with `{run_id}`, the web form POST with a 303 redirect to `/runs/{run_id}`; the run itself is async via BackgroundTasks +- `GET /runs/{id}/live` — HTML fragment endpoint; HTMX polls this every 2 s while status=running to show live_log + status ## Tech Stack diff --git a/SPEC.md b/SPEC.md index 
c9cc87e..8b7a916 100644 --- a/SPEC.md +++ b/SPEC.md @@ -236,7 +236,8 @@ run_log( watermark_values_json, -- {prev_period: "'2610'", ...} jrunner_stdout, jrunner_stderr, - hook_log + hook_log, + live_log -- jrunner stdout lines appended in real time during migrate ) ``` @@ -555,30 +556,35 @@ GET /settings POST /settings/{key} ``` -### Async runs + SSE +### Async runs + live progress `POST /modules/{id}/run` does NOT block. It atomically acquires the module lock, kicks off the sync in a background task, and returns -`{"run_id": 4892}` immediately. +`{"run_id": 4892}` immediately (the web form POST is also non-blocking but answers with a 303 redirect to `/runs/{run_id}` instead of JSON). Two ways to watch a run after that: 1. **Polling** — `GET /runs/{id}` returns the run_log row; keep hitting it until `status != running`. Simple, works anywhere. -2. **Streaming** — `GET /runs/{id}/stream` opens a Server-Sent Events - connection. The server pushes event lines as things happen — log - lines, row-count updates, final status. The TUI uses this for the - run watch screen. curl supports it with `-N` (no buffering). +2. **Live fragment polling** — the web run detail page (`/runs/{id}`) + embeds an HTMX fragment that polls `GET /runs/{id}/live` every 2 s + while `status == running`. The fragment shows current status, + row count, and the `live_log` field as it grows. Polling stops + automatically once status leaves `running`. -SSE is plain HTTP with a long-lived connection, not WebSockets. Simpler -to implement, works in browsers natively (`EventSource` in JS), works in -curl for debugging. +**Live log** — during `jrunner.migrate()` the engine uses `subprocess.Popen` +(not `subprocess.run`) so stdout can be read line-by-line as jrunner +emits it. Each non-empty line is appended to `run_log.live_log` via +`repo.append_run_live_log()`. How frequently lines appear depends on +jrunner's output flushing behaviour; at minimum the final row-count +summary line appears when the transfer completes. 
-Splitting `start` from `watch` (two endpoints) means: +Splitting `start` from `watch` means: - Cron-triggered runs don't have to watch - Curl scripting can fire-and-forget -- TUI can reconnect to an already-running sync if it crashes mid-run +- A browser navigating directly to `/runs/{id}` mid-run picks up the + live panel automatically ### Auth diff --git a/pipekit/db.py b/pipekit/db.py index eeadcd9..d9d391b 100644 --- a/pipekit/db.py +++ b/pipekit/db.py @@ -38,6 +38,10 @@ def _apply_migrations(conn: sqlite3.Connection) -> None: if "dest_description" not in cols: conn.execute("ALTER TABLE module ADD COLUMN dest_description TEXT") + rl_cols = {r[1] for r in conn.execute("PRAGMA table_info(run_log)")} + if "live_log" not in rl_cols: + conn.execute("ALTER TABLE run_log ADD COLUMN live_log TEXT") + @contextmanager def connect(db_path: Path | None = None): diff --git a/pipekit/engine/runner.py b/pipekit/engine/runner.py index 82fe67c..1e49f2d 100644 --- a/pipekit/engine/runner.py +++ b/pipekit/engine/runner.py @@ -113,6 +113,7 @@ def run_module(module_id: int, *, group_run_id: int | None = None, source_conn=source_conn, dest_conn=dest_conn, sql=resolved_sql, dest_table=module["staging_table"], clear=False, + progress_cb=lambda line: repo.append_run_live_log(run_id, line), ) row_count = migrate_result.row_count repo.log_run_output(run_id, jrunner_stdout=migrate_result.stdout, diff --git a/pipekit/jrunner.py b/pipekit/jrunner.py index 4bcdb89..85c1cdf 100644 --- a/pipekit/jrunner.py +++ b/pipekit/jrunner.py @@ -14,6 +14,7 @@ argv or in the database. 
from __future__ import annotations +from collections.abc import Callable import csv import io import os @@ -21,6 +22,7 @@ import re import shutil import subprocess import tempfile +import threading from dataclasses import dataclass from pathlib import Path @@ -144,8 +146,13 @@ def migrate( clear: bool = False, trim: bool = True, timeout: int = 3600, + progress_cb: "Callable[[str], None] | None" = None, ) -> MigrateResult: - """Stream `sql` results from source into `dest_table` via jrunner migration mode.""" + """Stream `sql` results from source into `dest_table` via jrunner migration mode. + + If ``progress_cb`` is provided it is called with each non-empty stdout line + as jrunner produces it, enabling live progress reporting. + """ path = jrunner_path() with tempfile.NamedTemporaryFile("w", suffix=".sql", delete=False) as f: f.write(sql) @@ -164,21 +171,50 @@ def migrate( argv.append("-t") if clear: argv.append("-c") - r = subprocess.run(argv, capture_output=True, text=True, - timeout=timeout, env=_subprocess_env()) + + proc = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE, + text=True, env=_subprocess_env()) + + stderr_buf: list[str] = [] + + def _drain_stderr() -> None: + for line in proc.stderr: # type: ignore[union-attr] + stderr_buf.append(line) + + t = threading.Thread(target=_drain_stderr, daemon=True) + t.start() + + stdout_lines: list[str] = [] + for line in proc.stdout: # type: ignore[union-attr] + stdout_lines.append(line) + if progress_cb: + stripped = line.rstrip() + if stripped: + progress_cb(stripped) + + try: + proc.wait(timeout=timeout) + except subprocess.TimeoutExpired: + proc.kill() + proc.wait() + raise + t.join() + + stdout = "".join(stdout_lines) + stderr = "".join(stderr_buf) finally: os.unlink(sql_path) - if r.returncode != 0: - raise JrunnerError(r.stderr.strip() or r.stdout.strip(), - stdout=r.stdout, stderr=r.stderr) - silent = _detect_silent_failure(r.stdout, r.stderr) + if proc.returncode != 0: + raise 
JrunnerError(stderr.strip() or stdout.strip(), + stdout=stdout, stderr=stderr) + silent = _detect_silent_failure(stdout, stderr) if silent: - raise JrunnerError(silent, stdout=r.stdout, stderr=r.stderr) + raise JrunnerError(silent, stdout=stdout, stderr=stderr) return MigrateResult( - row_count=_parse_row_count(r.stdout + "\n" + r.stderr), - stdout=r.stdout, stderr=r.stderr, + row_count=_parse_row_count(stdout + "\n" + stderr), + stdout=stdout, stderr=stderr, ) diff --git a/pipekit/repo.py b/pipekit/repo.py index 2e20cfe..266f7ad 100644 --- a/pipekit/repo.py +++ b/pipekit/repo.py @@ -442,6 +442,14 @@ def log_run_output(run_id: int, *, jrunner_stdout: str | None = None, c.execute(f"UPDATE run_log SET {', '.join(sets)} WHERE id=?", vals + [run_id]) +def append_run_live_log(run_id: int, text: str) -> None: + with db.connect() as c: + c.execute( + "UPDATE run_log SET live_log = COALESCE(live_log, '') || ? WHERE id=?", + (text + "\n", run_id), + ) + + def finish_run(run_id: int, *, status: str, row_count: int | None = None, error: str | None = None) -> None: with db.connect() as c: diff --git a/pipekit/schema.sql b/pipekit/schema.sql index 35e19d7..c3b3ecb 100644 --- a/pipekit/schema.sql +++ b/pipekit/schema.sql @@ -108,7 +108,8 @@ CREATE TABLE IF NOT EXISTS run_log ( watermark_values_json TEXT, jrunner_stdout TEXT, jrunner_stderr TEXT, - hook_log TEXT + hook_log TEXT, + live_log TEXT ); CREATE INDEX IF NOT EXISTS idx_run_log_module ON run_log(module_id, id DESC); diff --git a/pipekit/web/app.py b/pipekit/web/app.py index 5aae94e..a60df28 100644 --- a/pipekit/web/app.py +++ b/pipekit/web/app.py @@ -15,7 +15,7 @@ from __future__ import annotations from pathlib import Path from urllib.parse import urlencode -from fastapi import APIRouter, FastAPI, HTTPException, Query, Request +from fastapi import APIRouter, BackgroundTasks, FastAPI, HTTPException, Query, Request from fastapi.responses import HTMLResponse, RedirectResponse from fastapi.staticfiles import StaticFiles from 
fastapi.templating import Jinja2Templates @@ -210,17 +210,22 @@ def module_delete(module_id: int): @_router.post("/modules/{module_id}/run") -async def module_run_action(module_id: int, request: Request): +async def module_run_action(module_id: int, request: Request, + background: BackgroundTasks): form = await request.form() dry = form.get("dry_run") == "1" if repo.get_module(module_id) is None: raise HTTPException(404, f"module id={module_id} not found") run_id = repo.create_run(module_id) + background.add_task(_run_in_background, module_id, run_id, dry) + return RedirectResponse(url=f"/runs/{run_id}", status_code=303) + + +def _run_in_background(module_id: int, run_id: int, dry_run: bool) -> None: try: - engine.run_module(module_id, run_id=run_id, dry_run=dry) + engine.run_module(module_id, run_id=run_id, dry_run=dry_run) except engine.LockBusy as e: repo.finish_run(run_id, status="error", error=str(e)) - return RedirectResponse(url=f"/runs/{run_id}", status_code=303) # --------------------------------------------------------------------------- @@ -716,6 +721,20 @@ def run_detail(request: Request, run_id: int): ) +@_router.get("/runs/{run_id}/live", response_class=HTMLResponse) +def run_live_fragment(request: Request, run_id: int): + run = repo.get_run(run_id) + if run is None: + raise HTTPException(404, f"run id={run_id} not found") + module = repo.get_module(run["module_id"]) + run["module_name"] = module["name"] if module else "?" + return _templates.TemplateResponse( + request, + "_run_live.html", + _ctx(run=run), + ) + + # --------------------------------------------------------------------------- # Watermarks — add/edit/delete forms on module detail # --------------------------------------------------------------------------- diff --git a/pipekit/web/templates/_run_live.html b/pipekit/web/templates/_run_live.html new file mode 100644 index 0000000..95cfabd --- /dev/null +++ b/pipekit/web/templates/_run_live.html @@ -0,0 +1,26 @@ +
+
+
+ Progress + + {{ run.status }} + {% if run.row_count is not none %} · {{ run.row_count }} rows{% endif %} + +
+
+ {% if run.live_log %} +
{{ run.live_log }}
+ {% elif run.status == 'running' %} + Waiting for output… + {% else %} + + {% endif %} +
+
+
diff --git a/pipekit/web/templates/base.html b/pipekit/web/templates/base.html index 2bebac3..95f3866 100644 --- a/pipekit/web/templates/base.html +++ b/pipekit/web/templates/base.html @@ -5,6 +5,7 @@ {% block title %}Pipekit{% endblock %} +
diff --git a/pipekit/web/templates/run_detail.html b/pipekit/web/templates/run_detail.html index 37febad..7c901d0 100644 --- a/pipekit/web/templates/run_detail.html +++ b/pipekit/web/templates/run_detail.html @@ -37,6 +37,33 @@ {% endif %} +
+
+
+ Progress + + {{ run.status }} + {% if run.row_count is not none %} · {{ run.row_count }} rows{% endif %} + +
+
+ {% if run.live_log %} +
{{ run.live_log }}
+ {% elif run.status == 'running' %} + Waiting for output… + {% else %} + + {% endif %} +
+
+
+ {% if run.jrunner_stdout or run.jrunner_stderr %}
jrunner output