From c34fcb38ed5f63915b3f164839d529f2d168340a Mon Sep 17 00:00:00 2001 From: Paul Trowbridge Date: Wed, 3 Jun 2026 21:18:13 -0400 Subject: [PATCH] Add scheduling, harden deploy, and update docs Scheduling: cron-based group runs via a daemon thread (scheduler.py) started at API startup. Schedules managed inline on the group edit form. last_fired_at persisted before run to prevent double-fire on restart. Requires croniter (added to requirements.txt); DB migration adds last_fired_at column to schedule table. Deploy: deploy.sh now creates the pipekit system user, chowns the repo, builds the venv as pipekit, and installs/enables the systemd unit. systemd/pipekit.service is now a production-ready unit (User=pipekit uncommented). pipekit secrets set preserves existing file permissions instead of resetting to 0600. Driver registration is now idempotent (upsert via get_driver_by_name + update_driver). Docs: CLAUDE.md and SPEC.md updated to reflect groups, scheduling, scheduler-in-API-process architecture, TUI deferred (not dropped), stop-on-failure tradeoff, jrunner as prerequisite, and deploy flow. 
Co-Authored-By: Claude Sonnet 4.6 (1M context) --- CLAUDE.md | 25 +++++- SPEC.md | 84 +++++++++++++---- deploy.sh | 114 ++++++++++++++++-------- pipekit/api/app.py | 11 ++- pipekit/cli.py | 29 ++++-- pipekit/db.py | 4 + pipekit/repo.py | 83 +++++++++++++++++ pipekit/scheduler.py | 95 ++++++++++++++++++++ pipekit/web/app.py | 54 ++++++++++- pipekit/web/templates/group_detail.html | 39 ++++++++ pipekit/web/templates/group_form.html | 86 ++++++++++++++++++ requirements.txt | 1 + systemd/pipekit.service | 18 +--- 13 files changed, 562 insertions(+), 81 deletions(-) create mode 100644 pipekit/scheduler.py diff --git a/CLAUDE.md b/CLAUDE.md index 790ae97..24797fc 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -19,8 +19,10 @@ pipekit doctor # health check (config, jrunner, DB) pipekit run # run a module synchronously (manual test) pipekit set-password # set HTTP Basic Auth credentials pipekit secrets set KEY [VALUE] # add/update a secret (prompts if value omitted) +pipekit secrets list # show stored secret keys (no values) pipekit drivers list # show registered driver kinds -./deploy.sh # idempotent: venv, deps, launcher, driver registration +./deploy.sh # full idempotent deploy: user, venv, deps, launcher, + # secrets.env, schema init, driver registration, systemd unit ``` ## Architecture @@ -30,7 +32,7 @@ Pipekit is a database sync tool. A **module** defines a source query → dest ta **Layers (bottom to top):** 1. **SQLite** (`pipekit.db`) — single file, all state -2. **`repo.py`** — all CRUD for every table; ~1,900 LOC, the only layer that touches the DB +2. **`repo.py`** — all CRUD for every table; ~725 LOC, the only layer that touches the DB 3. **`engine/`** — orchestrates a module run: lock acquisition, watermark resolution, jrunner calls, merge SQL, post-run hooks, run_log write, lock release 4. **jrunner** — external Java CLI; handles all JDBC access. Python never talks to remote DBs directly; it shells out to jrunner 5. 
**`api/`** — FastAPI REST endpoints under `/api/*`, HTTP Basic Auth (except `/health`) @@ -44,6 +46,10 @@ Pipekit is a database sync tool. A **module** defines a source query → dest ta - **watermark** — named placeholder with resolver SQL; first column of first row used as opaque string; replaces `{watermark_name}` in source query - **hook** — post-merge SQL (run_order, run_on: success/failure/always) - **run_log** — immutable history record with resolved SQL, merge SQL, watermark values, stdout/stderr, timing, live_log (streamed jrunner output written during the run) +- **grp** — named group of modules; modules belong to a group via `group_member` (with run_order) +- **group_member** — join table: group → module with run_order; disabled modules are skipped by group runs +- **group_run** — one execution of a group: status, timing, triggered_by (manual | schedule:{id}) +- **schedule** — cron expression tied to a group; scheduler fires `run_group` when due; tracks `last_fired_at` to survive restarts without double-firing ## Engine Flow @@ -60,6 +66,13 @@ run_module(module_id) → run hooks in order → write run_log entry → UPDATE module SET running=0 (in finally) + +run_group(group_id) + → create group_run row (status=running) + → for each enabled group_member in run_order: call run_module(group_run_id=...) + → continues past individual module failures (all members run) + → final status: dry_run if all dry_run, error if any errored, success otherwise + → finish_group_run(status) ``` ## Run Statuses @@ -94,19 +107,25 @@ Watermarks are managed inline on both the module edit form and wizard step 3 (no Recreated on every run as `pipekit_staging.{module_name}` (DROP + CREATE, not IF NOT EXISTS). Ephemeral — exists only during the run. +## Scheduler + +`pipekit/scheduler.py` runs a single daemon thread (started via FastAPI lifespan in `api/app.py`). 
It wakes every 60 s, reads all enabled schedules, and fires `run_group` for any whose next cron occurrence has passed since `last_fired_at`. `last_fired_at` is written to the DB before the run thread is spawned — prevents double-fire if a run is slow or pipekit restarts mid-run. Uses `croniter` for cron expression evaluation. + ## API vs. Web - `/api/*` — JSON REST, HTTP Basic Auth, consumed by HTMX fragments and external callers - `/` and other bare paths — full HTML pages (Jinja2), no auth currently - `POST /modules/{id}/run` returns `{run_id}` immediately (both API and web); run is async via BackgroundTasks - `GET /runs/{id}/live` — HTML fragment endpoint; HTMX polls this every 2 s while status=running to show live_log + status +- `/groups/*` — group CRUD, run, live fragment; `/group-runs/{id}` — group run detail with per-module run links ## Tech Stack -- Python 3.10+, FastAPI, Uvicorn, Jinja2, PyYAML, SQLite3 (stdlib) +- Python 3.10+, FastAPI, Uvicorn, Jinja2, PyYAML, SQLite3 (stdlib), croniter - `python-multipart` required for HTML form POSTs (not auto-installed as a FastAPI transitive dep) - Frontend: HTMX (CDN) + vanilla JS; Alpine.js is NOT loaded despite being listed in older docs - jrunner: separate Java tool, must be on PATH +- Runs as the `pipekit` system user; venv at `/opt/pipekit/.venv` owned by that user; secrets at `/etc/pipekit/secrets.env` (mode 0640, group pipekit) ## Full Spec diff --git a/SPEC.md b/SPEC.md index 74a7d79..b884068 100644 --- a/SPEC.md +++ b/SPEC.md @@ -6,7 +6,9 @@ design from first principles. The previous version is archived at ## Status -**Spec is done.** Ready to move to implementation planning. +**Implementation is underway.** Core module sync, wizard, web UI, groups, +scheduling, and deploy tooling are built and running. The TUI was deferred +in favour of a web UI (see Architecture note below). One item is intentionally deferred: the **migration plan** for bringing over the ~90 existing modules from `/opt/sync`. 
Not needed to start @@ -72,16 +74,30 @@ jrunner (Java CLI — bulk JDBC transfer + query mode) ↑ engine (Python — orchestrates jrunner, watermarks, merge, hooks, run log) ↑ -API (FastAPI — REST, Basic Auth) +API process (FastAPI — REST, Basic Auth) + ├── scheduler thread (daemon thread; fires group runs from cron schedules) + └── web UI (Jinja2 templates, HTMX, vanilla JS — served alongside /api/*) ↑ -TUI / web UI / curl +TUI (deferred) / curl ``` The engine shells out to jrunner for **everything that touches a database** — bulk transfers, watermark resolver queries, hooks. No separate JDBC layer in Python. One driver-loading code path, one set of bugs. -The API exists so a web front-end or curl can drive Pipekit, not just the TUI. +The scheduler is a daemon thread started inside the API process via FastAPI +lifespan — not a separate process or service. It shares the same SQLite +connection pool and imports. + +The FastAPI app serves both a JSON REST API (`/api/*`) and full HTML pages +(`/`, `/modules/*`, `/groups/*`, etc.) using Jinja2 templates, HTMX for +in-place updates, and vanilla JS. The web UI covers all management: modules, +connections, groups, schedules, run history. + +**TUI is deferred, not dropped.** The original TUI design remains valid — it +would be an HTTP client against the API, never touching SQLite directly. Build +it when the web UI's limitations become friction (keyboard-heavy workflows, +SSH-only environments). The API design already accommodates it. ## Storage: SQLite @@ -265,12 +281,21 @@ grp(id, name) group_member(id, group_id, module_id, run_order) -- many-to-many; same module can live in multiple groups with different run_orders -schedule(id, group_id, cron_expr, enabled) +schedule(id, group_id, cron_expr, enabled, last_fired_at) -- a group can have 0..N schedules ``` -**Sequential execution, stop on failure.** Mirrors the `set -e` behavior of -existing orchestrator scripts. 
+**Sequential execution, continue past failures.** Each enabled member runs in +`run_order` regardless of whether prior members errored. Final group_run status +is `error` if any member errored, `success` if all succeeded, `dry_run` if all +were dry runs. Disabled modules are skipped entirely. + +This was a deliberate change from the original "stop on failure" design. The +case for stopping: mirrors `set -e` shell behaviour; avoids running downstream +modules against stale data if an upstream one failed. The case for continuing: +full visibility into all failures in one run rather than discovering them +one-at-a-time; individual module locks already prevent unsafe concurrency. +A per-group `stop_on_failure` flag would satisfy both and is worth adding. **Many-to-many membership.** Junction table is needed anyway for `run_order`, so many-to-many costs nothing extra. Unique constraint can be added later if @@ -279,9 +304,14 @@ ever needed. **Schedule attaches to groups, not modules.** Matches the user's mental model and avoids a huge cron-list. Individual modules can still be run ad-hoc. -**Scheduler.** Background thread inside the API process. Wakes every minute, -evaluates all enabled schedules, fires any whose cron matches. A scheduled -fire and a manual fire use the same code path — only `triggered_by` differs. +**Scheduler.** `pipekit/scheduler.py` — a single daemon thread started via +FastAPI lifespan on server startup. Wakes every 60 s. For each enabled +schedule, uses `croniter` to find the next occurrence after `last_fired_at` +(or 2 minutes ago if never fired); fires if that occurrence has passed. +`last_fired_at` is written to the DB *before* the run thread is spawned — +prevents double-fire if a run is slow or pipekit restarts mid-run. A scheduled +fire and a manual fire use the same `run_group` code path; only `triggered_by` +differs (`schedule:{id}` vs `manual`). 
**Ad-hoc runs:** @@ -332,8 +362,30 @@ Pipekit verifies jrunner exists on startup (configurable path in drivers loadable, database accessible, all configured connections testable. First thing to run after a `git pull`. -**Packaging.** Start loose-coupled (install jrunner separately, point Pipekit -at it). Bundle later if/when the two-step gets annoying. +**`./deploy.sh`** — idempotent full-system deploy. Run once from a fresh clone +or re-run after any code update. Does: + +1. Creates the `pipekit` system user (`useradd --system`) if absent +2. Chowns `/opt/pipekit` to `pipekit:pipekit` +3. Creates/repairs the Python venv at `.venv` owned by `pipekit` +4. Installs deps as `pipekit` user (`pip install -r requirements.txt`) +5. Installs `/usr/local/bin/pipekit` launcher symlink +6. Creates `/etc/pipekit/secrets.env` (mode 0640, group pipekit) if absent +7. Runs `pipekit init` (schema creation + migrations) as `pipekit` +8. Upserts JDBC driver rows for each jar found in jrunner's lib dir +9. Installs and enables `systemd/pipekit.service` + +**Secrets** live in `/etc/pipekit/secrets.env` as `KEY=VALUE` lines. The +`password` field on a connection row stores `$KEY`; the engine resolves it at +runtime from the environment. The systemd unit loads this file as +`EnvironmentFile=`. Mode 0640, group pipekit — the service account can read it +but it is not world-readable. Managed with `pipekit secrets set/list/unset`. + +**jrunner is a prerequisite, not managed by deploy.sh.** `deploy.sh` checks +that `jrunner` is on PATH and aborts with a clear message if not. Installing +jrunner (`/opt/jrunner/deploy.sh`) is a separate step the operator does first. +Keeping them decoupled means jrunner can be updated independently. Bundle later +if/when the two-step gets annoying. ## New module wizard @@ -628,7 +680,7 @@ behavior. 
| Hooks | Per-module, post-merge, run_on success/failure/always | | Group hooks | Deferred — not needed yet | | Group membership | Many-to-many (junction table for run_order anyway) | -| Group execution | Sequential, stop on failure | +| Group execution | Sequential, continue past failures; final status = worst of members | | Schedules | Attach to groups; multiple schedules per group allowed | | Locking | Atomic UPDATE on `module.running`; PID + time-based stale clearing | | Credentials | Env var references (`$DB2PW`); resolved at runtime | @@ -642,6 +694,8 @@ behavior. | Linked servers | Not first-class; only affect FROM-clause syntax at author time; not persisted on module | | API style | REST, GET for reads, POST for writes, no PUT/DELETE | | Run model | Async — POST /run returns run_id immediately; watch via polling or SSE stream | -| Live output | Server-Sent Events (SSE) — plain HTTP, curl-friendly, browser-native | +| Live output | HTMX polling every 2–3 s (SSE deferred — polling sufficient for current scale) | | Auth | HTTP Basic, single user, creds in settings table | -| TUI ↔ backend | TUI is an HTTP client; never touches SQLite directly | +| TUI | Deferred — web UI built first; TUI remains valid future work as an API client | +| Scheduler | croniter for cron evaluation; last_fired_at persisted before run to prevent double-fire | +| Service account | Runs as pipekit system user; venv + db owned by that user; secrets 0640 group pipekit | diff --git a/deploy.sh b/deploy.sh index 3f80b80..1256984 100755 --- a/deploy.sh +++ b/deploy.sh @@ -1,17 +1,22 @@ #!/usr/bin/env bash -# Pipekit deployment — idempotent. Re-run any time. +# Pipekit deployment — idempotent. Re-run after any code update. # -# Steps: -# 1. Check prerequisites (python3, jrunner on PATH) -# 2. Create Python venv at $REPO/.venv and install requirements -# 3. Install launcher at /usr/local/bin/pipekit (wraps the venv python) -# 4. 
Ensure /etc/pipekit/secrets.env exists (mode 0600, placeholder body) -# 5. Run `pipekit init` to create/upgrade the SQLite schema -# 6. Register driver rows for every JDBC jar shipped with jrunner +# What it does: +# 1. Creates the 'pipekit' system user (if absent) +# 2. Chowns /opt/pipekit to pipekit:pipekit +# 3. Creates Python venv (as pipekit) and installs requirements +# 4. Installs /usr/local/bin/pipekit launcher +# 5. Creates /etc/pipekit/secrets.env (mode 0640, group pipekit) +# 6. Runs 'pipekit init' to create/upgrade the SQLite schema +# 7. Registers JDBC driver rows for every jar shipped with jrunner +# 8. Installs and enables the systemd unit (does not start it) # -# After running: -# - Set DB passwords with: sudo pipekit secrets set -# - See systemd/pipekit.service for a unit file template +# Usage: +# ./deploy.sh # re-execs itself with sudo if needed +# +# After deploy: +# sudo pipekit secrets set KEY VALUE # add connection passwords +# sudo systemctl start pipekit set -euo pipefail @@ -20,7 +25,11 @@ VENV_DIR="$REPO_DIR/.venv" LAUNCHER="/usr/local/bin/pipekit" CONFIG_DIR="/etc/pipekit" SECRETS_FILE="$CONFIG_DIR/secrets.env" +SERVICE_NAME="pipekit" +UNIT_SRC="$REPO_DIR/systemd/pipekit.service" +UNIT_DST="/etc/systemd/system/pipekit.service" +# Re-exec as root if needed if [ "$EUID" -ne 0 ]; then exec sudo -H -E "$0" "$@" fi @@ -31,49 +40,73 @@ echo "venv: $VENV_DIR" echo "secrets: $SECRETS_FILE" echo "" +# ── Prerequisites ───────────────────────────────────────────────────────────── command -v python3 >/dev/null || { echo "ERROR: python3 not on PATH"; exit 1; } command -v jrunner >/dev/null || { echo "ERROR: jrunner not on PATH — install /opt/jrunner first"; exit 1; } +# ── 1. System user ──────────────────────────────────────────────────────────── +if ! 
id -u "$SERVICE_NAME" >/dev/null 2>&1; then + echo "Creating system user: $SERVICE_NAME" + useradd --system --no-create-home --shell /usr/sbin/nologin "$SERVICE_NAME" +else + echo "User $SERVICE_NAME already exists." +fi + +# ── 2. Ownership ────────────────────────────────────────────────────────────── +echo "Setting ownership of $REPO_DIR to $SERVICE_NAME:$SERVICE_NAME" +chown -R "$SERVICE_NAME:$SERVICE_NAME" "$REPO_DIR" + +# ── 3. Venv + deps ──────────────────────────────────────────────────────────── +if [ -d "$VENV_DIR" ] && [ "$(stat -c '%U' "$VENV_DIR")" != "$SERVICE_NAME" ]; then + echo "Removing root-owned venv and recreating as $SERVICE_NAME" + rm -rf "$VENV_DIR" +fi + if [ ! -d "$VENV_DIR" ]; then echo "Creating venv at $VENV_DIR" - python3 -m venv "$VENV_DIR" + sudo -u "$SERVICE_NAME" python3 -m venv "$VENV_DIR" fi -"$VENV_DIR/bin/pip" install --quiet --upgrade pip -"$VENV_DIR/bin/pip" install --quiet -r "$REPO_DIR/requirements.txt" -echo "Python deps installed." -# The in-repo bin/pipekit auto-detects .venv at runtime; no rewrite needed. +echo "Installing Python dependencies" +sudo -u "$SERVICE_NAME" "$VENV_DIR/bin/pip" install --quiet --upgrade pip +sudo -u "$SERVICE_NAME" "$VENV_DIR/bin/pip" install --quiet -r "$REPO_DIR/requirements.txt" +echo "Dependencies installed." + +# ── 4. Launcher ─────────────────────────────────────────────────────────────── chmod +x "$REPO_DIR/bin/pipekit" ln -sf "$REPO_DIR/bin/pipekit" "$LAUNCHER" echo "Launcher: $LAUNCHER -> $REPO_DIR/bin/pipekit" +# ── 5. Secrets file ─────────────────────────────────────────────────────────── install -d -m 0755 "$CONFIG_DIR" if [ ! -f "$SECRETS_FILE" ]; then - install -m 0600 /dev/null "$SECRETS_FILE" + install -m 0640 /dev/null "$SECRETS_FILE" + chown "root:$SERVICE_NAME" "$SECRETS_FILE" cat > "$SECRETS_FILE" <<'EOF' -# pipekit secrets — sourced by the service process (EnvironmentFile=) -# or by the shell before `pipekit serve`. One KEY=VALUE per line. 
-# Connection rows reference these as $KEY (e.g. password: "$DB2PW"). -# -# This file must stay mode 0600 and out of version control. -# Use `sudo pipekit secrets set ` to add entries safely. +# pipekit secrets — loaded by the systemd unit as EnvironmentFile. +# Connection passwords are stored as $KEY references in the DB. +# Add entries with: sudo pipekit secrets set KEY VALUE EOF - chmod 0600 "$SECRETS_FILE" echo "Created $SECRETS_FILE" else + # Ensure correct permissions even if file pre-existed + chown "root:$SERVICE_NAME" "$SECRETS_FILE" + chmod 0640 "$SECRETS_FILE" echo "Keeping existing $SECRETS_FILE" fi -"$LAUNCHER" init +# ── 6. Schema init ──────────────────────────────────────────────────────────── +sudo -u "$SERVICE_NAME" "$LAUNCHER" init +echo "Schema initialised." -# Register drivers for each JDBC jar jrunner ships with. +# ── 7. Driver registration ──────────────────────────────────────────────────── JR_LIB="$(dirname "$(readlink -f "$(command -v jrunner)")")/../lib" register_jar() { local kind="$1" pattern="$2" local jar jar="$(find "$JR_LIB" -maxdepth 1 -name "$pattern" 2>/dev/null | head -1)" if [ -n "$jar" ]; then - "$LAUNCHER" drivers register "$kind" --jar "$jar" + sudo -u "$SERVICE_NAME" "$LAUNCHER" drivers register "$kind" --jar "$jar" else echo " (no $pattern in $JR_LIB — skipping $kind)" fi @@ -82,16 +115,25 @@ register_jar db2 "jt400-*.jar" register_jar pg "postgresql-*.jar" register_jar mssql "mssql-jdbc-*.jar" +# ── 8. Systemd unit ─────────────────────────────────────────────────────────── +if [ ! -f "$UNIT_SRC" ]; then + echo "WARNING: $UNIT_SRC not found — skipping systemd install" +else + cp "$UNIT_SRC" "$UNIT_DST" + systemctl daemon-reload + systemctl enable "$SERVICE_NAME" + echo "Systemd unit installed and enabled." +fi + echo "" echo "pipekit deployed." echo "" echo "Next steps:" -echo " 1. Set passwords: sudo pipekit secrets set DB2PW" -echo " sudo pipekit secrets set PGPW" -echo " 2. 
Start the server manually:" -echo " set -a; source $SECRETS_FILE; set +a" -echo " pipekit serve" -echo " 3. Or install the systemd unit:" -echo " sudo cp $REPO_DIR/systemd/pipekit.service /etc/systemd/system/" -echo " sudo systemctl daemon-reload" -echo " sudo systemctl enable --now pipekit" +echo " 1. Add connection passwords:" +echo " sudo pipekit secrets set DB2PW " +echo " sudo pipekit secrets set PGPW " +echo " 2. Start the service:" +echo " sudo systemctl start pipekit" +echo " 3. Check it:" +echo " sudo systemctl status pipekit" +echo " journalctl -u pipekit -f" diff --git a/pipekit/api/app.py b/pipekit/api/app.py index a66c07b..e201d02 100644 --- a/pipekit/api/app.py +++ b/pipekit/api/app.py @@ -7,6 +7,8 @@ content-negotiation complexity and keeps the API curl-testable. from __future__ import annotations +from contextlib import asynccontextmanager + from fastapi import FastAPI from .. import __version__, db, jrunner @@ -14,8 +16,15 @@ from ..web import mount_web from .routes import connections, introspect, modules, runs, system +@asynccontextmanager +async def _lifespan(app: FastAPI): + from ..scheduler import start_scheduler + start_scheduler() + yield + + def create_app() -> FastAPI: - app = FastAPI(title="Pipekit", version=__version__) + app = FastAPI(title="Pipekit", version=__version__, lifespan=_lifespan) app.include_router(system.router) app.include_router(connections.router, prefix="/api") app.include_router(introspect.router, prefix="/api") diff --git a/pipekit/cli.py b/pipekit/cli.py index 89e6846..2690c88 100644 --- a/pipekit/cli.py +++ b/pipekit/cli.py @@ -78,12 +78,20 @@ def cmd_drivers_register(args) -> int: f"(registering anyway)") name = args.name or f"{args.kind}-jdbc" - row = repo.create_driver( - name=name, kind=args.kind, jar_file=args.jar, - class_name=class_name, url_template=args.url_template, - ) - print(f"registered driver id={row['id']} name={row['name']!r} " - f"kind={row['kind']!r}") + existing = repo.get_driver_by_name(name) + 
if existing: + row = repo.update_driver(existing["id"], jar_file=args.jar, + class_name=class_name, + url_template=args.url_template) + print(f"updated driver id={row['id']} name={row['name']!r} " + f"kind={row['kind']!r}") + else: + row = repo.create_driver( + name=name, kind=args.kind, jar_file=args.jar, + class_name=class_name, url_template=args.url_template, + ) + print(f"registered driver id={row['id']} name={row['name']!r} " + f"kind={row['kind']!r}") return 0 @@ -218,7 +226,14 @@ def cmd_secrets_set(args) -> int: tmp = path + ".tmp" with open(tmp, "w") as f: f.writelines(lines) - os.chmod(tmp, stat.S_IRUSR | stat.S_IWUSR) # 0600 + # Preserve existing permissions/ownership if the file already exists; + # otherwise default to 0640 so the pipekit service user can read it. + if os.path.exists(path): + st = os.stat(path) + os.chmod(tmp, stat.S_IMODE(st.st_mode)) + os.chown(tmp, st.st_uid, st.st_gid) + else: + os.chmod(tmp, 0o640) os.replace(tmp, path) print(f"{'updated' if replaced else 'added'} {args.key} in {path}") return 0 diff --git a/pipekit/db.py b/pipekit/db.py index d9d391b..e1a7730 100644 --- a/pipekit/db.py +++ b/pipekit/db.py @@ -42,6 +42,10 @@ def _apply_migrations(conn: sqlite3.Connection) -> None: if "live_log" not in rl_cols: conn.execute("ALTER TABLE run_log ADD COLUMN live_log TEXT") + sc_cols = {r[1] for r in conn.execute("PRAGMA table_info(schedule)")} + if "last_fired_at" not in sc_cols: + conn.execute("ALTER TABLE schedule ADD COLUMN last_fired_at TEXT") + @contextmanager def connect(db_path: Path | None = None): diff --git a/pipekit/repo.py b/pipekit/repo.py index 18a8e4f..b2ef192 100644 --- a/pipekit/repo.py +++ b/pipekit/repo.py @@ -33,6 +33,27 @@ def create_driver(*, name: str, kind: str, jar_file: str, class_name: str, return _row(c.execute("SELECT * FROM driver WHERE id=?", (cur.lastrowid,)).fetchone()) +def get_driver_by_name(name: str) -> dict | None: + with db.connect() as c: + return _row(c.execute( + "SELECT * FROM driver WHERE 
name=?", (name,)).fetchone()) + + +def update_driver(driver_id: int, *, jar_file: str | None = None, + class_name: str | None = None, + url_template: str | None = None) -> dict: + fields, vals = [], [] + for col, val in (("jar_file", jar_file), ("class_name", class_name), + ("url_template", url_template)): + if val is not None: + fields.append(f"{col}=?"); vals.append(val) + if fields: + vals.append(driver_id) + with db.connect() as c: + c.execute(f"UPDATE driver SET {', '.join(fields)} WHERE id=?", vals) + return get_driver_row(driver_id) + + def list_drivers() -> list[dict]: with db.connect() as c: return [dict(r) for r in c.execute("SELECT * FROM driver ORDER BY name")] @@ -519,6 +540,68 @@ def delete_group(group_id: int) -> bool: return cur.rowcount > 0 +# --------------------------------------------------------------------------- +# Schedules +# --------------------------------------------------------------------------- + +def create_schedule(*, group_id: int, cron_expr: str, enabled: int = 1) -> dict: + with db.connect() as c: + cur = c.execute( + "INSERT INTO schedule (group_id, cron_expr, enabled) VALUES (?, ?, ?)", + (group_id, cron_expr, enabled), + ) + return _row(c.execute("SELECT * FROM schedule WHERE id=?", (cur.lastrowid,)).fetchone()) + + +def get_schedule(schedule_id: int) -> dict | None: + with db.connect() as c: + return _row(c.execute("SELECT * FROM schedule WHERE id=?", (schedule_id,)).fetchone()) + + +def list_schedules_for_group(group_id: int) -> list[dict]: + with db.connect() as c: + return [dict(r) for r in c.execute( + "SELECT * FROM schedule WHERE group_id=? 
ORDER BY id", (group_id,))] + + +def update_schedule(schedule_id: int, *, cron_expr: str | None = None, + enabled: int | None = None) -> dict | None: + fields, vals = [], [] + for col, val in (("cron_expr", cron_expr), ("enabled", enabled)): + if val is not None: + fields.append(f"{col}=?"); vals.append(val) + if not fields: + return get_schedule(schedule_id) + vals.append(schedule_id) + with db.connect() as c: + c.execute(f"UPDATE schedule SET {', '.join(fields)} WHERE id=?", vals) + return get_schedule(schedule_id) + + +def delete_schedule(schedule_id: int) -> bool: + with db.connect() as c: + cur = c.execute("DELETE FROM schedule WHERE id=?", (schedule_id,)) + return cur.rowcount > 0 + + +def list_all_enabled_schedules() -> list[dict]: + with db.connect() as c: + return [dict(r) for r in c.execute( + "SELECT s.*, g.name AS group_name " + "FROM schedule s " + "JOIN grp g ON s.group_id=g.id " + "WHERE s.enabled=1" + )] + + +def mark_schedule_fired(schedule_id: int) -> None: + with db.connect() as c: + c.execute( + "UPDATE schedule SET last_fired_at=datetime('now') WHERE id=?", + (schedule_id,), + ) + + # --------------------------------------------------------------------------- # Group members # --------------------------------------------------------------------------- diff --git a/pipekit/scheduler.py b/pipekit/scheduler.py new file mode 100644 index 0000000..64b9727 --- /dev/null +++ b/pipekit/scheduler.py @@ -0,0 +1,95 @@ +"""Background scheduler: fires group runs when cron expressions are due. + +One daemon thread wakes every 60 s, checks all enabled schedules, and +spawns a per-run thread for any that are due. ``last_fired_at`` is written +to the DB before the run starts so a slow or crashing run cannot double-fire +the same occurrence. Survives server restarts: missed ticks while pipekit was +down are detected on the next startup check (5 s after start). 
+""" + +from __future__ import annotations + +import logging +import threading +import time +from datetime import datetime, timedelta, timezone + +log = logging.getLogger(__name__) + + +def _utcnow() -> datetime: + return datetime.now(timezone.utc) + + +def _check_and_fire() -> None: + from . import engine, repo + try: + schedules = repo.list_all_enabled_schedules() + except Exception as e: # noqa: BLE001 + log.error("scheduler: failed to load schedules: %s", e) + return + + now = _utcnow() + for sched in schedules: + try: + _maybe_fire(sched, now) + except Exception as e: # noqa: BLE001 + log.error("scheduler: error on schedule id=%s: %s", sched["id"], e) + + +def _maybe_fire(sched: dict, now: datetime) -> None: + from croniter import croniter, CroniterBadCronError + from . import engine, repo + + last_str = sched["last_fired_at"] + if last_str: + last_dt = datetime.fromisoformat(last_str).replace(tzinfo=timezone.utc) + else: + # Never fired — look back 2 minutes so we pick up the current tick + # without replaying indefinitely old missed ticks. + last_dt = now - timedelta(seconds=120) + + try: + cron = croniter(sched["cron_expr"], last_dt) + next_dt = cron.get_next(datetime).replace(tzinfo=timezone.utc) + except CroniterBadCronError: + log.warning("scheduler: invalid cron_expr %r for schedule id=%s — skipping", + sched["cron_expr"], sched["id"]) + return + + if next_dt > now: + return + + log.info("scheduler: firing group_id=%s (schedule id=%s expr=%r)", + sched["group_id"], sched["id"], sched["cron_expr"]) + + # Mark fired before spawning so a slow run can't double-fire. 
+ repo.mark_schedule_fired(sched["id"]) + + group_id = sched["group_id"] + sched_id = sched["id"] + + def _run() -> None: + try: + group_run_id = repo.create_group_run( + group_id, triggered_by=f"schedule:{sched_id}" + ) + engine.run_group(group_id, group_run_id=group_run_id) + except Exception as exc: # noqa: BLE001 + log.error("scheduler: run failed for group_id=%s: %s", group_id, exc) + + threading.Thread(target=_run, daemon=True).start() + + +def start_scheduler() -> None: + """Start the background scheduler daemon thread. Safe to call once at startup.""" + + def _loop() -> None: + time.sleep(5) # let the app finish initialising + while True: + _check_and_fire() + time.sleep(60) + + t = threading.Thread(target=_loop, daemon=True, name="pipekit-scheduler") + t.start() + log.info("scheduler: started") diff --git a/pipekit/web/app.py b/pipekit/web/app.py index b7b26b8..19f4962 100644 --- a/pipekit/web/app.py +++ b/pipekit/web/app.py @@ -652,6 +652,23 @@ def _save_inline_watermarks(form, module_id: int) -> None: default_value=default_val) +def _schedules_with_next(schedules: list[dict]) -> list[dict]: + """Attach a 'next_fire_at' string to each schedule dict.""" + from croniter import croniter, CroniterBadCronError + from datetime import datetime, timezone + now = datetime.now(timezone.utc) + result = [] + for s in schedules: + s = dict(s) + try: + cron = croniter(s["cron_expr"], now) + s["next_fire_at"] = cron.get_next(datetime).strftime("%Y-%m-%d %H:%M UTC") + except CroniterBadCronError: + s["next_fire_at"] = "invalid expression" + result.append(s) + return result + + def _sanitize_identifier(name: str) -> str: """Lower-case a source column name and replace characters that aren't valid in an unquoted identifier with underscores.""" @@ -1020,7 +1037,7 @@ def groups_index(request: Request): def group_new(request: Request): return _templates.TemplateResponse( request, "group_form.html", - _ctx(group=None, all_modules=repo.list_modules(), members=[], + 
_ctx(group=None, all_modules=repo.list_modules(), members=[], schedules=[], form_action="/groups", cancel_url="/groups"), ) @@ -1033,6 +1050,7 @@ async def group_create(request: Request): raise HTTPException(409, f"group name {name!r} already exists") grp = repo.create_group(name=name) _save_group_members(form, grp["id"]) + _save_group_schedules(form, grp["id"]) return RedirectResponse(url=f"/groups/{grp['id']}", status_code=303) @@ -1044,10 +1062,11 @@ def group_detail(request: Request, group_id: int): members = repo.list_group_members(group_id) recent_runs = repo.list_group_runs(group_id, limit=10) group_running = bool(recent_runs and recent_runs[0]["status"] == "running") + schedules = _schedules_with_next(repo.list_schedules_for_group(group_id)) return _templates.TemplateResponse( request, "group_detail.html", _ctx(group=grp, members=members, recent_runs=recent_runs, - group_running=group_running), + group_running=group_running, schedules=schedules), ) @@ -1057,11 +1076,12 @@ def group_edit(request: Request, group_id: int): if grp is None: raise HTTPException(404, f"group id={group_id} not found") members = repo.list_group_members(group_id) + schedules = repo.list_schedules_for_group(group_id) return _templates.TemplateResponse( request, "group_form.html", _ctx(group=grp, all_modules=repo.list_modules(), members=members, - form_action=f"/groups/{group_id}", cancel_url=f"/groups/{group_id}", - section="groups"), + schedules=schedules, + form_action=f"/groups/{group_id}", cancel_url=f"/groups/{group_id}"), ) @@ -1076,6 +1096,7 @@ async def group_update(request: Request, group_id: int): raise HTTPException(409, f"group name {name!r} already exists") repo.update_group(group_id, name=name) _save_group_members(form, group_id) + _save_group_schedules(form, group_id) return RedirectResponse(url=f"/groups/{group_id}", status_code=303) @@ -1157,6 +1178,31 @@ def group_run_live_fragment(request: Request, group_run_id: int): ) +def _save_group_schedules(form, group_id: 
int) -> None: + """Process sched_* parallel arrays from a group form POST (deletions first, then upserts).""" + from croniter import croniter as _croniter + for sid_str in form.getlist("sched_deleted_id"): # must run even when no rows remain + if sid_str: + repo.delete_schedule(int(sid_str)) + exprs = form.getlist("sched_cron_expr") + if not exprs: + return + ids = form.getlist("sched_id") + enableds = form.getlist("sched_enabled") # select always submits a value + for i, expr in enumerate(exprs): + expr = expr.strip() + if not expr: + continue + if not _croniter.is_valid(expr): + continue + enabled = 1 if (i < len(enableds) and enableds[i] == "1") else 0 + sid_str = ids[i] if i < len(ids) else "" + if sid_str: + repo.update_schedule(int(sid_str), cron_expr=expr, enabled=enabled) + else: + repo.create_schedule(group_id=group_id, cron_expr=expr, enabled=enabled) + + def _save_group_members(form, group_id: int) -> None: module_ids = form.getlist("member_module_id") run_orders = form.getlist("member_run_order") diff --git a/pipekit/web/templates/group_detail.html b/pipekit/web/templates/group_detail.html index 367d2b9..f91482b 100644 --- a/pipekit/web/templates/group_detail.html +++ b/pipekit/web/templates/group_detail.html @@ -56,6 +56,45 @@ +
+
+ Schedules + Edit schedules → +
+
+ {% if schedules %} + + + + + + + + + + + {% for s in schedules %} + + + + + + + {% endfor %} + +
cron expressionnext fire (UTC)last firedenabled
{{ s.cron_expr }}{% if s.enabled %}{{ s.next_fire_at }}{% else %}—{% endif %}{{ s.last_fired_at or "never" }} + {% if s.enabled %} + yes + {% else %} + no + {% endif %} +
+ {% else %} +
No schedules — add one →
+ {% endif %} +
+
+ {% include "_group_live.html" %}
diff --git a/pipekit/web/templates/group_form.html b/pipekit/web/templates/group_form.html index 6acb45c..123f8ed 100644 --- a/pipekit/web/templates/group_form.html +++ b/pipekit/web/templates/group_form.html @@ -63,6 +63,52 @@
+
+
+ Schedules + cron expressions in UTC — e.g. 0 4 * * * for daily at 04:00 + +
+
+ + + + + + + + + + {% for s in schedules %} + + + + + + + {% endfor %} + +
cron expressionenabled
+ + + + + +
+ {% if not schedules %} +
No schedules — group runs manually only.
+ {% endif %} +
+
+
cancel @@ -71,6 +117,27 @@
+ +