pipekit/CLAUDE.md
Paul Trowbridge 0ddb636f14 jrunner: use bulk copy (-b) for Postgres dests too (COPY)
Extend the -b wiring to jdbc:postgresql: dests, so DB2->PG (and any PG-dest)
loads use jrunner's COPY FROM STDIN path instead of batched INSERTs. SQL
Server already used -b (SQLServerBulkCopy); DB2 dests stay on INSERT. Update
the CLAUDE.md bulk section accordingly.

Validated DB2->PG COPY with real types (dates -> date col, decimals ->
numeric, char) and null/empty-string fidelity.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-18 23:14:15 -04:00

12 KiB

CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

Running the Server

Pipekit runs as a systemd service (pipekit.service). Common commands:

sudo systemctl status pipekit          # check status
sudo systemctl restart pipekit         # restart after changes
sudo systemctl stop pipekit            # stop
sudo journalctl -u pipekit -f          # follow logs

For dev/testing outside the service — never nohup or background it:

pipekit serve                          # default host/port from config.yaml
pipekit serve --host 0.0.0.0 --port 8080 --reload   # dev mode with auto-reload

Other CLI Commands

pipekit init                           # create/upgrade SQLite schema
pipekit doctor                         # health check (config, jrunner, DB)
pipekit run <module_name>              # run a module synchronously (manual test)
pipekit set-password <username>        # set HTTP Basic Auth credentials
pipekit secrets set KEY [VALUE]        # add/update a secret (prompts if value omitted)
pipekit secrets list                   # show stored secret keys (no values)
pipekit drivers list                   # show registered driver kinds
./deploy.sh                            # full idempotent deploy: user, venv, deps, launcher,
                                       # secrets.env, schema init, driver registration, systemd unit

Architecture

Pipekit is a database sync tool. A module defines a source query → dest table sync job. The engine runs a module by: resolving watermarks → materializing source SQL → staging data via jrunner → merging into dest.

