Add scheduling, harden deploy, and update docs

Scheduling: cron-based group runs via a daemon thread (scheduler.py)
started at API startup. Schedules managed inline on the group edit form.
last_fired_at persisted before run to prevent double-fire on restart.
Requires croniter (added to requirements.txt); DB migration adds
last_fired_at column to schedule table.

Deploy: deploy.sh now creates the pipekit system user, chowns the repo,
builds the venv as pipekit, and installs/enables the systemd unit.
systemd/pipekit.service is now a production-ready unit (User=pipekit
uncommented). pipekit secrets set preserves existing file permissions
instead of resetting to 0600. Driver registration is now idempotent
(upsert via get_driver_by_name + update_driver).

Docs: CLAUDE.md and SPEC.md updated to reflect groups, scheduling,
scheduler-in-API-process architecture, TUI deferred (not dropped),
stop-on-failure tradeoff, jrunner as prerequisite, and deploy flow.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Paul Trowbridge 2026-06-03 21:18:13 -04:00
parent 31d670b4e6
commit c34fcb38ed
13 changed files with 562 additions and 81 deletions

@ -19,8 +19,10 @@ pipekit doctor # health check (config, jrunner, DB)
pipekit run <module_name> # run a module synchronously (manual test)
pipekit set-password <username> # set HTTP Basic Auth credentials
pipekit secrets set KEY [VALUE] # add/update a secret (prompts if value omitted)
pipekit secrets list # show stored secret keys (no values)
pipekit drivers list # show registered driver kinds
./deploy.sh # idempotent: venv, deps, launcher, driver registration
./deploy.sh # full idempotent deploy: user, venv, deps, launcher,
# secrets.env, schema init, driver registration, systemd unit
```
## Architecture
@ -30,7 +32,7 @@ Pipekit is a database sync tool. A **module** defines a source query → dest ta
**Layers (bottom to top):**
1. **SQLite** (`pipekit.db`) — single file, all state
2. **`repo.py`** — all CRUD for every table; ~1,900 LOC, the only layer that touches the DB
2. **`repo.py`** — all CRUD for every table; ~725 LOC, the only layer that touches the DB
3. **`engine/`** — orchestrates a module run: lock acquisition, watermark resolution, jrunner calls, merge SQL, post-run hooks, run_log write, lock release
4. **jrunner** — external Java CLI; handles all JDBC access. Python never talks to remote DBs directly; it shells out to jrunner
5. **`api/`** — FastAPI REST endpoints under `/api/*`, HTTP Basic Auth (except `/health`)
@ -44,6 +46,10 @@ Pipekit is a database sync tool. A **module** defines a source query → dest ta
- **watermark** — named placeholder with resolver SQL; first column of first row used as opaque string; replaces `{watermark_name}` in source query
- **hook** — post-merge SQL (run_order, run_on: success/failure/always)
- **run_log** — immutable history record with resolved SQL, merge SQL, watermark values, stdout/stderr, timing, live_log (streamed jrunner output written during the run)
- **grp** — named group of modules; modules belong to a group via `group_member` (with run_order)
- **group_member** — join table: group → module with run_order; disabled modules are skipped by group runs
- **group_run** — one execution of a group: status, timing, triggered_by (manual | schedule:{id})
- **schedule** — cron expression tied to a group; scheduler fires `run_group` when due; tracks `last_fired_at` to survive restarts without double-firing
## Engine Flow
@ -60,6 +66,13 @@ run_module(module_id)
→ run hooks in order
→ write run_log entry
→ UPDATE module SET running=0 (in finally)
run_group(group_id)
→ create group_run row (status=running)
→ for each enabled group_member in run_order: call run_module(group_run_id=...)
→ continues past individual module failures (all members run)
→ final status: dry_run if all dry_run, error if any errored, success otherwise
→ finish_group_run(status)
```
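The status rollup at the end of `run_group` can be sketched as below. This is a minimal illustration, not the engine's code: `rollup_status` is a hypothetical name, and the result for an empty group is an assumption the flow above does not specify.

```python
def rollup_status(member_statuses: list[str]) -> str:
    """Collapse per-module statuses into one group_run status.

    Hypothetical helper: the rule comes from the flow above; the function
    name and the empty-group fallback are assumptions.
    """
    if not member_statuses:
        return "success"  # assumption: an empty group counts as success
    if any(s == "error" for s in member_statuses):
        return "error"  # any errored member marks the whole group as error
    if all(s == "dry_run" for s in member_statuses):
        return "dry_run"  # only when every member was a dry run
    return "success"
```

A mixed list such as `["success", "dry_run"]` falls through to `success` under this reading of the rule.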
## Run Statuses
@ -94,19 +107,25 @@ Watermarks are managed inline on both the module edit form and wizard step 3 (no
Recreated on every run as `pipekit_staging.{module_name}` (DROP + CREATE, not IF NOT EXISTS). Ephemeral — exists only during the run.
## Scheduler
`pipekit/scheduler.py` runs a single daemon thread (started via FastAPI lifespan in `api/app.py`). It wakes every 60 s, reads all enabled schedules, and fires `run_group` for any whose next cron occurrence has passed since `last_fired_at`. `last_fired_at` is written to the DB before the run thread is spawned — prevents double-fire if a run is slow or pipekit restarts mid-run. Uses `croniter` for cron expression evaluation.
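The write-then-spawn ordering described above might look roughly like this. It is a sketch against an in-memory SQLite table, not the real `scheduler.py`: the `fire` helper, the two-column `schedule` table, and the `events` list are illustrative assumptions.

```python
import sqlite3
import threading


def fire(conn: sqlite3.Connection, schedule_id: int, run) -> threading.Thread:
    """Record the fire time, commit it, then start the run in a thread."""
    conn.execute(
        "UPDATE schedule SET last_fired_at=datetime('now') WHERE id=?",
        (schedule_id,),
    )
    conn.commit()  # the timestamp is durable before the run begins
    t = threading.Thread(target=run, daemon=True)
    t.start()
    return t


# Illustrative setup: a stripped-down schedule table with one row.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE schedule (id INTEGER PRIMARY KEY, last_fired_at TEXT)")
conn.execute("INSERT INTO schedule (id) VALUES (1)")

events = []
fire(conn, 1, lambda: events.append("run")).join()
row = conn.execute("SELECT last_fired_at FROM schedule WHERE id=1").fetchone()
```

Because the commit happens before the thread starts, a crash during the run leaves `last_fired_at` set, so the same occurrence is not picked up again on the next check.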
## API vs. Web
- `/api/*` — JSON REST, HTTP Basic Auth, consumed by HTMX fragments and external callers
- `/` and other bare paths — full HTML pages (Jinja2), no auth currently
- `POST /modules/{id}/run` returns `{run_id}` immediately (both API and web); run is async via BackgroundTasks
- `GET /runs/{id}/live` — HTML fragment endpoint; HTMX polls this every 2 s while status=running to show live_log + status
- `/groups/*` — group CRUD, run, live fragment; `/group-runs/{id}` — group run detail with per-module run links
## Tech Stack
- Python 3.10+, FastAPI, Uvicorn, Jinja2, PyYAML, SQLite3 (stdlib)
- Python 3.10+, FastAPI, Uvicorn, Jinja2, PyYAML, SQLite3 (stdlib), croniter
- `python-multipart` required for HTML form POSTs (not auto-installed as a FastAPI transitive dep)
- Frontend: HTMX (CDN) + vanilla JS; Alpine.js is NOT loaded despite being listed in older docs
- jrunner: separate Java tool, must be on PATH
- Runs as the `pipekit` system user; venv at `/opt/pipekit/.venv` owned by that user; secrets at `/etc/pipekit/secrets.env` (mode 0640, group pipekit)
## Full Spec

SPEC.md

@ -6,7 +6,9 @@ design from first principles. The previous version is archived at
## Status
**Spec is done.** Ready to move to implementation planning.
**Implementation is underway.** Core module sync, wizard, web UI, groups,
scheduling, and deploy tooling are built and running. The TUI is deferred
in favour of a web UI (see Architecture note below).
One item is intentionally deferred: the **migration plan** for bringing
over the ~90 existing modules from `/opt/sync`. Not needed to start
@ -72,16 +74,30 @@ jrunner (Java CLI — bulk JDBC transfer + query mode)
engine (Python — orchestrates jrunner, watermarks, merge, hooks, run log)
API (FastAPI — REST, Basic Auth)
API process (FastAPI — REST, Basic Auth)
├── scheduler thread (daemon thread; fires group runs from cron schedules)
└── web UI (Jinja2 templates, HTMX, vanilla JS — served alongside /api/*)
TUI / web UI / curl
TUI (deferred) / curl
```
The engine shells out to jrunner for **everything that touches a database**:
bulk transfers, watermark resolver queries, hooks. No separate JDBC layer in
Python. One driver-loading code path, one set of bugs.
The API exists so a web front-end or curl can drive Pipekit, not just the TUI.
The scheduler is a daemon thread started inside the API process via FastAPI
lifespan — not a separate process or service. It shares the same SQLite
connection pool and imports.
The FastAPI app serves both a JSON REST API (`/api/*`) and full HTML pages
(`/`, `/modules/*`, `/groups/*`, etc.) using Jinja2 templates, HTMX for
in-place updates, and vanilla JS. The web UI covers all management: modules,
connections, groups, schedules, run history.
**TUI is deferred, not dropped.** The original TUI design remains valid — it
would be an HTTP client against the API, never touching SQLite directly. Build
it when the web UI's limitations become friction (keyboard-heavy workflows,
SSH-only environments). The API design already accommodates it.
## Storage: SQLite
@ -265,12 +281,21 @@ grp(id, name)
group_member(id, group_id, module_id, run_order)
-- many-to-many; same module can live in multiple groups with different run_orders
schedule(id, group_id, cron_expr, enabled)
schedule(id, group_id, cron_expr, enabled, last_fired_at)
-- a group can have 0..N schedules
```
**Sequential execution, stop on failure.** Mirrors the `set -e` behavior of
existing orchestrator scripts.
**Sequential execution, continue past failures.** Each enabled member runs in
`run_order` regardless of whether prior members errored. Final group_run status
is `error` if any member errored, `success` if all succeeded, `dry_run` if all
were dry runs. Disabled modules are skipped entirely.
This was a deliberate change from the original "stop on failure" design. The
case for stopping: mirrors `set -e` shell behaviour; avoids running downstream
modules against stale data if an upstream one failed. The case for continuing:
full visibility into all failures in one run rather than discovering them
one-at-a-time; individual module locks already prevent unsafe concurrency.
A per-group `stop_on_failure` flag would satisfy both and is worth adding.
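One way the proposed flag could work is sketched below. This is hypothetical: `run_members` and `run_one` are illustrative names (`run_one` stands in for `run_module` and returns a status string), and the flag does not exist in the schema yet.

```python
from typing import Callable


def run_members(members: list[str],
                run_one: Callable[[str], str],
                stop_on_failure: bool = False) -> list[tuple[str, str]]:
    """Run members in order; optionally stop after the first error.

    Hypothetical sketch: `stop_on_failure` is the proposed per-group flag,
    defaulting to the current continue-past-failures behaviour.
    """
    results = []
    for name in members:
        status = run_one(name)
        results.append((name, status))
        if status == "error" and stop_on_failure:
            break  # remaining members are not run
    return results


# Fake per-module outcomes standing in for real runs.
fake = {"a": "success", "b": "error", "c": "success"}
all_ran = run_members(["a", "b", "c"], fake.__getitem__)
stopped = run_members(["a", "b", "c"], fake.__getitem__, stop_on_failure=True)
```

With the flag off, all three members run and the failure of `b` is recorded alongside the others; with it on, `c` is never attempted.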
**Many-to-many membership.** Junction table is needed anyway for `run_order`,
so many-to-many costs nothing extra. Unique constraint can be added later if
@ -279,9 +304,14 @@ ever needed.
**Schedule attaches to groups, not modules.** Matches the user's mental model
and avoids a huge cron-list. Individual modules can still be run ad-hoc.
**Scheduler.** Background thread inside the API process. Wakes every minute,
evaluates all enabled schedules, fires any whose cron matches. A scheduled
fire and a manual fire use the same code path — only `triggered_by` differs.
**Scheduler.** `pipekit/scheduler.py` — a single daemon thread started via
FastAPI lifespan on server startup. Wakes every 60 s. For each enabled
schedule, uses `croniter` to find the next occurrence after `last_fired_at`
(or 2 minutes ago if never fired); fires if that occurrence has passed.
`last_fired_at` is written to the DB *before* the run thread is spawned —
prevents double-fire if a run is slow or pipekit restarts mid-run. A scheduled
fire and a manual fire use the same `run_group` code path; only `triggered_by`
differs (`schedule:{id}` vs `manual`).
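The due check can be sketched with the standard library alone. The names here are illustrative assumptions: `is_due` is a hypothetical helper, and `next_hour` is a toy stand-in for the cron evaluation that pipekit delegates to `croniter`.

```python
from datetime import datetime, timedelta, timezone
from typing import Callable, Optional


def is_due(last_fired_at: Optional[datetime], now: datetime,
           next_after: Callable[[datetime], datetime]) -> bool:
    """True if the first occurrence after the last fire is not in the future."""
    if last_fired_at is None:
        # Never fired: look back 2 minutes so only the current tick can match.
        base = now - timedelta(minutes=2)
    else:
        base = last_fired_at
    return next_after(base) <= now


def next_hour(dt: datetime) -> datetime:
    """Toy cron stand-in: next top of the hour strictly after dt."""
    return dt.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)


now = datetime(2026, 6, 3, 12, 0, 30, tzinfo=timezone.utc)
fired_11 = datetime(2026, 6, 3, 11, 0, 5, tzinfo=timezone.utc)
fired_12 = datetime(2026, 6, 3, 12, 0, 5, tzinfo=timezone.utc)
```

Here `is_due(fired_11, now, next_hour)` is true because the 12:00 occurrence has passed, while `is_due(fired_12, now, next_hour)` is false because that occurrence was already recorded as fired.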
**Ad-hoc runs:**
@ -332,8 +362,30 @@ Pipekit verifies jrunner exists on startup (configurable path in
drivers loadable, database accessible, all configured connections testable.
First thing to run after a `git pull`.
**Packaging.** Start loose-coupled (install jrunner separately, point Pipekit
at it). Bundle later if/when the two-step gets annoying.
**`./deploy.sh`** — idempotent full-system deploy. Run once from a fresh clone
or re-run after any code update. Does:
1. Creates the `pipekit` system user (`useradd --system`) if absent
2. Chowns `/opt/pipekit` to `pipekit:pipekit`
3. Creates/repairs the Python venv at `.venv` owned by `pipekit`
4. Installs deps as `pipekit` user (`pip install -r requirements.txt`)
5. Installs `/usr/local/bin/pipekit` launcher symlink
6. Creates `/etc/pipekit/secrets.env` (mode 0640, group pipekit) if absent
7. Runs `pipekit init` (schema creation + migrations) as `pipekit`
8. Upserts JDBC driver rows for each jar found in jrunner's lib dir
9. Installs and enables `systemd/pipekit.service`
**Secrets** live in `/etc/pipekit/secrets.env` as `KEY=VALUE` lines. The
`password` field on a connection row stores `$KEY`; the engine resolves it at
runtime from the environment. The systemd unit loads this file as
`EnvironmentFile=`. Mode 0640, group pipekit — the service account can read it
but it is not world-readable. Managed with `pipekit secrets set/list/unset`.
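Resolving a `$KEY` reference at runtime might look like the sketch below. `resolve_secret` is a hypothetical helper, not the engine's function, and both the pass-through of literal values and the error on a missing key are assumptions.

```python
import os
from typing import Mapping, Optional


def resolve_secret(value: str, env: Optional[Mapping[str, str]] = None) -> str:
    """Return the environment value for a "$KEY" reference."""
    env = os.environ if env is None else env
    if not value.startswith("$"):
        return value  # assumption: literal values pass through unchanged
    key = value[1:]
    if key not in env:
        # assumption: a missing secret is an error, not an empty password
        raise KeyError(f"secret {key!r} is not set in the environment")
    return env[key]
```

Under systemd the `EnvironmentFile=` line populates the process environment, so a connection row holding `$DB2PW` would resolve without the secret itself ever being stored in SQLite.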
**jrunner is a prerequisite, not managed by deploy.sh.** `deploy.sh` checks
that `jrunner` is on PATH and aborts with a clear message if not. Installing
jrunner (`/opt/jrunner/deploy.sh`) is a separate step the operator does first.
Keeping them decoupled means jrunner can be updated independently. Bundle later
if/when the two-step gets annoying.
## New module wizard
@ -628,7 +680,7 @@ behavior.
| Hooks | Per-module, post-merge, run_on success/failure/always |
| Group hooks | Deferred — not needed yet |
| Group membership | Many-to-many (junction table for run_order anyway) |
| Group execution | Sequential, stop on failure |
| Group execution | Sequential, continue past failures; final status = worst of members |
| Schedules | Attach to groups; multiple schedules per group allowed |
| Locking | Atomic UPDATE on `module.running`; PID + time-based stale clearing |
| Credentials | Env var references (`$DB2PW`); resolved at runtime |
@ -642,6 +694,8 @@ behavior.
| Linked servers | Not first-class; only affect FROM-clause syntax at author time; not persisted on module |
| API style | REST, GET for reads, POST for writes, no PUT/DELETE |
| Run model | Async — POST /run returns run_id immediately; watch via polling or SSE stream |
| Live output | Server-Sent Events (SSE) — plain HTTP, curl-friendly, browser-native |
| Live output | HTMX polling every 2 s (SSE deferred; polling sufficient for current scale) |
| Auth | HTTP Basic, single user, creds in settings table |
| TUI ↔ backend | TUI is an HTTP client; never touches SQLite directly |
| TUI | Deferred — web UI built first; TUI remains valid future work as an API client |
| Scheduler | croniter for cron evaluation; last_fired_at persisted before run to prevent double-fire |
| Service account | Runs as pipekit system user; venv + db owned by that user; secrets 0640 group pipekit |

deploy.sh

@ -1,17 +1,22 @@
#!/usr/bin/env bash
# Pipekit deployment — idempotent. Re-run any time.
# Pipekit deployment — idempotent. Re-run after any code update.
#
# Steps:
# 1. Check prerequisites (python3, jrunner on PATH)
# 2. Create Python venv at $REPO/.venv and install requirements
# 3. Install launcher at /usr/local/bin/pipekit (wraps the venv python)
# 4. Ensure /etc/pipekit/secrets.env exists (mode 0600, placeholder body)
# 5. Run `pipekit init` to create/upgrade the SQLite schema
# 6. Register driver rows for every JDBC jar shipped with jrunner
# What it does:
# 1. Creates the 'pipekit' system user (if absent)
# 2. Chowns /opt/pipekit to pipekit:pipekit
# 3. Creates Python venv (as pipekit) and installs requirements
# 4. Installs /usr/local/bin/pipekit launcher
# 5. Creates /etc/pipekit/secrets.env (mode 0640, group pipekit)
# 6. Runs 'pipekit init' to create/upgrade the SQLite schema
# 7. Registers JDBC driver rows for every jar shipped with jrunner
# 8. Installs and enables the systemd unit (does not start it)
#
# After running:
# - Set DB passwords with: sudo pipekit secrets set <KEY>
# - See systemd/pipekit.service for a unit file template
# Usage:
# ./deploy.sh # re-execs itself with sudo if needed
#
# After deploy:
# sudo pipekit secrets set KEY VALUE # add connection passwords
# sudo systemctl start pipekit
set -euo pipefail
@ -20,7 +25,11 @@ VENV_DIR="$REPO_DIR/.venv"
LAUNCHER="/usr/local/bin/pipekit"
CONFIG_DIR="/etc/pipekit"
SECRETS_FILE="$CONFIG_DIR/secrets.env"
SERVICE_NAME="pipekit"
UNIT_SRC="$REPO_DIR/systemd/pipekit.service"
UNIT_DST="/etc/systemd/system/pipekit.service"
# Re-exec as root if needed
if [ "$EUID" -ne 0 ]; then
exec sudo -H -E "$0" "$@"
fi
@ -31,49 +40,73 @@ echo "venv: $VENV_DIR"
echo "secrets: $SECRETS_FILE"
echo ""
# ── Prerequisites ─────────────────────────────────────────────────────────────
command -v python3 >/dev/null || { echo "ERROR: python3 not on PATH"; exit 1; }
command -v jrunner >/dev/null || { echo "ERROR: jrunner not on PATH — install /opt/jrunner first"; exit 1; }
# ── 1. System user ────────────────────────────────────────────────────────────
if ! id -u "$SERVICE_NAME" >/dev/null 2>&1; then
echo "Creating system user: $SERVICE_NAME"
useradd --system --no-create-home --shell /usr/sbin/nologin "$SERVICE_NAME"
else
echo "User $SERVICE_NAME already exists."
fi
# ── 2. Ownership ──────────────────────────────────────────────────────────────
echo "Setting ownership of $REPO_DIR to $SERVICE_NAME:$SERVICE_NAME"
chown -R "$SERVICE_NAME:$SERVICE_NAME" "$REPO_DIR"
# ── 3. Venv + deps ────────────────────────────────────────────────────────────
if [ -d "$VENV_DIR" ] && [ "$(stat -c '%U' "$VENV_DIR")" != "$SERVICE_NAME" ]; then
echo "Removing root-owned venv and recreating as $SERVICE_NAME"
rm -rf "$VENV_DIR"
fi
if [ ! -d "$VENV_DIR" ]; then
echo "Creating venv at $VENV_DIR"
python3 -m venv "$VENV_DIR"
sudo -u "$SERVICE_NAME" python3 -m venv "$VENV_DIR"
fi
"$VENV_DIR/bin/pip" install --quiet --upgrade pip
"$VENV_DIR/bin/pip" install --quiet -r "$REPO_DIR/requirements.txt"
echo "Python deps installed."
# The in-repo bin/pipekit auto-detects .venv at runtime; no rewrite needed.
echo "Installing Python dependencies"
sudo -u "$SERVICE_NAME" "$VENV_DIR/bin/pip" install --quiet --upgrade pip
sudo -u "$SERVICE_NAME" "$VENV_DIR/bin/pip" install --quiet -r "$REPO_DIR/requirements.txt"
echo "Dependencies installed."
# ── 4. Launcher ───────────────────────────────────────────────────────────────
chmod +x "$REPO_DIR/bin/pipekit"
ln -sf "$REPO_DIR/bin/pipekit" "$LAUNCHER"
echo "Launcher: $LAUNCHER -> $REPO_DIR/bin/pipekit"
# ── 5. Secrets file ───────────────────────────────────────────────────────────
install -d -m 0755 "$CONFIG_DIR"
if [ ! -f "$SECRETS_FILE" ]; then
install -m 0600 /dev/null "$SECRETS_FILE"
install -m 0640 /dev/null "$SECRETS_FILE"
chown "root:$SERVICE_NAME" "$SECRETS_FILE"
cat > "$SECRETS_FILE" <<'EOF'
# pipekit secrets — sourced by the service process (EnvironmentFile=)
# or by the shell before `pipekit serve`. One KEY=VALUE per line.
# Connection rows reference these as $KEY (e.g. password: "$DB2PW").
#
# This file must stay mode 0600 and out of version control.
# Use `sudo pipekit secrets set <KEY>` to add entries safely.
# pipekit secrets — loaded by the systemd unit as EnvironmentFile.
# Connection passwords are stored as $KEY references in the DB.
# Add entries with: sudo pipekit secrets set KEY VALUE
EOF
chmod 0600 "$SECRETS_FILE"
echo "Created $SECRETS_FILE"
else
# Ensure correct permissions even if file pre-existed
chown "root:$SERVICE_NAME" "$SECRETS_FILE"
chmod 0640 "$SECRETS_FILE"
echo "Keeping existing $SECRETS_FILE"
fi
"$LAUNCHER" init
# ── 6. Schema init ────────────────────────────────────────────────────────────
sudo -u "$SERVICE_NAME" "$LAUNCHER" init
echo "Schema initialised."
# Register drivers for each JDBC jar jrunner ships with.
# ── 7. Driver registration ────────────────────────────────────────────────────
JR_LIB="$(dirname "$(readlink -f "$(command -v jrunner)")")/../lib"
register_jar() {
local kind="$1" pattern="$2"
local jar
jar="$(find "$JR_LIB" -maxdepth 1 -name "$pattern" 2>/dev/null | head -1)"
if [ -n "$jar" ]; then
"$LAUNCHER" drivers register "$kind" --jar "$jar"
sudo -u "$SERVICE_NAME" "$LAUNCHER" drivers register "$kind" --jar "$jar"
else
echo " (no $pattern in $JR_LIB — skipping $kind)"
fi
@ -82,16 +115,25 @@ register_jar db2 "jt400-*.jar"
register_jar pg "postgresql-*.jar"
register_jar mssql "mssql-jdbc-*.jar"
# ── 8. Systemd unit ───────────────────────────────────────────────────────────
if [ ! -f "$UNIT_SRC" ]; then
echo "WARNING: $UNIT_SRC not found — skipping systemd install"
else
cp "$UNIT_SRC" "$UNIT_DST"
systemctl daemon-reload
systemctl enable "$SERVICE_NAME"
echo "Systemd unit installed and enabled."
fi
echo ""
echo "pipekit deployed."
echo ""
echo "Next steps:"
echo " 1. Set passwords: sudo pipekit secrets set DB2PW"
echo " sudo pipekit secrets set PGPW"
echo " 2. Start the server manually:"
echo " set -a; source $SECRETS_FILE; set +a"
echo " pipekit serve"
echo " 3. Or install the systemd unit:"
echo " sudo cp $REPO_DIR/systemd/pipekit.service /etc/systemd/system/"
echo " sudo systemctl daemon-reload"
echo " sudo systemctl enable --now pipekit"
echo " 1. Add connection passwords:"
echo " sudo pipekit secrets set DB2PW <value>"
echo " sudo pipekit secrets set PGPW <value>"
echo " 2. Start the service:"
echo " sudo systemctl start pipekit"
echo " 3. Check it:"
echo " sudo systemctl status pipekit"
echo " journalctl -u pipekit -f"


@ -7,6 +7,8 @@ content-negotiation complexity and keeps the API curl-testable.
from __future__ import annotations
from contextlib import asynccontextmanager
from fastapi import FastAPI
from .. import __version__, db, jrunner
@ -14,8 +16,15 @@ from ..web import mount_web
from .routes import connections, introspect, modules, runs, system
@asynccontextmanager
async def _lifespan(app: FastAPI):
from ..scheduler import start_scheduler
start_scheduler()
yield
def create_app() -> FastAPI:
app = FastAPI(title="Pipekit", version=__version__)
app = FastAPI(title="Pipekit", version=__version__, lifespan=_lifespan)
app.include_router(system.router)
app.include_router(connections.router, prefix="/api")
app.include_router(introspect.router, prefix="/api")


@ -78,12 +78,20 @@ def cmd_drivers_register(args) -> int:
f"(registering anyway)")
name = args.name or f"{args.kind}-jdbc"
row = repo.create_driver(
name=name, kind=args.kind, jar_file=args.jar,
class_name=class_name, url_template=args.url_template,
)
print(f"registered driver id={row['id']} name={row['name']!r} "
f"kind={row['kind']!r}")
existing = repo.get_driver_by_name(name)
if existing:
row = repo.update_driver(existing["id"], jar_file=args.jar,
class_name=class_name,
url_template=args.url_template)
print(f"updated driver id={row['id']} name={row['name']!r} "
f"kind={row['kind']!r}")
else:
row = repo.create_driver(
name=name, kind=args.kind, jar_file=args.jar,
class_name=class_name, url_template=args.url_template,
)
print(f"registered driver id={row['id']} name={row['name']!r} "
f"kind={row['kind']!r}")
return 0
@ -218,7 +226,14 @@ def cmd_secrets_set(args) -> int:
tmp = path + ".tmp"
with open(tmp, "w") as f:
f.writelines(lines)
os.chmod(tmp, stat.S_IRUSR | stat.S_IWUSR) # 0600
# Preserve existing permissions/ownership if the file already exists;
# otherwise default to 0640 so the pipekit service user can read it.
if os.path.exists(path):
st = os.stat(path)
os.chmod(tmp, stat.S_IMODE(st.st_mode))
os.chown(tmp, st.st_uid, st.st_gid)
else:
os.chmod(tmp, 0o640)
os.replace(tmp, path)
print(f"{'updated' if replaced else 'added'} {args.key} in {path}")
return 0


@ -42,6 +42,10 @@ def _apply_migrations(conn: sqlite3.Connection) -> None:
if "live_log" not in rl_cols:
conn.execute("ALTER TABLE run_log ADD COLUMN live_log TEXT")
sc_cols = {r[1] for r in conn.execute("PRAGMA table_info(schedule)")}
if "last_fired_at" not in sc_cols:
conn.execute("ALTER TABLE schedule ADD COLUMN last_fired_at TEXT")
@contextmanager
def connect(db_path: Path | None = None):


@ -33,6 +33,27 @@ def create_driver(*, name: str, kind: str, jar_file: str, class_name: str,
return _row(c.execute("SELECT * FROM driver WHERE id=?", (cur.lastrowid,)).fetchone())
def get_driver_by_name(name: str) -> dict | None:
with db.connect() as c:
return _row(c.execute(
"SELECT * FROM driver WHERE name=?", (name,)).fetchone())
def update_driver(driver_id: int, *, jar_file: str | None = None,
class_name: str | None = None,
url_template: str | None = None) -> dict:
fields, vals = [], []
for col, val in (("jar_file", jar_file), ("class_name", class_name),
("url_template", url_template)):
if val is not None:
fields.append(f"{col}=?"); vals.append(val)
if fields:
vals.append(driver_id)
with db.connect() as c:
c.execute(f"UPDATE driver SET {', '.join(fields)} WHERE id=?", vals)
return get_driver_row(driver_id)
def list_drivers() -> list[dict]:
with db.connect() as c:
return [dict(r) for r in c.execute("SELECT * FROM driver ORDER BY name")]
@ -519,6 +540,68 @@ def delete_group(group_id: int) -> bool:
return cur.rowcount > 0
# ---------------------------------------------------------------------------
# Schedules
# ---------------------------------------------------------------------------
def create_schedule(*, group_id: int, cron_expr: str, enabled: int = 1) -> dict:
with db.connect() as c:
cur = c.execute(
"INSERT INTO schedule (group_id, cron_expr, enabled) VALUES (?, ?, ?)",
(group_id, cron_expr, enabled),
)
return _row(c.execute("SELECT * FROM schedule WHERE id=?", (cur.lastrowid,)).fetchone())
def get_schedule(schedule_id: int) -> dict | None:
with db.connect() as c:
return _row(c.execute("SELECT * FROM schedule WHERE id=?", (schedule_id,)).fetchone())
def list_schedules_for_group(group_id: int) -> list[dict]:
with db.connect() as c:
return [dict(r) for r in c.execute(
"SELECT * FROM schedule WHERE group_id=? ORDER BY id", (group_id,))]
def update_schedule(schedule_id: int, *, cron_expr: str | None = None,
enabled: int | None = None) -> dict | None:
fields, vals = [], []
for col, val in (("cron_expr", cron_expr), ("enabled", enabled)):
if val is not None:
fields.append(f"{col}=?"); vals.append(val)
if not fields:
return get_schedule(schedule_id)
vals.append(schedule_id)
with db.connect() as c:
c.execute(f"UPDATE schedule SET {', '.join(fields)} WHERE id=?", vals)
return get_schedule(schedule_id)
def delete_schedule(schedule_id: int) -> bool:
with db.connect() as c:
cur = c.execute("DELETE FROM schedule WHERE id=?", (schedule_id,))
return cur.rowcount > 0
def list_all_enabled_schedules() -> list[dict]:
with db.connect() as c:
return [dict(r) for r in c.execute(
"SELECT s.*, g.name AS group_name "
"FROM schedule s "
"JOIN grp g ON s.group_id=g.id "
"WHERE s.enabled=1"
)]
def mark_schedule_fired(schedule_id: int) -> None:
with db.connect() as c:
c.execute(
"UPDATE schedule SET last_fired_at=datetime('now') WHERE id=?",
(schedule_id,),
)
# ---------------------------------------------------------------------------
# Group members
# ---------------------------------------------------------------------------

pipekit/scheduler.py

@ -0,0 +1,95 @@
"""Background scheduler: fires group runs when cron expressions are due.
One daemon thread wakes every 60 s, checks all enabled schedules, and
spawns a per-run thread for any that are due. ``last_fired_at`` is written
to the DB before the run starts so a slow or crashing run cannot double-fire
the same occurrence. Survives server restarts: missed ticks while pipekit was
down are detected on the next startup check (5 s after start).
"""
from __future__ import annotations
import logging
import threading
import time
from datetime import datetime, timedelta, timezone
log = logging.getLogger(__name__)
def _utcnow() -> datetime:
return datetime.now(timezone.utc)
def _check_and_fire() -> None:
from . import repo
try:
schedules = repo.list_all_enabled_schedules()
except Exception as e: # noqa: BLE001
log.error("scheduler: failed to load schedules: %s", e)
return
now = _utcnow()
for sched in schedules:
try:
_maybe_fire(sched, now)
except Exception as e: # noqa: BLE001
log.error("scheduler: error on schedule id=%s: %s", sched["id"], e)
def _maybe_fire(sched: dict, now: datetime) -> None:
from croniter import croniter, CroniterBadCronError
from . import engine, repo
last_str = sched["last_fired_at"]
if last_str:
last_dt = datetime.fromisoformat(last_str).replace(tzinfo=timezone.utc)
else:
# Never fired — look back 2 minutes so we pick up the current tick
# without replaying indefinitely old missed ticks.
last_dt = now - timedelta(seconds=120)
try:
cron = croniter(sched["cron_expr"], last_dt)
next_dt = cron.get_next(datetime).replace(tzinfo=timezone.utc)
except CroniterBadCronError:
log.warning("scheduler: invalid cron_expr %r for schedule id=%s — skipping",
sched["cron_expr"], sched["id"])
return
if next_dt > now:
return
log.info("scheduler: firing group_id=%s (schedule id=%s expr=%r)",
sched["group_id"], sched["id"], sched["cron_expr"])
# Mark fired before spawning so a slow run can't double-fire.
repo.mark_schedule_fired(sched["id"])
group_id = sched["group_id"]
sched_id = sched["id"]
def _run() -> None:
try:
group_run_id = repo.create_group_run(
group_id, triggered_by=f"schedule:{sched_id}"
)
engine.run_group(group_id, group_run_id=group_run_id)
except Exception as exc: # noqa: BLE001
log.error("scheduler: run failed for group_id=%s: %s", group_id, exc)
threading.Thread(target=_run, daemon=True).start()
def start_scheduler() -> None:
"""Start the background scheduler daemon thread. Safe to call once at startup."""
def _loop() -> None:
time.sleep(5) # let the app finish initialising
while True:
_check_and_fire()
time.sleep(60)
t = threading.Thread(target=_loop, daemon=True, name="pipekit-scheduler")
t.start()
log.info("scheduler: started")


@ -652,6 +652,23 @@ def _save_inline_watermarks(form, module_id: int) -> None:
default_value=default_val)
def _schedules_with_next(schedules: list[dict]) -> list[dict]:
"""Attach a 'next_fire_at' string to each schedule dict."""
from croniter import croniter, CroniterBadCronError
from datetime import datetime, timezone
now = datetime.now(timezone.utc)
result = []
for s in schedules:
s = dict(s)
try:
cron = croniter(s["cron_expr"], now)
s["next_fire_at"] = cron.get_next(datetime).strftime("%Y-%m-%d %H:%M UTC")
except CroniterBadCronError:
s["next_fire_at"] = "invalid expression"
result.append(s)
return result
def _sanitize_identifier(name: str) -> str:
"""Lower-case a source column name and replace characters that aren't
valid in an unquoted identifier with underscores."""
@ -1020,7 +1037,7 @@ def groups_index(request: Request):
def group_new(request: Request):
return _templates.TemplateResponse(
request, "group_form.html",
_ctx(group=None, all_modules=repo.list_modules(), members=[],
_ctx(group=None, all_modules=repo.list_modules(), members=[], schedules=[],
form_action="/groups", cancel_url="/groups"),
)
@ -1033,6 +1050,7 @@ async def group_create(request: Request):
raise HTTPException(409, f"group name {name!r} already exists")
grp = repo.create_group(name=name)
_save_group_members(form, grp["id"])
_save_group_schedules(form, grp["id"])
return RedirectResponse(url=f"/groups/{grp['id']}", status_code=303)
@ -1044,10 +1062,11 @@ def group_detail(request: Request, group_id: int):
members = repo.list_group_members(group_id)
recent_runs = repo.list_group_runs(group_id, limit=10)
group_running = bool(recent_runs and recent_runs[0]["status"] == "running")
schedules = _schedules_with_next(repo.list_schedules_for_group(group_id))
return _templates.TemplateResponse(
request, "group_detail.html",
_ctx(group=grp, members=members, recent_runs=recent_runs,
group_running=group_running),
group_running=group_running, schedules=schedules),
)
@ -1057,11 +1076,12 @@ def group_edit(request: Request, group_id: int):
if grp is None:
raise HTTPException(404, f"group id={group_id} not found")
members = repo.list_group_members(group_id)
schedules = repo.list_schedules_for_group(group_id)
return _templates.TemplateResponse(
request, "group_form.html",
_ctx(group=grp, all_modules=repo.list_modules(), members=members,
form_action=f"/groups/{group_id}", cancel_url=f"/groups/{group_id}",
section="groups"),
schedules=schedules,
form_action=f"/groups/{group_id}", cancel_url=f"/groups/{group_id}"),
)
@ -1076,6 +1096,7 @@ async def group_update(request: Request, group_id: int):
raise HTTPException(409, f"group name {name!r} already exists")
repo.update_group(group_id, name=name)
_save_group_members(form, group_id)
_save_group_schedules(form, group_id)
return RedirectResponse(url=f"/groups/{group_id}", status_code=303)
@ -1157,6 +1178,31 @@ def group_run_live_fragment(request: Request, group_run_id: int):
)
def _save_group_schedules(form, group_id: int) -> None:
"""Process sched_* parallel arrays from a group form POST."""
from croniter import croniter as _croniter
exprs = form.getlist("sched_cron_expr")
if not exprs:
return
ids = form.getlist("sched_id")
enableds = form.getlist("sched_enabled") # select always submits a value
for sid_str in form.getlist("sched_deleted_id"):
if sid_str:
repo.delete_schedule(int(sid_str))
for i, expr in enumerate(exprs):
expr = expr.strip()
if not expr:
continue
if not _croniter.is_valid(expr):
continue
enabled = 1 if (i < len(enableds) and enableds[i] == "1") else 0
sid_str = ids[i] if i < len(ids) else ""
if sid_str:
repo.update_schedule(int(sid_str), cron_expr=expr, enabled=enabled)
else:
repo.create_schedule(group_id=group_id, cron_expr=expr, enabled=enabled)
def _save_group_members(form, group_id: int) -> None:
module_ids = form.getlist("member_module_id")
run_orders = form.getlist("member_run_order")


@ -56,6 +56,45 @@
</div>
</div>
<div class="panel">
<header>
Schedules
<span style="margin-left:auto"><a class="btn ghost" href="/groups/{{ group.id }}/edit">Edit schedules →</a></span>
</header>
<div class="body tight">
{% if schedules %}
<table class="grid">
<thead>
<tr>
<th>cron expression</th>
<th>next fire (UTC)</th>
<th>last fired</th>
<th>enabled</th>
</tr>
</thead>
<tbody>
{% for s in schedules %}
<tr>
<td class="mono">{{ s.cron_expr }}</td>
<td class="mono">{% if s.enabled %}{{ s.next_fire_at }}{% else %}—{% endif %}</td>
<td class="mono">{{ s.last_fired_at or "never" }}</td>
<td>
{% if s.enabled %}
<span class="pill success">yes</span>
{% else %}
<span class="pill disabled">no</span>
{% endif %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<div class="empty">No schedules — <a href="/groups/{{ group.id }}/edit">add one →</a></div>
{% endif %}
</div>
</div>
{% include "_group_live.html" %}
<div class="panel" style="margin-top:1rem">


@ -63,6 +63,52 @@
</div>
</div>
<div class="panel" style="margin-top:1rem">
<header>
Schedules
<span class="subtitle">cron expressions in UTC — e.g. <code>0 4 * * *</code> for daily at 04:00</span>
<button type="button" class="btn ghost" style="margin-left:auto"
onclick="addScheduleRow()">+ add</button>
</header>
<div class="body tight">
<table class="grid" id="sched-table">
<thead>
<tr>
<th>cron expression</th>
<th style="width:6rem">enabled</th>
<th style="width:5rem"></th>
</tr>
</thead>
<tbody id="sched-tbody">
{% for s in schedules %}
<tr>
<td>
<input type="hidden" name="sched_id" value="{{ s.id }}">
<input type="text" name="sched_cron_expr"
value="{{ s.cron_expr }}" class="mono"
style="width:100%" placeholder="0 * * * *">
</td>
<td>
<select name="sched_enabled" style="width:100%">
<option value="1" {% if s.enabled %}selected{% endif %}>yes</option>
<option value="0" {% if not s.enabled %}selected{% endif %}>no</option>
</select>
</td>
<td>
<button type="button" class="ghost"
style="color:var(--danger);border:none"
onclick="removeScheduleRow(this, '{{ s.id }}')">remove</button>
</td>
</tr>
{% endfor %}
</tbody>
</table>
{% if not schedules %}
<div id="sched-empty" class="empty" style="margin-top:0.4rem">No schedules — group runs manually only.</div>
{% endif %}
</div>
</div>
<div class="actions" style="justify-content:flex-end;margin-top:0.8rem">
<a class="btn ghost" href="{{ cancel_url }}">cancel</a>
<button type="submit" class="primary">{% if group %}save changes{% else %}create group{% endif %}</button>
@ -71,6 +117,27 @@
</div>
</div>
<template id="sched-row-template">
<tr>
<td>
<input type="hidden" name="sched_id" value="">
<input type="text" name="sched_cron_expr" value="" class="mono"
style="width:100%" placeholder="0 * * * *">
</td>
<td>
<select name="sched_enabled" style="width:100%">
<option value="1" selected>yes</option>
<option value="0">no</option>
</select>
</td>
<td>
<button type="button" class="ghost"
style="color:var(--danger);border:none"
onclick="removeScheduleRow(this, '')">remove</button>
</td>
</tr>
</template>
<template id="member-row-template">
<tr>
<td>
@ -92,6 +159,25 @@
</template>
<script>
function addScheduleRow() {
const tmpl = document.getElementById('sched-row-template');
const row = tmpl.content.cloneNode(true);
document.getElementById('sched-tbody').appendChild(row);
const empty = document.getElementById('sched-empty');
if (empty) empty.style.display = 'none';
}
function removeScheduleRow(btn, schedId) {
if (schedId) {
const input = document.createElement('input');
input.type = 'hidden';
input.name = 'sched_deleted_id';
input.value = schedId;
document.getElementById('group-form').appendChild(input);
}
btn.closest('tr').remove();
}
function addMemberRow() {
const tmpl = document.getElementById('member-row-template');
const row = tmpl.content.cloneNode(true);


@ -4,3 +4,4 @@ python-multipart>=0.0.20
jinja2>=3.1
pyyaml>=6.0
httpx>=0.27
croniter>=2.0


@ -1,15 +1,3 @@
# Pipekit systemd unit template.
#
# Install:
# sudo cp pipekit.service /etc/systemd/system/
# sudo systemctl daemon-reload
# sudo systemctl enable --now pipekit
#
# Runs as root by default. For a dedicated service account, create the
# user and uncomment User=/Group= below:
# sudo useradd --system --home-dir /opt/pipekit --shell /usr/sbin/nologin pipekit
# sudo chown -R pipekit:pipekit /opt/pipekit/pipekit.db /etc/pipekit
[Unit]
Description=Pipekit sync engine
After=network-online.target
@ -17,11 +5,11 @@ Wants=network-online.target
[Service]
Type=simple
# User=pipekit
# Group=pipekit
User=pipekit
Group=pipekit
WorkingDirectory=/opt/pipekit
EnvironmentFile=/etc/pipekit/secrets.env
ExecStart=/usr/local/bin/pipekit serve --host 0.0.0.0
ExecStart=/usr/local/bin/pipekit serve
Restart=on-failure
RestartSec=5s