CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
Running the Server
Pipekit runs as a systemd service (pipekit.service). Common commands:
sudo systemctl status pipekit # check status
sudo systemctl restart pipekit # restart after changes
sudo systemctl stop pipekit # stop
sudo journalctl -u pipekit -f # follow logs
For dev/testing outside the service — never nohup or background it:
pipekit serve # default host/port from config.yaml
pipekit serve --host 0.0.0.0 --port 8080 --reload # dev mode with auto-reload
Other CLI Commands
pipekit init # create/upgrade SQLite schema
pipekit doctor # health check (config, jrunner, DB)
pipekit run <module_name> # run a module synchronously (manual test)
pipekit set-password <username> # set HTTP Basic Auth credentials
pipekit secrets set KEY [VALUE] # add/update a secret (prompts if value omitted)
pipekit secrets list # show stored secret keys (no values)
pipekit drivers list # show registered driver kinds
./deploy.sh # full idempotent deploy: user, venv, deps, launcher,
# secrets.env, schema init, driver registration, systemd unit
Architecture
Pipekit is a database sync tool. A module defines a source query → dest table sync job. The engine runs a module by: resolving watermarks → materializing source SQL → staging data via jrunner → merging into dest.
Layers (bottom to top):
- SQLite (pipekit.db): single file, all state
- repo.py: all CRUD for every table; ~725 LOC, the only layer that touches the DB
- engine/: orchestrates a module run (lock acquisition, watermark resolution, jrunner calls, merge SQL, post-run hooks, run_log write, lock release)
- jrunner: external Java CLI; handles all JDBC access. Python never talks to remote DBs directly; it shells out to jrunner
- api/: FastAPI REST endpoints under /api/*, HTTP Basic Auth (except /health)
- web/: HTML pages (Jinja2 templates) at /; HTMX + vanilla JS for interactivity (Alpine.js is not loaded)
Key Data Model
- driver: JDBC driver registration (kind, jar, class, url_template)
- connection: named DB connection (jdbc_url, username, password as $ENV_VAR reference)
- module: sync job (source/dest connection, source query, merge strategy, merge key, enabled, running lock)
- watermark: named placeholder with resolver SQL; first column of first row used as opaque string; replaces {watermark_name} in source query
- hook: post-merge SQL (run_order, run_on: success/failure/always)
- run_log: immutable history record with resolved SQL, merge SQL, watermark values, stdout/stderr, timing, live_log (streamed jrunner output written during the run)
- grp: named group of modules; modules belong to a group via group_member (with run_order)
- group_member: join table, group → module with run_order; disabled modules are skipped by group runs
- group_run: one execution of a group, with status, timing, triggered_by (manual | schedule:{id})
- schedule: cron expression tied to a group; scheduler fires run_group when due; tracks last_fired_at to survive restarts without double-firing
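The watermark substitution described above is plain token replacement. A minimal sketch, assuming resolved values arrive as a name → string mapping (the function name materialize is hypothetical, not taken from the codebase):

```python
def materialize(source_sql: str, watermarks: dict) -> str:
    """Replace each {name} token with its resolved watermark value.

    Values are treated as opaque strings; no quoting or escaping is applied,
    so the source query itself must supply any quotes it needs.
    """
    for name, value in watermarks.items():
        source_sql = source_sql.replace("{" + name + "}", value)
    return source_sql


query = "SELECT * FROM orders WHERE updated_at > '{last_sync}'"
resolved = materialize(query, {"last_sync": "2024-01-01 00:00:00"})
print(resolved)
```

Tokens with no matching watermark are left untouched in this sketch, which is the situation a placeholder warning is meant to catch.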
Engine Flow
run_module(module_id)
→ atomic UPDATE module SET running=1 WHERE running=0 # fail if already locked
→ for each watermark: run resolver SQL via jrunner → capture first cell # always live, even dry runs
→ materialize source query (simple string replace {name} → value)
→ build merge SQL (engine/merge.py: full=truncate+insert, incremental=delete by key+insert, append=insert)
→ if dry_run: write run_log (status='dry_run'), return early — no data movement
→ DROP + CREATE staging table (pipekit_staging.{module_name}) # self-healing schema drift
→ jrunner migrate source → staging # uses Popen; each stdout line appended to run_log.live_log
→ run merge SQL via jrunner
→ run hooks in order
→ write run_log entry
→ UPDATE module SET running=0 (in finally)
run_group(group_id)
→ create group_run row (status=running)
→ for each enabled group_member in run_order: call run_module(group_run_id=...)
→ continues past individual module failures (all members run)
→ final status: dry_run if all dry_run, error if any errored, success otherwise
→ finish_group_run(status)
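The lock step at the top of run_module can be illustrated with stdlib sqlite3. This is a sketch under assumptions, not the engine's actual code: the id column, the helper names, and the RuntimeError are all hypothetical. The point is that a single conditional UPDATE plus a rowcount check decides who owns the lock, and the release sits in a finally block.

```python
import sqlite3


def acquire_lock(conn: sqlite3.Connection, module_id: int) -> bool:
    """Flip running 0 -> 1 in one statement; True only if this caller won."""
    cur = conn.execute(
        "UPDATE module SET running = 1 WHERE id = ? AND running = 0",
        (module_id,),
    )
    conn.commit()
    return cur.rowcount == 1


def release_lock(conn: sqlite3.Connection, module_id: int) -> None:
    conn.execute("UPDATE module SET running = 0 WHERE id = ?", (module_id,))
    conn.commit()


def run_locked(conn: sqlite3.Connection, module_id: int, work):
    """Run work() while holding the lock; always release afterwards."""
    if not acquire_lock(conn, module_id):
        raise RuntimeError(f"module {module_id} is already running")
    try:
        return work()
    finally:
        release_lock(conn, module_id)


# Demo against an in-memory table (assumed minimal schema).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE module (id INTEGER PRIMARY KEY, running INTEGER NOT NULL DEFAULT 0)"
)
conn.execute("INSERT INTO module (id) VALUES (1)")
first = acquire_lock(conn, 1)   # wins the lock
second = acquire_lock(conn, 1)  # loses: row already has running = 1
release_lock(conn, 1)
```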
Run Statuses
- running: in progress
- success: completed, data moved
- dry_run: resolved watermarks + built SQL, no data movement
- error: failed
- cancelled: cancelled mid-run
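The group-level rule from the engine flow (dry_run if all members were dry runs, error if any errored, success otherwise) can be written as a small pure function. A sketch only: the function name is hypothetical, and treating an empty group as success is an assumption.

```python
def group_status(module_statuses: list) -> str:
    """Fold per-module run statuses into one group_run status."""
    if module_statuses and all(s == "dry_run" for s in module_statuses):
        return "dry_run"
    if any(s == "error" for s in module_statuses):
        return "error"
    # Follows the stated rule literally: anything else counts as success.
    # Assumption: an empty group also lands here.
    return "success"
```

For example, group_status(["success", "error", "success"]) yields "error" even though the remaining members still ran.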
Credentials Pattern
Passwords in connections are stored as $VAR_NAME references. At run time they are resolved from /etc/pipekit/secrets.env (override with PIPEKIT_SECRETS env var). Config path override: PIPEKIT_CONFIG.
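A minimal sketch of how a $VAR_NAME reference might be resolved at run time. It assumes secrets.env holds plain KEY=VALUE lines with optional # comments; the function names and the error handling are hypothetical.

```python
import os
import tempfile
from pathlib import Path
from typing import Optional

DEFAULT_SECRETS = "/etc/pipekit/secrets.env"


def load_secrets(path: Optional[str] = None) -> dict:
    """Parse KEY=VALUE lines; PIPEKIT_SECRETS overrides the default path."""
    path = path or os.environ.get("PIPEKIT_SECRETS", DEFAULT_SECRETS)
    secrets = {}
    for line in Path(path).read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        secrets[key.strip()] = value.strip()
    return secrets


def resolve_password(stored: str, secrets: dict) -> str:
    """Turn a stored '$VAR_NAME' reference into the secret value."""
    if not stored.startswith("$"):
        raise ValueError("expected a $VAR_NAME reference")
    return secrets[stored[1:]]  # KeyError if the secret is missing


# Demo with a throwaway file instead of /etc/pipekit/secrets.env.
with tempfile.NamedTemporaryFile("w", suffix=".env", delete=False) as fh:
    fh.write("# demo secrets\nWAREHOUSE_PW=s3cret\n")
demo_secrets = load_secrets(fh.name)
```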
Driver Abstraction
Each driver in pipekit/drivers/ inherits from base.py::Driver and implements: browse_fields, list_tables, list_schemas, get_columns, map_type, default_expression, quote_identifier. The wizard UI calls /api/introspect/* which dispatches to the appropriate driver.
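To illustrate the shape of this abstraction, here is a sketch with only two of the seven methods. The signatures, the DoubleQuoteDriver class, and its type map are all assumptions made for the example; the real base.py::Driver may differ.

```python
from abc import ABC, abstractmethod


class Driver(ABC):
    """Stand-in for base.py::Driver; only two methods are sketched."""

    @abstractmethod
    def quote_identifier(self, name: str) -> str:
        """Return name quoted for safe use in generated SQL."""

    @abstractmethod
    def map_type(self, source_type: str) -> str:
        """Map a source column type to a destination column type."""


class DoubleQuoteDriver(Driver):
    """Hypothetical driver for a database that double-quotes identifiers."""

    TYPE_MAP = {"int4": "integer", "varchar": "text"}  # illustrative only

    def quote_identifier(self, name: str) -> str:
        # Embedded double quotes are escaped by doubling them.
        return '"' + name.replace('"', '""') + '"'

    def map_type(self, source_type: str) -> str:
        return self.TYPE_MAP.get(source_type.lower(), "text")
```

Because the base class is abstract, a driver that forgets one of the required methods fails at instantiation rather than midway through a wizard request.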
Module Columns
Modules store their column mapping as columns_json — a JSON list of dicts with keys source_name, source_type, dest_name, dest_type. The engine uses this to build the staging CREATE TABLE and the merge INSERT column lists.
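As an illustration of how columns_json could drive the generated SQL, the sketch below builds a staging CREATE TABLE and an INSERT column list. It assumes the staging table uses dest_name and dest_type; the function names and sample columns are hypothetical, and identifier quoting is omitted for brevity.

```python
import json

columns_json = json.dumps([
    {"source_name": "id", "source_type": "int4",
     "dest_name": "order_id", "dest_type": "integer"},
    {"source_name": "ts", "source_type": "timestamp",
     "dest_name": "updated_at", "dest_type": "timestamp"},
])


def staging_ddl(module_name: str, columns_json: str) -> str:
    """Build the staging CREATE TABLE from the stored column mapping."""
    cols = json.loads(columns_json)
    body = ", ".join(f'{c["dest_name"]} {c["dest_type"]}' for c in cols)
    return f"CREATE TABLE pipekit_staging.{module_name} ({body})"


def insert_column_list(columns_json: str) -> str:
    """Comma-separated destination columns for the merge INSERT."""
    return ", ".join(c["dest_name"] for c in json.loads(columns_json))


ddl = staging_ddl("orders", columns_json)
column_list = insert_column_list(columns_json)
```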
Inline Watermark Editing
Watermarks are managed inline on both the module edit form and wizard step 3 (not just via the standalone /watermarks/{id}/edit page). The module edit form (module_form.html) renders existing watermarks as editable rows and submits them as parallel arrays (wm_id[], wm_name[], wm_connection_id[], wm_resolver_sql[], wm_default_value[], wm_deleted_id[]). The _save_inline_watermarks(form, module_id) helper in web/app.py processes these arrays — updates existing rows, creates new ones, deletes removed ones. Both module_update and wizard_create call it after saving the module. A client-side placeholder warning checks that {name} tokens in the source query match the watermark names on the page.
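The parallel-array handling can be sketched as a pure function that sorts submitted rows into updates, creates, and deletes. This is not the actual _save_inline_watermarks code: the form is modelled as a plain dict of lists, an empty wm_id is assumed to mark a new row, and the output field names are hypothetical.

```python
def plan_watermark_changes(form: dict):
    """Split parallel form arrays into (updates, creates, deletes)."""
    rows = zip(
        form.get("wm_id[]", []),
        form.get("wm_name[]", []),
        form.get("wm_connection_id[]", []),
        form.get("wm_resolver_sql[]", []),
        form.get("wm_default_value[]", []),
    )
    updates, creates = [], []
    for wm_id, name, conn_id, sql, default in rows:
        row = {
            "name": name,
            "connection_id": int(conn_id),
            "resolver_sql": sql,
            "default_value": default,
        }
        if wm_id:  # existing watermark: keep its id
            updates.append({"id": int(wm_id), **row})
        else:      # assumption: blank id means a newly added row
            creates.append(row)
    deletes = [int(i) for i in form.get("wm_deleted_id[]", []) if i]
    return updates, creates, deletes


form = {
    "wm_id[]": ["7", ""],
    "wm_name[]": ["last_sync", "max_id"],
    "wm_connection_id[]": ["1", "2"],
    "wm_resolver_sql[]": ["SELECT max(ts) FROM t", "SELECT max(id) FROM t"],
    "wm_default_value[]": ["1970-01-01", "0"],
    "wm_deleted_id[]": ["9"],
}
updates, creates, deletes = plan_watermark_changes(form)
```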
Merge Key
merge_key is stored as a comma-separated string (e.g., "col1, col2"). The engine parses it and generates a multi-column DELETE predicate for incremental strategy.
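A sketch of the parsing and predicate generation, assuming the DELETE matches destination rows against the staging table with a correlated EXISTS. The exact SQL that engine/merge.py emits is not shown in this file, so the statement shape and function names here are illustrative.

```python
def parse_merge_key(merge_key: str) -> list:
    """Split 'col1, col2' into ['col1', 'col2'], dropping blanks."""
    return [c.strip() for c in merge_key.split(",") if c.strip()]


def incremental_delete(dest_table: str, staging_table: str, merge_key: str) -> str:
    """Build a DELETE that removes dest rows whose key appears in staging."""
    cols = parse_merge_key(merge_key)
    if not cols:
        raise ValueError("incremental strategy needs at least one merge key column")
    predicate = " AND ".join(f"{dest_table}.{c} = s.{c}" for c in cols)
    return (
        f"DELETE FROM {dest_table} WHERE EXISTS "
        f"(SELECT 1 FROM {staging_table} s WHERE {predicate})"
    )


delete_sql = incremental_delete("public.orders", "pipekit_staging.orders", "col1, col2")
print(delete_sql)
```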
Staging Table
Recreated on every run as pipekit_staging.{module_name} (DROP + CREATE, not IF NOT EXISTS). Ephemeral — exists only during the run.
Scheduler
pipekit/scheduler.py runs a single daemon thread (started via FastAPI lifespan in api/app.py). It wakes every 60 s, reads all enabled schedules, and fires run_group for any whose next cron occurrence has passed since last_fired_at. last_fired_at is written to the DB before the run thread is spawned — prevents double-fire if a run is slow or pipekit restarts mid-run. Uses croniter for cron expression evaluation. Cron expressions are evaluated in local server time (not UTC) — 0 4 * * * fires at 04:00 local.
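The due check and the write-before-spawn ordering can be sketched without the real scheduler. To keep the example dependency-free, daily_at_four stands in for croniter on the expression 0 4 * * *; with croniter the same callable would be roughly `lambda t: croniter(expr, t).get_next(datetime)`. The schedule dict, tick, and spawn are hypothetical names.

```python
from datetime import datetime, timedelta


def daily_at_four(after: datetime) -> datetime:
    """Stand-in for croniter: next 04:00 strictly after the given time."""
    candidate = after.replace(hour=4, minute=0, second=0, microsecond=0)
    if candidate <= after:
        candidate += timedelta(days=1)
    return candidate


def is_due(last_fired_at: datetime, now: datetime, next_after) -> bool:
    """Due when the next occurrence after last_fired_at has already passed."""
    return next_after(last_fired_at) <= now


def tick(schedule: dict, now: datetime, next_after, spawn) -> bool:
    """One scheduler wake-up for one schedule; returns True if it fired."""
    if not is_due(schedule["last_fired_at"], now, next_after):
        return False
    # Record the fire time BEFORE starting the run, so a slow run or a
    # restart mid-run cannot trigger the same occurrence twice.
    schedule["last_fired_at"] = now
    spawn(schedule["group_id"])
    return True


fired = []
schedule = {"group_id": 3, "last_fired_at": datetime(2024, 1, 1, 4, 0)}
early = tick(schedule, datetime(2024, 1, 2, 3, 59), daily_at_four, fired.append)
on_time = tick(schedule, datetime(2024, 1, 2, 4, 0, 30), daily_at_four, fired.append)
repeat = tick(schedule, datetime(2024, 1, 2, 4, 0, 30), daily_at_four, fired.append)
```

In the demo, the second call fires and the third does not, because last_fired_at has already moved past the 04:00 occurrence.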
API vs. Web
- /api/*: JSON REST, HTTP Basic Auth, consumed by HTMX fragments and external callers
- / and other bare paths: full HTML pages (Jinja2), no auth currently
- POST /modules/{id}/run returns {run_id} immediately (both API and web); run is async via BackgroundTasks
- GET /runs/{id}/live: HTML fragment endpoint; HTMX polls this every 2 s while status=running to show live_log + status
- /groups/*: group CRUD, run, live fragment; /group-runs/{id}: group run detail with per-module run links
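The live_log that the /runs/{id}/live fragment polls is filled line by line while the child process runs. A sketch of that streaming pattern with subprocess.Popen, using a short Python child process as a stand-in for jrunner and a list as a stand-in for the run_log.live_log column (both are assumptions for the demo):

```python
import subprocess
import sys


def stream_lines(cmd: list, on_line) -> int:
    """Run cmd, pass each stdout line to on_line as it arrives, return exit code."""
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        text=True,
        bufsize=1,  # line-buffered on the reading side
    )
    assert proc.stdout is not None
    for line in proc.stdout:
        on_line(line.rstrip("\n"))
    return proc.wait()


live_log = []  # stand-in for appending to run_log.live_log
exit_code = stream_lines(
    [sys.executable, "-c", "print('row 1'); print('row 2')"],
    live_log.append,
)
```

In a real run the on_line callback would append to the database row, so each poll of the fragment sees whatever has been written so far.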
Tech Stack
- Python 3.10+, FastAPI, Uvicorn, Jinja2, PyYAML, SQLite3 (stdlib), croniter
- python-multipart required for HTML form POSTs (not auto-installed as a FastAPI transitive dep)
- Frontend: HTMX (CDN) + vanilla JS; Alpine.js is NOT loaded despite being listed in older docs
- jrunner: separate Java tool, must be on PATH; JAVA_HOME must also be set (deploy.sh detects and injects it into the systemd unit automatically)
- Runs as the pipekit system user; venv at /opt/pipekit/.venv owned by that user; secrets at /etc/pipekit/secrets.env (mode 0640, group pipekit)
Full Spec
/opt/pipekit/SPEC.md is the authoritative design document. Read it for deep rationale on any architectural decision.