Layers (bottom to top):

  1. SQLite (pipekit.db) — single file, all state
  2. repo.py — all CRUD for every table; ~725 LOC, the only layer that touches the DB
  3. engine/ — orchestrates a module run: lock acquisition, watermark resolution, jrunner calls, merge SQL, post-run hooks, run_log write, lock release
  4. jrunner — external Java CLI; handles all JDBC access. Python never talks to remote DBs directly; it shells out to jrunner
  5. api/ — FastAPI REST endpoints under /api/*, HTTP Basic Auth (except /health)
  6. web/ — HTML pages (Jinja2 templates) at /; HTMX + Alpine.js for interactivity

Key Data Model

  • driver — JDBC driver registration (kind, jar, class, url_template)
  • connection — named DB connection (jdbc_url, username, password as $ENV_VAR reference)
  • module — sync job (source/dest connection, source query, merge strategy, merge key, enabled, running lock)
  • watermark — named placeholder with resolver SQL; first column of first row used as opaque string; replaces {watermark_name} in source query
  • hook — post-merge SQL (run_order, run_on: success/failure/always)
  • run_log — immutable history record with resolved SQL, merge SQL, watermark values, stdout/stderr, timing, live_log (streamed jrunner output written during the run)
  • grp — named group of modules; modules belong to a group via group_member (with run_order)
  • group_member — join table: group → module with run_order; disabled modules are skipped by group runs
  • group_run — one execution of a group: status, timing, triggered_by (manual | schedule:{id})
  • schedule — cron expression tied to a group; scheduler fires run_group when due; tracks last_fired_at to survive restarts without double-firing

Engine Flow

run_module(module_id)
  → atomic UPDATE module SET running=1 WHERE running=0  # fail if already locked
  → for each watermark: run resolver SQL via jrunner → capture first cell  # always live, even dry runs
  → materialize source query (simple string replace {name} → value)
  → build merge SQL (engine/merge.py: full=truncate+insert, incremental=delete by key+insert, append=insert)
  → if dry_run: write run_log (status='dry_run'), return early — no data movement
  → DROP + CREATE staging table (pipekit_staging.{module_name})  # self-healing schema drift
  → jrunner migrate source → staging  # uses Popen; each stdout line appended to run_log.live_log
  → run merge SQL via jrunner
  → run hooks in order
  → write run_log entry
  → UPDATE module SET running=0 (in finally)

run_group(group_id)
  → create group_run row (status=running)
  → for each enabled group_member in run_order: call run_module(group_run_id=...)
  → continues past individual module failures (all members run)
  → final status: dry_run if all dry_run, error if any errored, success otherwise
  → finish_group_run(status)

Run Statuses

  • running — in progress
  • success — completed, data moved
  • dry_run — resolved watermarks + built SQL, no data movement
  • error — failed
  • cancelled — cancelled mid-run

Credentials Pattern

Passwords in connections are stored as $VAR_NAME references. At run time they are resolved from /etc/pipekit/secrets.env (override with PIPEKIT_SECRETS env var). Config path override: PIPEKIT_CONFIG.

Driver Abstraction

Each driver in pipekit/drivers/ inherits from base.py::Driver and implements: browse_fields, list_tables, list_schemas, get_columns, map_type, default_expression, quote_identifier. The wizard UI calls /api/introspect/* which dispatches to the appropriate driver.

Wizard Entry Modes

Step 1 offers two paths. Browse a table (/wizard/tables/wizard/columns) is the original flow: pick one source table, introspect its columns. Write SQL (/wizard/sqlPOST /wizard/sql/columns) starts from an arbitrary query — the source query is used verbatim and its result columns are discovered by Driver.introspect_query_columns, which runs the query wrapped to fetch ~no rows (_zero_row_wrap: derived-table + WHERE 1=0 by default; DB2 appends FETCH FIRST 1 ROW ONLY since DB2 for i forbids WITH inside a nested table expression). Both paths land on wizard_step3.html (the SQL path passes sql_mode=True) and POST to /wizard/create, which branches on entry_mode. SQL-mode dest types default to text — there's no result-set type metadata over jrunner's CSV output, so the user adjusts types by hand. columns_json is never read at run time (the engine does CREATE staging LIKE dest + SELECT *); it only drives the dest CREATE TABLE.

Module Columns

Modules store their column mapping as columns_json — a JSON list of dicts with keys source_name, source_type, dest_name, dest_type (and a stable id). It drives the dest CREATE TABLE; the engine itself doesn't read it at run time (it does CREATE staging LIKE dest + SELECT *).

Post-creation column add. The module detail page's Schema panel has a "+ add column" action (/modules/{id}/columns/newPOST /modules/{id}/columns) that appends a column to an existing module: it ALTER TABLE … ADD COLUMNs the dest (always at the tail — the only position ALTER supports, which keeps the positional load aligned), applies a column comment where supported, and appends to columns_json. Each column row carries a stable id (c1, c2, …) — the data-movement identity for future schema reconciliation. Add is the only mutation for now; reorder/retype/drop (which can require a table rebuild) are not implemented. Driver methods: build_add_column_sql, column_inventory.

Inline Watermark Editing

Watermarks are managed inline on both the module edit form and wizard step 3 (not just via the standalone /watermarks/{id}/edit page). The module edit form (module_form.html) renders existing watermarks as editable rows and submits them as parallel arrays (wm_id[], wm_name[], wm_connection_id[], wm_resolver_sql[], wm_default_value[], wm_deleted_id[]). The _save_inline_watermarks(form, module_id) helper in web/app.py processes these arrays — updates existing rows, creates new ones, deletes removed ones. Both module_update and wizard_create call it after saving the module. A client-side placeholder warning checks that {name} tokens in the source query match the watermark names on the page.

Merge Key

merge_key is stored as a comma-separated string (e.g., "col1, col2"). The engine parses it and generates a multi-column DELETE predicate for incremental strategy.

Staging Table

Recreated on every run as pipekit_staging.{module_name} (DROP + CREATE, not IF NOT EXISTS). Ephemeral — exists only during the run.

Bulk Copy

jrunner.migrate passes jrunner's -b flag when the dest is SQL Server (jdbc:sqlserver:) or Postgres (jdbc:postgresql:), so loads use the dest's native bulk path instead of batched INSERT…VALUESSQL Server via SQLServerBulkCopy (TDS bulk-load), Postgres via COPY … FROM STDIN. Dramatically faster on large/wide tables (a 1.27M-row SQL Server load went ~111 min → ~4 min). DB2 dests keep the INSERT path. Automatic per-dest; no module config. (Requires jrunner with -b support.) Note: jrunner only streams the Postgres source without buffering it all into memory because it sets autoCommit(false) on the source connection in migration mode — a PG-driver requirement for setFetchSize to take effect.

Scheduler

pipekit/scheduler.py runs a single daemon thread (started via FastAPI lifespan in api/app.py). It wakes every 60 s, reads all enabled schedules, and fires run_group for any whose next cron occurrence has passed since last_fired_at. last_fired_at is written to the DB before the run thread is spawned — prevents double-fire if a run is slow or pipekit restarts mid-run. Uses croniter for cron expression evaluation. Cron expressions are evaluated in local server time (not UTC) — 0 4 * * * fires at 04:00 local.

API vs. Web

  • /api/* — JSON REST, HTTP Basic Auth, consumed by HTMX fragments and external callers
  • / and other bare paths — full HTML pages (Jinja2), no auth currently
  • POST /modules/{id}/run returns {run_id} immediately (both API and web); run is async via BackgroundTasks
  • GET /runs/{id}/live — HTML fragment endpoint; HTMX polls this every 2 s while status=running to show live_log + status
  • /groups/* — group CRUD, run, live fragment; /group-runs/{id} — group run detail with per-module run links

Logging

api/app.py configures a pipekit logger to stderr (→ journal) and registers a StarletteHTTPException handler that logs 5xx at ERROR (with traceback) and 4xx at WARNING — because FastAPI otherwise turns HTTPException into a response and logs nothing (failures like wizard dest-provisioning errors were invisible). A small ASGI middleware (_RequestBodyCapture) buffers each request body and replays it downstream so the handler can log the submitted payload on failure, with secret-looking fields (password/pwd/secret/token) masked. Covers raised HTTPExceptions; handlers that return an error response aren't body-logged.

Live-log progress collapse

jrunner prints an in-place progress counter (a bare number) per batch; read with universal newlines each tick is its own line, so a big load would stack thousands of numbers in live_log. repo.append_run_live_log collapses them: a bare-number tick overwrites a trailing bare-number line instead of appending, keeping a single current count. Real lines (headers, "N rows written", timestamps) are preserved. (Bulk copy emits a throttled counter every 10k rows, displayed the same way.)

Tech Stack

  • Python 3.10+, FastAPI, Uvicorn, Jinja2, PyYAML, SQLite3 (stdlib), croniter
  • python-multipart required for HTML form POSTs (not auto-installed as a FastAPI transitive dep)
  • Frontend: HTMX (CDN) + vanilla JS; Alpine.js is NOT loaded despite being listed in older docs
  • jrunner: separate Java tool, must be on PATH; JAVA_HOME must also be set — deploy.sh detects and injects it into the systemd unit automatically
  • Runs as the pipekit system user; venv at /opt/pipekit/.venv owned by that user; secrets at /etc/pipekit/secrets.env (mode 0640, group pipekit)

Full Spec

/opt/pipekit/SPEC.md is the authoritative design document. Read it for deep rationale on any architectural decision.