pipekit/CLAUDE.md
Paul Trowbridge ba48b2ca2b CLAUDE.md: document local time scheduling and JAVA_HOME deploy behavior
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
2026-06-03 23:51:12 -04:00

CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

Running the Server

Pipekit runs as a systemd service (pipekit.service). Common commands:

sudo systemctl status pipekit          # check status
sudo systemctl restart pipekit         # restart after changes
sudo systemctl stop pipekit            # stop
sudo journalctl -u pipekit -f          # follow logs

For dev/testing outside the service, run it in the foreground (never under nohup or as a background job):

pipekit serve                          # default host/port from config.yaml
pipekit serve --host 0.0.0.0 --port 8080 --reload   # dev mode with auto-reload

Other CLI Commands

pipekit init                           # create/upgrade SQLite schema
pipekit doctor                         # health check (config, jrunner, DB)
pipekit run <module_name>              # run a module synchronously (manual test)
pipekit set-password <username>        # set HTTP Basic Auth credentials
pipekit secrets set KEY [VALUE]        # add/update a secret (prompts if value omitted)
pipekit secrets list                   # show stored secret keys (no values)
pipekit drivers list                   # show registered driver kinds
./deploy.sh                            # full idempotent deploy: user, venv, deps, launcher,
                                       # secrets.env, schema init, driver registration, systemd unit

Architecture

Pipekit is a database sync tool. A module defines a source query → dest table sync job. The engine runs a module by: resolving watermarks → materializing source SQL → staging data via jrunner → merging into dest.

Layers (bottom to top):

  1. SQLite (pipekit.db) — single file, all state
  2. repo.py — all CRUD for every table; ~725 LOC, the only layer that touches the DB
  3. engine/ — orchestrates a module run: lock acquisition, watermark resolution, jrunner calls, merge SQL, post-run hooks, run_log write, lock release
  4. jrunner — external Java CLI; handles all JDBC access. Python never talks to remote DBs directly; it shells out to jrunner
  5. api/ — FastAPI REST endpoints under /api/*, HTTP Basic Auth (except /health)
  6. web/ — HTML pages (Jinja2 templates) at /; HTMX + vanilla JS for interactivity

Key Data Model

  • driver — JDBC driver registration (kind, jar, class, url_template)
  • connection — named DB connection (jdbc_url, username, password as $ENV_VAR reference)
  • module — sync job (source/dest connection, source query, merge strategy, merge key, enabled, running lock)
  • watermark — named placeholder with resolver SQL; first column of first row used as opaque string; replaces {watermark_name} in source query
  • hook — post-merge SQL (run_order, run_on: success/failure/always)
  • run_log — immutable history record with resolved SQL, merge SQL, watermark values, stdout/stderr, timing, live_log (streamed jrunner output written during the run)
  • grp — named group of modules; modules belong to a group via group_member (with run_order)
  • group_member — join table: group → module with run_order; disabled modules are skipped by group runs
  • group_run — one execution of a group: status, timing, triggered_by (manual | schedule:{id})
  • schedule — cron expression tied to a group; scheduler fires run_group when due; tracks last_fired_at to survive restarts without double-firing
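The watermark entry above describes a plain string replace of {watermark_name} tokens. A minimal sketch of that substitution, assuming the resolved values arrive as a dict of name to string; the function name materialize is hypothetical and not necessarily what the engine calls it:

```python
def materialize(source_query: str, watermarks: dict[str, str]) -> str:
    """Replace each {name} token with its resolved watermark value."""
    sql = source_query
    for name, value in watermarks.items():
        # Values are opaque strings and are inserted as-is, so any quoting
        # must already be part of the source query or the resolver output.
        sql = sql.replace("{" + name + "}", value)
    return sql


query = "SELECT * FROM orders WHERE updated_at > '{last_sync}'"
print(materialize(query, {"last_sync": "2026-01-01 00:00:00"}))
```

Tokens with no matching watermark are left untouched in this sketch, which is why a placeholder check before saving is useful.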

Engine Flow

run_module(module_id)
  → atomic UPDATE module SET running=1 WHERE running=0  # fail if already locked
  → for each watermark: run resolver SQL via jrunner → capture first cell  # always live, even dry runs
  → materialize source query (simple string replace {name} → value)
  → build merge SQL (engine/merge.py: full=truncate+insert, incremental=delete by key+insert, append=insert)
  → if dry_run: write run_log (status='dry_run'), return early — no data movement
  → DROP + CREATE staging table (pipekit_staging.{module_name})  # self-healing schema drift
  → jrunner migrate source → staging  # uses Popen; each stdout line appended to run_log.live_log
  → run merge SQL via jrunner
  → run hooks in order
  → write run_log entry
  → UPDATE module SET running=0 (in finally)

run_group(group_id)
  → create group_run row (status=running)
  → for each enabled group_member in run_order: call run_module(group_run_id=...)
  → continues past individual module failures (all members run)
  → final status: dry_run if all dry_run, error if any errored, success otherwise
  → finish_group_run(status)
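The lock step at the top of run_module and the release in finally can be sketched with the stdlib sqlite3 module. This is an illustration under assumptions: the id column name and the helper names (try_lock, release_lock, run_locked) are hypothetical, and the real repo.py may structure this differently.

```python
import sqlite3


def try_lock(conn: sqlite3.Connection, module_id: int) -> bool:
    """Atomically claim the module; True only if this caller won the lock."""
    cur = conn.execute(
        "UPDATE module SET running = 1 WHERE id = ? AND running = 0",
        (module_id,),
    )
    conn.commit()
    # rowcount is 1 when the row flipped from 0 to 1, and 0 when it was already locked.
    return cur.rowcount == 1


def release_lock(conn: sqlite3.Connection, module_id: int) -> None:
    conn.execute("UPDATE module SET running = 0 WHERE id = ?", (module_id,))
    conn.commit()


def run_locked(conn: sqlite3.Connection, module_id: int, work):
    """Run work() under the module lock and always release it afterwards."""
    if not try_lock(conn, module_id):
        raise RuntimeError(f"module {module_id} is already running")
    try:
        return work()
    finally:
        release_lock(conn, module_id)


# Demo against an in-memory table with an assumed, simplified schema.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE module (id INTEGER PRIMARY KEY, running INTEGER NOT NULL DEFAULT 0)"
)
conn.execute("INSERT INTO module (id) VALUES (1)")
print(run_locked(conn, 1, lambda: "ran"))
```

Because the check and the update happen in one statement, two callers cannot both see running=0 and proceed.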

Run Statuses

  • running — in progress
  • success — completed, data moved
  • dry_run — resolved watermarks + built SQL, no data movement
  • error — failed
  • cancelled — cancelled mid-run
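The group-level status rule from the engine flow (dry_run if all dry_run, error if any errored, success otherwise) could be expressed as a small function. This is a sketch: the function name is hypothetical, and the handling of an empty group is an assumption, since the rule does not cover it.

```python
def final_group_status(module_statuses: list[str]) -> str:
    """Collapse per-module run statuses into one group_run status."""
    if any(status == "error" for status in module_statuses):
        return "error"
    # Assumption: an empty group counts as success rather than dry_run.
    if module_statuses and all(status == "dry_run" for status in module_statuses):
        return "dry_run"
    # Everything else, including mixed dry_run/success, falls through here.
    return "success"


print(final_group_status(["success", "error", "success"]))
print(final_group_status(["dry_run", "dry_run"]))
```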

Credentials Pattern

Passwords in connections are stored as $VAR_NAME references. At run time they are resolved from /etc/pipekit/secrets.env (override the path with the PIPEKIT_SECRETS env var). The config file path can likewise be overridden with PIPEKIT_CONFIG.

Driver Abstraction

Each driver in pipekit/drivers/ inherits from base.py::Driver and implements: browse_fields, list_tables, list_schemas, get_columns, map_type, default_expression, quote_identifier. The wizard UI calls /api/introspect/* which dispatches to the appropriate driver.
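To illustrate the shape of the abstraction, here is a stand-in base class with the seven methods named above and a toy subclass. Every signature, the kind attribute, the registry, and the type mapping are guesses for illustration only; consult base.py for the real interface.

```python
class Driver:
    """Stand-in for base.py::Driver; all signatures here are hypothetical."""

    kind = ""

    def list_schemas(self, connection): raise NotImplementedError
    def list_tables(self, connection, schema): raise NotImplementedError
    def get_columns(self, connection, schema, table): raise NotImplementedError
    def browse_fields(self, connection, schema, table): raise NotImplementedError
    def map_type(self, source_type): raise NotImplementedError
    def default_expression(self, column): raise NotImplementedError
    def quote_identifier(self, name): raise NotImplementedError


class ExampleDriver(Driver):
    """Toy driver that overrides only the two pure-string methods."""

    kind = "example"

    def quote_identifier(self, name):
        # SQL-standard quoting: wrap in double quotes, double any embedded quote.
        return '"' + name.replace('"', '""') + '"'

    def map_type(self, source_type):
        # Made-up mapping; unknown types fall back to TEXT in this sketch.
        return {"int4": "INTEGER", "varchar": "TEXT"}.get(source_type.lower(), "TEXT")


# Hypothetical registry, mimicking how an introspect endpoint might dispatch by kind.
REGISTRY = {driver.kind: driver for driver in (ExampleDriver(),)}


def driver_for(kind):
    try:
        return REGISTRY[kind]
    except KeyError:
        raise ValueError(f"no driver registered for kind {kind!r}") from None


print(driver_for("example").quote_identifier("order id"))
```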

Module Columns

Modules store their column mapping as columns_json — a JSON list of dicts with keys source_name, source_type, dest_name, dest_type. The engine uses this to build the staging CREATE TABLE and the merge INSERT column lists.
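A sketch of how columns_json could drive the staging DDL and the INSERT column list. It assumes the staging table is built from dest_name and dest_type and leaves identifiers unquoted for brevity (the real engine presumably quotes them via the driver); the function names are hypothetical.

```python
import json


def staging_ddl(module_name: str, columns_json: str) -> str:
    """Build a CREATE TABLE statement for pipekit_staging.{module_name}."""
    columns = json.loads(columns_json)
    column_defs = ",\n  ".join(f'{c["dest_name"]} {c["dest_type"]}' for c in columns)
    return f"CREATE TABLE pipekit_staging.{module_name} (\n  {column_defs}\n)"


def insert_column_list(columns_json: str) -> str:
    """Comma-separated destination column names for the merge INSERT."""
    return ", ".join(c["dest_name"] for c in json.loads(columns_json))


example = json.dumps([
    {"source_name": "ID", "source_type": "NUMBER", "dest_name": "id", "dest_type": "bigint"},
    {"source_name": "NAME", "source_type": "VARCHAR2", "dest_name": "name", "dest_type": "text"},
])
print(staging_ddl("orders", example))
print(insert_column_list(example))
```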

Inline Watermark Editing

Watermarks are managed inline on both the module edit form and wizard step 3 (not just via the standalone /watermarks/{id}/edit page). The module edit form (module_form.html) renders existing watermarks as editable rows and submits them as parallel arrays (wm_id[], wm_name[], wm_connection_id[], wm_resolver_sql[], wm_default_value[], wm_deleted_id[]). The _save_inline_watermarks(form, module_id) helper in web/app.py processes these arrays — updates existing rows, creates new ones, deletes removed ones. Both module_update and wizard_create call it after saving the module. A client-side placeholder warning checks that {name} tokens in the source query match the watermark names on the page.
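The placeholder warning runs client-side, but its logic can be sketched in Python for reference. The token pattern (identifier characters inside braces) and the function name are assumptions about what counts as a placeholder.

```python
import re

# Assumed token shape: {name} where name looks like an identifier.
TOKEN = re.compile(r"\{([A-Za-z_][A-Za-z0-9_]*)\}")


def placeholder_mismatches(source_query: str, watermark_names: list[str]):
    """Return (tokens with no watermark, watermarks never referenced)."""
    tokens = set(TOKEN.findall(source_query))
    names = set(watermark_names)
    return sorted(tokens - names), sorted(names - tokens)


query = "SELECT * FROM t WHERE id > {last_id} AND ts < {cutoff}"
missing, unused = placeholder_mismatches(query, ["last_id", "old_mark"])
print("missing watermarks:", missing)
print("unused watermarks:", unused)
```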

Merge Key

merge_key is stored as a comma-separated string (e.g., "col1, col2"). The engine parses it and generates a multi-column DELETE predicate for incremental strategy.
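A minimal sketch of parsing the merge key and building a multi-column DELETE predicate. The correlated EXISTS form with table aliases is one common way to write it and is an assumption here; engine/merge.py may generate different SQL, and the function names are hypothetical.

```python
def parse_merge_key(merge_key: str) -> list[str]:
    """Split 'col1, col2' into ['col1', 'col2'], ignoring stray whitespace."""
    return [col.strip() for col in merge_key.split(",") if col.strip()]


def delete_predicate(dest_table: str, staging_table: str, merge_key: str) -> str:
    """DELETE every dest row whose key columns match a staged row."""
    columns = parse_merge_key(merge_key)
    if not columns:
        raise ValueError("incremental merge needs at least one key column")
    match = " AND ".join(f"s.{col} = d.{col}" for col in columns)
    return (
        f"DELETE FROM {dest_table} d WHERE EXISTS "
        f"(SELECT 1 FROM {staging_table} s WHERE {match})"
    )


print(delete_predicate("sales.orders", "pipekit_staging.orders", "col1, col2"))
```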

Staging Table

Recreated on every run as pipekit_staging.{module_name} (DROP + CREATE, not IF NOT EXISTS). Ephemeral — exists only during the run.

Scheduler

pipekit/scheduler.py runs a single daemon thread (started via FastAPI lifespan in api/app.py). It wakes every 60 s, reads all enabled schedules, and fires run_group for any schedule whose next cron occurrence after last_fired_at has already passed. last_fired_at is written to the DB before the run thread is spawned, which prevents a double fire if a run is slow or pipekit restarts mid-run. Cron expressions are evaluated with croniter in local server time (not UTC): 0 4 * * * fires at 04:00 local.
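The fire-once behaviour can be sketched as a single scheduler tick. To keep the example stdlib-only, cron evaluation is injected as a callable and replaced by an hourly stand-in; in Pipekit that role would be played by croniter. The dict fields and function names are hypothetical, and schedules that have never fired are not handled here.

```python
from datetime import datetime, timedelta


def hourly_stand_in(cron_expr: str, after: datetime) -> datetime:
    """Stand-in for cron evaluation: the next top of the hour after `after`."""
    return after.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)


def tick(schedules: list[dict], now: datetime, next_after, spawn_run) -> list[int]:
    """Fire every enabled schedule whose next occurrence has passed."""
    fired = []
    for sched in schedules:
        if not sched["enabled"]:
            continue
        if next_after(sched["cron"], sched["last_fired_at"]) <= now:
            # Record the fire time BEFORE starting the run, so a slow run or
            # a restart cannot trigger the same occurrence twice.
            sched["last_fired_at"] = now
            spawn_run(sched["group_id"])
            fired.append(sched["id"])
    return fired


schedules = [{
    "id": 1, "group_id": 10, "cron": "0 * * * *", "enabled": True,
    "last_fired_at": datetime(2026, 1, 1, 3, 30),
}]
started = []
print(tick(schedules, datetime(2026, 1, 1, 4, 0, 30), hourly_stand_in, started.append))
print(tick(schedules, datetime(2026, 1, 1, 4, 1, 30), hourly_stand_in, started.append))
```

The second tick fires nothing because last_fired_at already moved past the 04:00 occurrence.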

API vs. Web

  • /api/* — JSON REST, HTTP Basic Auth, consumed by HTMX fragments and external callers
  • / and other bare paths — full HTML pages (Jinja2), no auth currently
  • POST /modules/{id}/run returns {run_id} immediately (both API and web); run is async via BackgroundTasks
  • GET /runs/{id}/live — HTML fragment endpoint; HTMX polls this every 2 s while status=running to show live_log + status
  • /groups/* — group CRUD, run, live fragment; /group-runs/{id} — group run detail with per-module run links

Tech Stack

  • Python 3.10+, FastAPI, Uvicorn, Jinja2, PyYAML, SQLite3 (stdlib), croniter
  • python-multipart required for HTML form POSTs (not auto-installed as a FastAPI transitive dep)
  • Frontend: HTMX (CDN) + vanilla JS; Alpine.js is NOT loaded despite being listed in older docs
  • jrunner: separate Java tool, must be on PATH; JAVA_HOME must also be set — deploy.sh detects and injects it into the systemd unit automatically
  • Runs as the pipekit system user; venv at /opt/pipekit/.venv owned by that user; secrets at /etc/pipekit/secrets.env (mode 0640, group pipekit)

Full Spec

/opt/pipekit/SPEC.md is the authoritative design document. Read it for deep rationale on any architectural decision.