Paul Trowbridge 595024eb52 Unify incremental sync config: inline watermarks + editable source query
Watermarks, merge strategy, merge key, and source query are now edited
together in one form on both the module edit page and wizard step 3.
A client-side placeholder warning fires when {name} tokens in the query
don't match the watermark rows on the page. The wizard now shows an
editable source query textarea pre-populated from column picks so WHERE
clauses can be added before module creation. Watermarks submitted via
wm_* arrays are processed by _save_inline_watermarks() in both
module_update and wizard_create.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
2026-05-18 21:31:55 -04:00


CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

Running the Server

pipekit serve                                       # default host/port from config.yaml
pipekit serve --host 0.0.0.0 --port 8080 --reload   # dev mode with auto-reload

The user runs the server themselves in their own terminal — do not nohup or background it.

Other CLI Commands

pipekit init                           # create/upgrade SQLite schema
pipekit doctor                         # health check (config, jrunner, DB)
pipekit run <module_name>              # run a module synchronously (manual test)
pipekit set-password <username>        # set HTTP Basic Auth credentials
pipekit secrets set KEY [VALUE]        # add/update a secret (prompts if value omitted)
pipekit drivers list                   # show registered driver kinds
./deploy.sh                            # idempotent: venv, deps, launcher, driver registration

Architecture

Pipekit is a database sync tool. A module defines a sync job from a source query to a destination table. The engine runs a module by resolving watermarks → materializing the source SQL → staging data via jrunner → merging into the destination table.

Layers (bottom to top):

  1. SQLite (pipekit.db) — single file, all state
  2. repo.py — all CRUD for every table; ~1,900 LOC, the only layer that touches the DB
  3. engine/ — orchestrates a module run: lock acquisition, watermark resolution, jrunner calls, merge SQL, post-run hooks, run_log write, lock release
  4. jrunner — external Java CLI; handles all JDBC access. Python never talks to remote DBs directly; it shells out to jrunner
  5. api/ — FastAPI REST endpoints under /api/*, HTTP Basic Auth (except /health)
  6. web/ — HTML pages (Jinja2 templates) at /; HTMX + vanilla JS for interactivity

Key Data Model

  • driver — JDBC driver registration (kind, jar, class, url_template)
  • connection — named DB connection (jdbc_url, username, password as $ENV_VAR reference)
  • module — sync job (source/dest connection, source query, merge strategy, merge key, enabled, running lock)
  • watermark — named placeholder with resolver SQL; the first column of the first row of the resolver's result is used as an opaque string that replaces {watermark_name} in the source query
  • hook — post-merge SQL (run_order, run_on: success/failure/always)
  • run_log — immutable history record with resolved SQL, merge SQL, watermark values, stdout/stderr, timing, live_log (streamed jrunner output written during the run)
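The watermark substitution is described only as a simple string replace. Below is a minimal sketch of that step, plus a Python equivalent of the client-side placeholder mismatch check. It assumes tokens are written as {name} with letters, digits, or underscores only. `materialize` and `placeholder_mismatches` are hypothetical names, not functions taken from the codebase:

```python
import re

# Assumed token syntax: {name}, where name is letters, digits, or underscores.
TOKEN_RE = re.compile(r"\{(\w+)\}")


def materialize(source_query: str, values: dict) -> str:
    """Replace each {name} token with its resolved watermark value.

    Uses plain str.replace rather than str.format, so any other braces in the
    SQL are left untouched. Values are inserted verbatim as opaque strings.
    """
    sql = source_query
    for name, value in values.items():
        sql = sql.replace("{" + name + "}", value)
    return sql


def placeholder_mismatches(source_query: str, watermark_names) -> tuple:
    """Return (tokens with no matching watermark, watermarks with no token)."""
    tokens = set(TOKEN_RE.findall(source_query))
    names = set(watermark_names)
    return tokens - names, names - tokens
```

Because values are inserted verbatim, any quoting the SQL needs (for example `'{last_ts}'` around a timestamp) has to be written into the source query itself.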

Engine Flow

run_module(module_id)
  → atomic UPDATE module SET running=1 WHERE running=0  # fail if already locked
  → for each watermark: run resolver SQL via jrunner → capture first cell  # always live, even dry runs
  → materialize source query (simple string replace {name} → value)
  → build merge SQL (engine/merge.py: full=truncate+insert, incremental=delete by key+insert, append=insert)
  → if dry_run: write run_log (status='dry_run'), return early — no data movement
  → DROP + CREATE staging table (pipekit_staging.{module_name})  # self-healing schema drift
  → jrunner migrate source → staging  # uses Popen; each stdout line appended to run_log.live_log
  → run merge SQL via jrunner
  → run hooks in order
  → write run_log entry
  → UPDATE module SET running=0 (in finally)
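The lock-and-release part of this flow can be sketched with the stdlib sqlite3 module. This assumes a `module` table with `id` and `running` columns. `acquire_lock`, `release_lock`, `ModuleLocked`, and the `steps` list are hypothetical stand-ins for the real engine code:

```python
import sqlite3


class ModuleLocked(Exception):
    """Raised when the module is already running (or does not exist)."""


def acquire_lock(conn: sqlite3.Connection, module_id: int) -> None:
    # One atomic UPDATE: it only matches the row if no other run set running=1 first.
    cur = conn.execute(
        "UPDATE module SET running = 1 WHERE id = ? AND running = 0", (module_id,)
    )
    conn.commit()
    if cur.rowcount != 1:
        raise ModuleLocked(f"module {module_id} is already running or missing")


def release_lock(conn: sqlite3.Connection, module_id: int) -> None:
    conn.execute("UPDATE module SET running = 0 WHERE id = ?", (module_id,))
    conn.commit()


def run_module(conn: sqlite3.Connection, module_id: int, steps) -> None:
    # Acquire outside the try: a failed acquire must not release another run's lock.
    acquire_lock(conn, module_id)
    try:
        for step in steps:  # resolve watermarks, stage, merge, hooks, run_log...
            step()
    finally:
        release_lock(conn, module_id)
```

Acquiring the lock before entering `try` is deliberate. If the UPDATE matches no row, the `finally` clause never runs, so it cannot clear a lock that belongs to the run already in progress.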

Run Statuses

  • running — in progress
  • success — completed, data moved
  • dry_run — resolved watermarks + built SQL, no data movement
  • error — failed
  • cancelled — cancelled mid-run

Credentials Pattern

Passwords in connections are stored as $VAR_NAME references. At run time they are resolved from /etc/pipekit/secrets.env (override with PIPEKIT_SECRETS env var). Config path override: PIPEKIT_CONFIG.
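Resolving a stored reference might look like the following sketch. It assumes secrets.env holds one KEY=VALUE pair per line, with optional # comments and optional surrounding quotes. `load_secrets` and `resolve_password` are hypothetical names, and passing non-$ values through unchanged is also an assumption:

```python
import os
from pathlib import Path
from typing import Dict, Optional

DEFAULT_SECRETS_PATH = "/etc/pipekit/secrets.env"


def load_secrets(path: Optional[str] = None) -> Dict[str, str]:
    """Parse KEY=VALUE lines, skipping blanks and # comments."""
    path = path or os.environ.get("PIPEKIT_SECRETS", DEFAULT_SECRETS_PATH)
    secrets: Dict[str, str] = {}
    for raw in Path(path).read_text().splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        value = value.strip()
        # Drop one pair of matching surrounding quotes, if present.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        secrets[key.strip()] = value
    return secrets


def resolve_password(ref: str, secrets: Dict[str, str]) -> str:
    """Turn a stored '$VAR_NAME' reference into the secret value."""
    if not ref.startswith("$"):
        return ref  # assumption: literal values pass through unchanged
    name = ref[1:]
    if name not in secrets:
        raise KeyError(f"secret {name!r} is not defined in secrets.env")
    return secrets[name]
```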

Driver Abstraction

Each driver in pipekit/drivers/ inherits from base.py::Driver and implements: browse_fields, list_tables, list_schemas, get_columns, map_type, default_expression, quote_identifier. The wizard UI calls /api/introspect/* which dispatches to the appropriate driver.
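Here is a sketch of that interface as a Python abstract base class. The method names come from the list above. Every signature, the `kind` attribute, the `AnsiDriver` example, and the `driver_for` registry lookup are assumptions made for illustration:

```python
from abc import ABC, abstractmethod
from typing import Dict, List


class Driver(ABC):
    """Driver interface; method names follow CLAUDE.md, signatures are assumed."""

    kind: str = ""

    @abstractmethod
    def browse_fields(self) -> List[str]:
        """Fields the connection form should ask for (assumed meaning)."""

    @abstractmethod
    def list_schemas(self, conn) -> List[str]: ...

    @abstractmethod
    def list_tables(self, conn, schema: str) -> List[str]: ...

    @abstractmethod
    def get_columns(self, conn, schema: str, table: str) -> List[dict]: ...

    @abstractmethod
    def map_type(self, source_type: str) -> str: ...

    @abstractmethod
    def default_expression(self, column: str, source_type: str) -> str: ...

    @abstractmethod
    def quote_identifier(self, name: str) -> str: ...


class AnsiDriver(Driver):
    """Hypothetical driver using ANSI double-quoted identifiers."""

    kind = "ansi"
    TYPE_MAP = {"int4": "integer", "int8": "bigint", "varchar": "text"}

    def browse_fields(self):
        return ["host", "port", "database"]

    # Introspection would go through jrunner in pipekit; stubbed here.
    def list_schemas(self, conn):
        raise NotImplementedError

    def list_tables(self, conn, schema):
        raise NotImplementedError

    def get_columns(self, conn, schema, table):
        raise NotImplementedError

    def map_type(self, source_type):
        return self.TYPE_MAP.get(source_type.lower(), "text")

    def default_expression(self, column, source_type):
        return self.quote_identifier(column)

    def quote_identifier(self, name):
        # Double any embedded quote, then wrap: my"col -> "my""col"
        return '"' + name.replace('"', '""') + '"'


DRIVERS: Dict[str, Driver] = {d.kind: d for d in (AnsiDriver(),)}


def driver_for(kind: str) -> Driver:
    """Look up the registered driver for a connection's kind."""
    try:
        return DRIVERS[kind]
    except KeyError:
        raise ValueError(f"no driver registered for kind {kind!r}") from None
```

Because every method is declared abstract, instantiating a driver that forgets one of them fails immediately with a `TypeError` instead of failing later in the wizard.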

Module Columns

Modules store their column mapping as columns_json — a JSON list of dicts with keys source_name, source_type, dest_name, dest_type. The engine uses this to build the staging CREATE TABLE and the merge INSERT column lists.
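Below is a minimal sketch of how columns_json could drive the staging DDL and the INSERT column list. It assumes that the staging table uses the dest_name/dest_type pairs, that the DROP is written as DROP TABLE IF EXISTS so the first run does not fail, and that identifiers need no quoting (a real driver would pass them through quote_identifier). `staging_ddl` and `insert_column_list` are hypothetical helpers:

```python
import json


def staging_ddl(module_name: str, columns_json: str) -> list:
    """DROP + CREATE statements for pipekit_staging.{module_name}."""
    columns = json.loads(columns_json)
    table = f"pipekit_staging.{module_name}"
    col_defs = ", ".join(f"{c['dest_name']} {c['dest_type']}" for c in columns)
    # Always drop and recreate, so a changed column list never leaves a stale schema.
    return [f"DROP TABLE IF EXISTS {table}", f"CREATE TABLE {table} ({col_defs})"]


def insert_column_list(columns_json: str) -> str:
    """Comma-separated dest column names for the merge INSERT."""
    return ", ".join(c["dest_name"] for c in json.loads(columns_json))
```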

Inline Watermark Editing

Watermarks are managed inline on both the module edit form and wizard step 3 (not just via the standalone /watermarks/{id}/edit page). The module edit form (module_form.html) renders existing watermarks as editable rows and submits them as parallel arrays (wm_id[], wm_name[], wm_connection_id[], wm_resolver_sql[], wm_default_value[], wm_deleted_id[]). The _save_inline_watermarks(form, module_id) helper in web/app.py processes these arrays — updates existing rows, creates new ones, deletes removed ones. Both module_update and wizard_create call it after saving the module. A client-side placeholder warning checks that {name} tokens in the source query match the watermark names on the page.
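The parallel-array handling can be sketched as follows. It makes several assumptions. The form is taken to arrive as a dict of lists keyed by field name without the [] suffix; in FastAPI such lists would come from Starlette's `FormData.getlist`. An empty wm_id is taken to mark a new row. `FakeRepo` stands in for the watermark CRUD in repo.py. None of these names are taken from web/app.py:

```python
from dataclasses import dataclass, field
from typing import Dict, List

ROW_FIELDS = ("wm_id", "wm_name", "wm_connection_id", "wm_resolver_sql", "wm_default_value")


@dataclass
class FakeRepo:
    """Hypothetical in-memory stand-in for the watermark CRUD in repo.py."""

    rows: Dict[int, dict] = field(default_factory=dict)
    next_id: int = 1

    def create(self, module_id: int, **values) -> int:
        wm_id = self.next_id
        self.rows[wm_id] = {"module_id": module_id, **values}
        self.next_id += 1
        return wm_id

    def update(self, wm_id: int, **values) -> None:
        self.rows[wm_id].update(values)

    def delete(self, wm_id: int) -> None:
        self.rows.pop(wm_id, None)


def save_inline_watermarks(form: Dict[str, List[str]], module_id: int, repo: FakeRepo) -> None:
    # Rows removed in the UI arrive as ids in wm_deleted_id.
    for wm_id in form.get("wm_deleted_id", []):
        if wm_id:
            repo.delete(int(wm_id))

    # Parallel arrays: index i of every wm_* list describes the same row.
    columns = [form.get(name, []) for name in ROW_FIELDS]
    if len({len(col) for col in columns}) > 1:
        raise ValueError("wm_* arrays have different lengths")

    for wm_id, name, conn_id, resolver_sql, default in zip(*columns):
        if not name.strip():
            continue  # blank row the user never filled in
        values = {
            "name": name.strip(),
            "connection_id": int(conn_id) if conn_id else None,
            "resolver_sql": resolver_sql,
            "default_value": default or None,
        }
        if wm_id:
            repo.update(int(wm_id), **values)  # existing row
        else:
            repo.create(module_id, **values)  # new row added on the page
```

The length check guards against `zip()` silently dropping rows if the browser ever submits arrays of unequal length.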

Merge Key

merge_key is stored as a comma-separated string (e.g., "col1, col2"). The engine parses it and generates a multi-column DELETE predicate for incremental strategy.
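One way the merge SQL could be generated is sketched below. The EXISTS form of the multi-column DELETE predicate, the `build_merge_sql` and `parse_merge_key` names, and the exact statement text are all assumptions. The real engine/merge.py may differ, especially across destination dialects:

```python
from typing import List


def parse_merge_key(merge_key: str) -> List[str]:
    # "col1, col2" -> ["col1", "col2"]; empty pieces are ignored.
    return [k.strip() for k in merge_key.split(",") if k.strip()]


def build_merge_sql(strategy: str, dest: str, staging: str,
                    columns: List[str], merge_key: str = "") -> List[str]:
    cols = ", ".join(columns)
    insert = f"INSERT INTO {dest} ({cols}) SELECT {cols} FROM {staging}"
    if strategy == "full":
        return [f"TRUNCATE TABLE {dest}", insert]
    if strategy == "append":
        return [insert]
    if strategy == "incremental":
        keys = parse_merge_key(merge_key)
        if not keys:
            raise ValueError("incremental strategy needs a merge_key")
        # Multi-column predicate: a dest row is deleted if staging has the same key.
        match = " AND ".join(f"s.{k} = {dest}.{k}" for k in keys)
        delete = f"DELETE FROM {dest} WHERE EXISTS (SELECT 1 FROM {staging} s WHERE {match})"
        return [delete, insert]
    raise ValueError(f"unknown merge strategy: {strategy!r}")
```

With an equality predicate like this, dest rows whose key columns are NULL are never deleted, because `NULL = NULL` is not true in SQL. Such rows would be duplicated on every incremental run.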

Staging Table

Recreated on every run as pipekit_staging.{module_name} (DROP + CREATE, not IF NOT EXISTS). Ephemeral — exists only during the run.

API vs. Web

  • /api/* — JSON REST, HTTP Basic Auth, consumed by HTMX fragments and external callers
  • / and other bare paths — full HTML pages (Jinja2), no auth currently
  • POST /modules/{id}/run returns {run_id} immediately (both API and web); run is async via BackgroundTasks
  • GET /runs/{id}/live — HTML fragment endpoint; HTMX polls this every 2 s while status=running to show live_log + status
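The fragment itself can drive the polling loop. This sketch assumes the endpoint returns a div that carries `hx-get`, `hx-trigger="every 2s"`, and `hx-swap="outerHTML"` only while the run is in progress. The first response with a terminal status then replaces the element with one that has no trigger, and HTMX stops polling. `render_live_fragment` and the markup are hypothetical:

```python
from html import escape


def render_live_fragment(run_id: int, status: str, live_log: str) -> str:
    # While running, the div re-requests itself every 2 s and swaps itself out.
    # Once the status is terminal, the markup has no hx-trigger, so polling ends.
    if status == "running":
        poll = f' hx-get="/runs/{run_id}/live" hx-trigger="every 2s" hx-swap="outerHTML"'
    else:
        poll = ""
    return (
        f'<div id="run-{run_id}-live"{poll}>'
        f"<p>Status: {escape(status)}</p>"
        f"<pre>{escape(live_log)}</pre>"
        "</div>"
    )
```

Escaping `live_log` matters here because jrunner's stdout is arbitrary text and could otherwise inject markup into the page.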

Tech Stack

  • Python 3.10+, FastAPI, Uvicorn, Jinja2, PyYAML, SQLite3 (stdlib)
  • python-multipart required for HTML form POSTs (not auto-installed as a FastAPI transitive dep)
  • Frontend: HTMX (CDN) + vanilla JS; Alpine.js is NOT loaded despite being listed in older docs
  • jrunner: separate Java tool, must be on PATH

Full Spec

/opt/pipekit/SPEC.md is the authoritative design document. Read it for deep rationale on any architectural decision.