# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Running the Server

Pipekit runs as a systemd service (`pipekit.service`). Common commands:

```bash
sudo systemctl status pipekit      # check status
sudo systemctl restart pipekit     # restart after changes
sudo systemctl stop pipekit        # stop
sudo journalctl -u pipekit -f      # follow logs
```

For dev/testing outside the service — never nohup or background it:

```bash
pipekit serve                                       # default host/port from config.yaml
pipekit serve --host 0.0.0.0 --port 8080 --reload   # dev mode with auto-reload
```

## Other CLI Commands

```bash
pipekit init                       # create/upgrade SQLite schema
pipekit doctor                     # health check (config, jrunner, DB)
pipekit run                        # run a module synchronously (manual test)
pipekit set-password               # set HTTP Basic Auth credentials
pipekit secrets set KEY [VALUE]    # add/update a secret (prompts if value omitted)
pipekit secrets list               # show stored secret keys (no values)
pipekit drivers list               # show registered driver kinds
./deploy.sh                        # full idempotent deploy: user, venv, deps, launcher,
                                   # secrets.env, schema init, driver registration, systemd unit
```

## Architecture

Pipekit is a database sync tool. A **module** defines a source query → dest table sync job. The engine runs a module by: resolving watermarks → materializing source SQL → staging data via jrunner → merging into dest.

**Layers (bottom to top):**

1. **SQLite** (`pipekit.db`) — single file, all state
2. **`repo.py`** — all CRUD for every table; ~725 LOC, the only layer that touches the DB
3. **`engine/`** — orchestrates a module run: lock acquisition, watermark resolution, jrunner calls, merge SQL, post-run hooks, run_log write, lock release
4. **jrunner** — external Java CLI; handles all JDBC access. Python never talks to remote DBs directly; it shells out to jrunner
5. **`api/`** — FastAPI REST endpoints under `/api/*`, HTTP Basic Auth (except `/health`)
6. **`web/`** — HTML pages (Jinja2 templates) at `/`; HTMX + vanilla JS for interactivity (Alpine.js is not loaded)

## Key Data Model

- **driver** — JDBC driver registration (kind, jar, class, url_template)
- **connection** — named DB connection (jdbc_url, username, password as `$ENV_VAR` reference)
- **module** — sync job (source/dest connection, source query, merge strategy, merge key, enabled, running lock)
- **watermark** — named placeholder with resolver SQL; first column of first row used as opaque string; replaces `{watermark_name}` in source query
- **hook** — post-merge SQL (run_order, run_on: success/failure/always)
- **run_log** — immutable history record with resolved SQL, merge SQL, watermark values, stdout/stderr, timing, live_log (streamed jrunner output written during the run)
- **grp** — named group of modules; modules belong to a group via `group_member` (with run_order)
- **group_member** — join table: group → module with run_order; disabled modules are skipped by group runs
- **group_run** — one execution of a group: status, timing, triggered_by (manual | schedule:{id})
- **schedule** — cron expression tied to a group; scheduler fires `run_group` when due; tracks `last_fired_at` to survive restarts without double-firing

## Engine Flow

```
run_module(module_id)
  → atomic UPDATE module SET running=1 WHERE running=0            # fail if already locked
  → for each watermark: run resolver SQL via jrunner → capture first cell
                                                                  # always live, even dry runs
  → materialize source query (simple string replace {name} → value)
  → build merge SQL (engine/merge.py: full=truncate+insert, incremental=delete by key+insert, append=insert)
  → if dry_run: write run_log (status='dry_run'), return early — no data movement
  → DROP + CREATE staging table (pipekit_staging.{module_name})   # self-healing schema drift
  → jrunner migrate source → staging                              # uses Popen; each stdout line appended to run_log.live_log
  → run merge SQL via jrunner
  → run hooks in order
  → write run_log entry
  → UPDATE module SET running=0 (in finally)

run_group(group_id)
  → create group_run row (status=running)
  → for each enabled group_member in run_order: call run_module(group_run_id=...)
  → continues past individual module failures (all members run)
  → final status: dry_run if all dry_run, error if any errored, success otherwise
  → finish_group_run(status)
```

## Run Statuses

- `running` — in progress
- `success` — completed, data moved
- `dry_run` — resolved watermarks + built SQL, no data movement
- `error` — failed
- `cancelled` — cancelled mid-run

## Credentials Pattern

Passwords in connections are stored as `$VAR_NAME` references. At run time they are resolved from `/etc/pipekit/secrets.env` (override with `PIPEKIT_SECRETS` env var). Config path override: `PIPEKIT_CONFIG`.

## Driver Abstraction

Each driver in `pipekit/drivers/` inherits from `base.py::Driver` and implements: `browse_fields`, `list_tables`, `list_schemas`, `get_columns`, `map_type`, `default_expression`, `quote_identifier`. The wizard UI calls `/api/introspect/*` which dispatches to the appropriate driver.

## Wizard Entry Modes

Step 1 offers two paths. **Browse a table** (`/wizard/tables` → `/wizard/columns`) is the original flow: pick one source table, introspect its columns. **Write SQL** (`/wizard/sql` → `POST /wizard/sql/columns`) starts from an arbitrary query — the source query is used verbatim and its result columns are discovered by `Driver.introspect_query_columns`, which runs the query wrapped to fetch ~no rows (`_zero_row_wrap`: derived-table + `WHERE 1=0` by default; DB2 appends `FETCH FIRST 1 ROW ONLY` since DB2 for i forbids `WITH` inside a nested table expression). Both paths land on `wizard_step3.html` (the SQL path passes `sql_mode=True`) and POST to `/wizard/create`, which branches on `entry_mode`. SQL-mode dest types default to `text` — there's no result-set type metadata over jrunner's CSV output, so the user adjusts types by hand.
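
The zero-row wrapping described above can be sketched roughly as follows. This is an illustration only, not the actual `_zero_row_wrap`: the function name `zero_row_wrap`, the `kind` parameter, and the derived-table alias `q` are assumptions made for the example.

```python
def zero_row_wrap(sql: str, kind: str = "generic") -> str:
    """Return a variant of `sql` that yields column names but (almost) no rows.

    Hypothetical sketch; names and signature are assumptions, not pipekit's API.
    """
    # Drop a trailing semicolon so the query can be embedded or extended.
    body = sql.strip().rstrip(";").rstrip()
    if kind == "db2":
        # DB2 for i rejects WITH inside a nested table expression,
        # so leave the query unwrapped and cap the result instead.
        return f"{body} FETCH FIRST 1 ROW ONLY"
    # Default: wrap as a derived table and filter every row out.
    return f"SELECT * FROM ({body}) q WHERE 1=0"


print(zero_row_wrap("SELECT id, name FROM customers;"))
print(zero_row_wrap("WITH c AS (SELECT 1 AS x FROM t) SELECT x FROM c", kind="db2"))
```

The DB2 branch can still return a single row, which is why the text above says "~no rows" rather than "zero rows".
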
`columns_json` is never read at run time (the engine does `CREATE staging LIKE dest` + `SELECT *`); it only drives the dest `CREATE TABLE`.

## Module Columns

Modules store their column mapping as `columns_json` — a JSON list of dicts with keys `source_name`, `source_type`, `dest_name`, `dest_type` (and a stable `id`). It drives the dest `CREATE TABLE`; the engine itself doesn't read it at run time (it does `CREATE staging LIKE dest` + `SELECT *`).

**Post-creation column add.** The module detail page's Schema panel has a "+ add column" action (`/modules/{id}/columns/new` → `POST /modules/{id}/columns`) that appends a column to an existing module: it `ALTER TABLE … ADD COLUMN`s the dest (always at the tail — the only position ALTER supports, which keeps the positional load aligned), applies a column comment where supported, and appends to `columns_json`. Each column row carries a stable `id` (`c1`, `c2`, …) — the data-movement identity for future schema reconciliation. Add is the only mutation for now; reorder/retype/drop (which can require a table rebuild) are not implemented. Driver methods: `build_add_column_sql`, `column_inventory`.

## Inline Watermark Editing

Watermarks are managed inline on both the module edit form and wizard step 3 (not just via the standalone `/watermarks/{id}/edit` page). The module edit form (`module_form.html`) renders existing watermarks as editable rows and submits them as parallel arrays (`wm_id[]`, `wm_name[]`, `wm_connection_id[]`, `wm_resolver_sql[]`, `wm_default_value[]`, `wm_deleted_id[]`). The `_save_inline_watermarks(form, module_id)` helper in `web/app.py` processes these arrays — updates existing rows, creates new ones, deletes removed ones. Both `module_update` and `wizard_create` call it after saving the module. A client-side placeholder warning checks that `{name}` tokens in the source query match the watermark names on the page.

## Merge Key

`merge_key` is stored as a comma-separated string (e.g., `"col1, col2"`).
The engine parses it and generates a multi-column DELETE predicate for incremental strategy.

## Staging Table

Recreated on every run as `pipekit_staging.{module_name}` (DROP + CREATE, not IF NOT EXISTS). Ephemeral — exists only during the run.

## Bulk Copy (SQL Server dest)

`jrunner.migrate` passes jrunner's `-b` flag when the dest JDBC URL starts with `jdbc:sqlserver:`, so SQL Server loads stream via `SQLServerBulkCopy` (TDS bulk-load) instead of batched INSERTs — dramatically faster on large/wide tables (a 1.27M-row load went ~111 min → ~4 min). DB2/PG dests keep the INSERT path. This is automatic per-dest; no module config. (Requires jrunner with `-b` support.)

Note: jrunner can stream a **Postgres source** without buffering the whole result set in memory only because it calls `setAutoCommit(false)` on the source connection in migration mode, which the PG driver requires for `setFetchSize` to take effect.

## Scheduler

`pipekit/scheduler.py` runs a single daemon thread (started via FastAPI lifespan in `api/app.py`). It wakes every 60 s, reads all enabled schedules, and fires `run_group` for any whose next cron occurrence has passed since `last_fired_at`. `last_fired_at` is written to the DB before the run thread is spawned — prevents double-fire if a run is slow or pipekit restarts mid-run. Uses `croniter` for cron expression evaluation. Cron expressions are evaluated in **local server time** (not UTC) — `0 4 * * *` fires at 04:00 local.

## API vs. Web

- `/api/*` — JSON REST, HTTP Basic Auth, consumed by HTMX fragments and external callers
- `/` and other bare paths — full HTML pages (Jinja2), no auth currently
- `POST /modules/{id}/run` returns `{run_id}` immediately (both API and web); run is async via BackgroundTasks
- `GET /runs/{id}/live` — HTML fragment endpoint; HTMX polls this every 2 s while status=running to show live_log + status
- `/groups/*` — group CRUD, run, live fragment; `/group-runs/{id}` — group run detail with per-module run links

## Logging

`api/app.py` configures a `pipekit` logger to stderr (→ journal) and registers a `StarletteHTTPException` handler that logs 5xx at ERROR (with traceback) and 4xx at WARNING — because FastAPI otherwise turns `HTTPException` into a response and logs nothing (failures like wizard dest-provisioning errors were invisible). A small ASGI middleware (`_RequestBodyCapture`) buffers each request body and replays it downstream so the handler can log the submitted payload on failure, with secret-looking fields (password/pwd/secret/token) masked. Covers *raised* HTTPExceptions; handlers that *return* an error response aren't body-logged.

## Live-log progress collapse

jrunner prints an in-place progress counter (a bare number) per batch; read with universal newlines, each tick is its own line, so a big load would stack thousands of numbers in `live_log`. `repo.append_run_live_log` collapses them: a bare-number tick overwrites a trailing bare-number line instead of appending, keeping a single current count. Real lines (headers, "N rows written", timestamps) are preserved. (Bulk copy emits a throttled counter every 10k rows, displayed the same way.)
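
The collapse rule above can be sketched as a pure function over the log text. This is a minimal illustration, not the real `repo.append_run_live_log` (which writes to SQLite); the function name `append_live_log` and its signature are assumptions made for the example.

```python
def append_live_log(log: str, line: str) -> str:
    """Append `line` to `log`, collapsing consecutive bare-number ticks.

    Hypothetical sketch; the real helper persists to the run_log row.
    """
    line = line.rstrip("\r\n")
    lines = log.split("\n") if log else []
    is_tick = line.strip().isdigit()
    if is_tick and lines and lines[-1].strip().isdigit():
        # A tick directly after another tick replaces it in place.
        lines[-1] = line
    else:
        # Real lines, and the first tick after a real line, are appended.
        lines.append(line)
    return "\n".join(lines)


log = ""
for out in ["Starting migrate", "1000", "2000", "3000", "3000 rows written"]:
    log = append_live_log(log, out)
print(log)
```

Only the final count of each run of ticks survives, while a line such as "3000 rows written" is kept because it is not a bare number.
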
## Tech Stack

- Python 3.10+, FastAPI, Uvicorn, Jinja2, PyYAML, SQLite3 (stdlib), croniter
- `python-multipart` required for HTML form POSTs (not auto-installed as a FastAPI transitive dep)
- Frontend: HTMX (CDN) + vanilla JS; Alpine.js is NOT loaded despite being listed in older docs
- jrunner: separate Java tool, must be on PATH; `JAVA_HOME` must also be set — `deploy.sh` detects and injects it into the systemd unit automatically
- Runs as the `pipekit` system user; venv at `/opt/pipekit/.venv` owned by that user; secrets at `/etc/pipekit/secrets.env` (mode 0640, group pipekit)

## Full Spec

`/opt/pipekit/SPEC.md` is the authoritative design document. Read it for deep rationale on any architectural decision.