Paul Trowbridge a66488d1f2 Convert all timestamps to local time for display and scheduling
Scheduler now evaluates cron expressions against local time instead of
UTC, so schedules fire at the user's local clock time. All timestamp
displays in templates use a new `localtime` Jinja filter that converts
UTC strings from SQLite to the server's local timezone. Updated CLAUDE.md
to reflect the systemd service setup.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
2026-06-03 23:21:05 -04:00


# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Running the Server
Pipekit runs as a systemd service (`pipekit.service`). Common commands:
```bash
sudo systemctl status pipekit # check status
sudo systemctl restart pipekit # restart after changes
sudo systemctl stop pipekit # stop
sudo journalctl -u pipekit -f # follow logs
```
For dev/testing outside the service — never nohup or background it:
```bash
pipekit serve # default host/port from config.yaml
pipekit serve --host 0.0.0.0 --port 8080 --reload # dev mode with auto-reload
```
## Other CLI Commands
```bash
pipekit init # create/upgrade SQLite schema
pipekit doctor # health check (config, jrunner, DB)
pipekit run <module_name> # run a module synchronously (manual test)
pipekit set-password <username> # set HTTP Basic Auth credentials
pipekit secrets set KEY [VALUE] # add/update a secret (prompts if value omitted)
pipekit secrets list # show stored secret keys (no values)
pipekit drivers list # show registered driver kinds
./deploy.sh # full idempotent deploy: user, venv, deps, launcher,
# secrets.env, schema init, driver registration, systemd unit
```
## Architecture
Pipekit is a database sync tool. A **module** defines a source query → dest table sync job. The engine runs a module by: resolving watermarks → materializing source SQL → staging data via jrunner → merging into dest.
**Layers (bottom to top):**
1. **SQLite** (`pipekit.db`) — single file, all state
2. **`repo.py`** — all CRUD for every table; ~725 LOC, the only layer that touches the DB
3. **`engine/`** — orchestrates a module run: lock acquisition, watermark resolution, jrunner calls, merge SQL, post-run hooks, run_log write, lock release
4. **jrunner** — external Java CLI; handles all JDBC access. Python never talks to remote DBs directly; it shells out to jrunner
5. **`api/`** — FastAPI REST endpoints under `/api/*`, HTTP Basic Auth (except `/health`)
6. **`web/`** — HTML pages (Jinja2 templates) at `/`; HTMX + vanilla JS for interactivity
## Key Data Model
- **driver** — JDBC driver registration (kind, jar, class, url_template)
- **connection** — named DB connection (jdbc_url, username, password as `$ENV_VAR` reference)
- **module** — sync job (source/dest connection, source query, merge strategy, merge key, enabled, running lock)
- **watermark** — named placeholder with resolver SQL; first column of first row used as opaque string; replaces `{watermark_name}` in source query
- **hook** — post-merge SQL (run_order, run_on: success/failure/always)
- **run_log** — immutable history record with resolved SQL, merge SQL, watermark values, stdout/stderr, timing, live_log (streamed jrunner output written during the run)
- **grp** — named group of modules; modules belong to a group via `group_member` (with run_order)
- **group_member** — join table: group → module with run_order; disabled modules are skipped by group runs
- **group_run** — one execution of a group: status, timing, triggered_by (manual | schedule:{id})
- **schedule** — cron expression tied to a group; scheduler fires `run_group` when due; tracks `last_fired_at` to survive restarts without double-firing
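
The watermark substitution described in the data model works in two steps. First, the first cell of the resolver result is taken as an opaque string. Second, it is plugged into `{watermark_name}` by plain string replacement. The sketch below is a minimal illustration under assumptions, not pipekit's code. `first_cell` and `materialize` are hypothetical names, and the resolver output is assumed to arrive as a list of rows. An empty result comes back as `None`, and the caller decides what to do with it.

```python
from __future__ import annotations

from typing import Any, Sequence


def first_cell(rows: Sequence[Sequence[Any]]) -> str | None:
    """First column of the first row, as an opaque string; None if the result is empty."""
    if not rows or not rows[0]:
        return None
    return str(rows[0][0])


def materialize(source_sql: str, values: dict[str, str]) -> str:
    """Replace each {name} token with its resolved value by plain string replacement.

    Nothing is quoted or escaped, so the value must already be valid SQL at the
    placeholder's position (e.g. put quotes around the token in the query for dates).
    """
    for name, value in values.items():
        source_sql = source_sql.replace("{" + name + "}", value)
    return source_sql
```

For example, `materialize("SELECT * FROM orders WHERE id > {max_id}", {"max_id": first_cell([(1042,)])})` yields `SELECT * FROM orders WHERE id > 1042`.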
## Engine Flow
```
run_module(module_id)
→ atomic UPDATE module SET running=1 WHERE id=:module_id AND running=0 # 0 rows updated = already locked, fail
→ for each watermark: run resolver SQL via jrunner → capture first cell # always live, even dry runs
→ materialize source query (simple string replace {name} → value)
→ build merge SQL (engine/merge.py: full=truncate+insert, incremental=delete by key+insert, append=insert)
→ if dry_run: write run_log (status='dry_run'), return early — no data movement
→ DROP + CREATE staging table (pipekit_staging.{module_name}) # self-healing schema drift
→ jrunner migrate source → staging # uses Popen; each stdout line appended to run_log.live_log
→ run merge SQL via jrunner
→ run hooks in order
→ write run_log entry
→ UPDATE module SET running=0 WHERE id=:module_id (in finally, so it also runs after a dry run or an error)
run_group(group_id)
→ create group_run row (status=running)
→ for each enabled group_member in run_order: call run_module(member.module_id, group_run_id=...)
→ continues past individual module failures (all members run)
→ final status: dry_run if all dry_run, error if any errored, success otherwise
→ finish_group_run(status)
```
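
The lock steps at the top and bottom of `run_module` can be sketched with the stdlib `sqlite3` module. This sketch makes several assumptions:

- The `module` table has an integer `id` primary key and a `running` column.
- The function names are hypothetical.
- In pipekit the SQL itself would live in `repo.py`.

```python
import sqlite3
from typing import Callable


def try_lock(conn: sqlite3.Connection, module_id: int) -> bool:
    """Atomically claim the module; False means another run already holds the lock."""
    cur = conn.execute(
        "UPDATE module SET running = 1 WHERE id = ? AND running = 0", (module_id,)
    )
    conn.commit()
    # rowcount is 1 only if this UPDATE flipped running from 0 to 1
    return cur.rowcount == 1


def release_lock(conn: sqlite3.Connection, module_id: int) -> None:
    conn.execute("UPDATE module SET running = 0 WHERE id = ?", (module_id,))
    conn.commit()


def run_locked(conn: sqlite3.Connection, module_id: int, body: Callable[[], str]) -> str:
    """Run body() under the module lock; the lock is released even if body raises."""
    if not try_lock(conn, module_id):
        # Do not release here: the lock belongs to the run that is already in progress.
        raise RuntimeError(f"module {module_id} is already running")
    try:
        return body()
    finally:
        release_lock(conn, module_id)
```

Because the check and the set happen in one `UPDATE`, two concurrent callers cannot both see `running = 0` and proceed.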
## Run Statuses
- `running` — in progress
- `success` — completed, data moved
- `dry_run` — resolved watermarks + built SQL, no data movement
- `error` — failed
- `cancelled` — cancelled mid-run
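
The group-level status rule from the engine flow has three parts:

- `dry_run` only if every member was a dry run.
- `error` if any member errored.
- `success` otherwise.

The sketch below applies exactly that rule. `group_status` is a hypothetical helper. Two cases are not covered by the text, so their handling here is an assumption: an empty group is treated as `success`, and `cancelled` members fall through to `success`.

```python
from typing import Iterable


def group_status(member_statuses: Iterable[str]) -> str:
    """Aggregate per-module run statuses into the final group_run status."""
    statuses = list(member_statuses)
    if statuses and all(s == "dry_run" for s in statuses):
        return "dry_run"
    if any(s == "error" for s in statuses):
        return "error"
    return "success"
```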
## Credentials Pattern
Passwords in connections are stored as `$VAR_NAME` references. At run time they are resolved from `/etc/pipekit/secrets.env` (override with `PIPEKIT_SECRETS` env var). Config path override: `PIPEKIT_CONFIG`.
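
A minimal sketch of resolving a `$VAR_NAME` password reference follows. It assumes `secrets.env` uses simple `KEY=VALUE` lines with `#` comments and no shell quoting. The parser, the function names, and the pass-through of values without a leading `$` are hypothetical, not pipekit's actual loader.

```python
from __future__ import annotations

import os
from pathlib import Path

DEFAULT_SECRETS = "/etc/pipekit/secrets.env"


def load_secrets(path: str | None = None) -> dict[str, str]:
    """Parse KEY=VALUE lines; PIPEKIT_SECRETS overrides the default path."""
    path = path or os.environ.get("PIPEKIT_SECRETS", DEFAULT_SECRETS)
    values: dict[str, str] = {}
    for line in Path(path).read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")  # split on the first "=" only
        values[key.strip()] = value.strip()
    return values


def resolve_password(ref: str, values: dict[str, str]) -> str:
    """Turn "$VAR_NAME" into its secret; other strings pass through (an assumption)."""
    if ref.startswith("$"):
        key = ref[1:]
        if key not in values:
            raise KeyError(f"secret {key!r} not found in secrets file")
        return values[key]
    return ref
```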
## Driver Abstraction
Each driver in `pipekit/drivers/` inherits from `base.py::Driver` and implements: `browse_fields`, `list_tables`, `list_schemas`, `get_columns`, `map_type`, `default_expression`, `quote_identifier`. The wizard UI calls `/api/introspect/*` which dispatches to the appropriate driver.
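
The driver contract can be pictured as an abstract base class. Only the seven method names come from the text above. The following are all hypothetical:

- every parameter list and return type;
- the `AnsiDriver` example and its type table;
- the `DRIVERS` registry.

```python
from __future__ import annotations

from abc import ABC, abstractmethod


class Driver(ABC):
    """Hypothetical shape of base.py::Driver; only the method names are documented."""

    kind: str = ""

    @abstractmethod
    def list_schemas(self, connection_id: int) -> list[str]: ...

    @abstractmethod
    def list_tables(self, connection_id: int, schema: str) -> list[str]: ...

    @abstractmethod
    def get_columns(self, connection_id: int, schema: str, table: str) -> list[dict]: ...

    @abstractmethod
    def browse_fields(self, connection_id: int, schema: str, table: str) -> list[dict]: ...

    @abstractmethod
    def map_type(self, source_type: str) -> str: ...

    @abstractmethod
    def default_expression(self, dest_type: str) -> str | None: ...

    @abstractmethod
    def quote_identifier(self, name: str) -> str: ...


class AnsiDriver(Driver):
    """Hypothetical driver for databases that quote identifiers with double quotes."""

    kind = "ansi"
    _types = {"int4": "integer", "varchar": "text", "timestamp": "timestamp"}

    # Real introspection would presumably go through jrunner; these stubs return nothing.
    def list_schemas(self, connection_id):
        return []

    def list_tables(self, connection_id, schema):
        return []

    def get_columns(self, connection_id, schema, table):
        return []

    def browse_fields(self, connection_id, schema, table):
        return []

    def map_type(self, source_type):
        return self._types.get(source_type.lower(), "text")  # unknown types fall back to text

    def default_expression(self, dest_type):
        return None

    def quote_identifier(self, name):
        return '"' + name.replace('"', '""') + '"'  # double any embedded quotes


# Dispatch by kind, roughly what an /api/introspect/* handler needs.
DRIVERS: dict[str, Driver] = {d.kind: d for d in [AnsiDriver()]}
```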
## Module Columns
Modules store their column mapping as `columns_json` — a JSON list of dicts with keys `source_name`, `source_type`, `dest_name`, `dest_type`. The engine uses this to build the staging CREATE TABLE and the merge INSERT column lists.
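
Given `columns_json`, the staging DDL and the merge INSERT column list follow mechanically. The sketch rests on four assumptions:

- The staging table uses `dest_name`/`dest_type`.
- Identifiers are quoted with ANSI double quotes (a driver's `quote_identifier` job).
- `dest_type` is already valid on the destination.
- `IF EXISTS` on the DROP keeps a first run from failing.

`staging_ddl` and `insert_columns` are hypothetical names.

```python
from __future__ import annotations

import json


def quote(name: str) -> str:
    """ANSI identifier quoting, standing in for a driver's quote_identifier."""
    return '"' + name.replace('"', '""') + '"'


def staging_ddl(module_name: str, columns_json: str) -> list[str]:
    """DROP + CREATE statements for pipekit_staging.{module_name}."""
    cols = json.loads(columns_json)
    table = f"pipekit_staging.{quote(module_name)}"
    col_defs = ", ".join(f"{quote(c['dest_name'])} {c['dest_type']}" for c in cols)
    return [f"DROP TABLE IF EXISTS {table}", f"CREATE TABLE {table} ({col_defs})"]


def insert_columns(columns_json: str) -> str:
    """Comma-separated, quoted dest column list for the merge INSERT."""
    return ", ".join(quote(c["dest_name"]) for c in json.loads(columns_json))
```

Recreating the table from `columns_json` on every run means a changed column mapping takes effect on the next run without manual DDL.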
## Inline Watermark Editing
Watermarks are managed inline on both the module edit form and wizard step 3 (not just via the standalone `/watermarks/{id}/edit` page). The module edit form (`module_form.html`) renders existing watermarks as editable rows and submits them as parallel arrays (`wm_id[]`, `wm_name[]`, `wm_connection_id[]`, `wm_resolver_sql[]`, `wm_default_value[]`, `wm_deleted_id[]`). The `_save_inline_watermarks(form, module_id)` helper in `web/app.py` processes these arrays — updates existing rows, creates new ones, deletes removed ones. Both `module_update` and `wizard_create` call it after saving the module. A client-side placeholder warning checks that `{name}` tokens in the source query match the watermark names on the page.
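
The parallel-array handling amounts to zipping the arrays into rows and sorting each row into update, create, or delete. The sketch below only classifies rows and performs no DB writes. It assumes:

- the form arrives as a dict of lists of equal length;
- a blank `wm_id` marks a new row;
- new rows with a blank name are ignored.

`plan_watermark_changes` and `missing_placeholders` are hypothetical. The second function mirrors the client-side placeholder warning in Python.

```python
from __future__ import annotations

import re

FIELDS = ["wm_id", "wm_name", "wm_connection_id", "wm_resolver_sql", "wm_default_value"]


def plan_watermark_changes(form: dict[str, list[str]]) -> dict[str, list]:
    """Split the submitted parallel arrays into updates, creates, and deletes."""
    columns = [form.get(f, []) for f in FIELDS]
    rows = [dict(zip(FIELDS, values)) for values in zip(*columns)]  # one dict per form row
    deleted = {int(i) for i in form.get("wm_deleted_id", []) if i}
    plan: dict[str, list] = {"update": [], "create": [], "delete": sorted(deleted)}
    for row in rows:
        if row["wm_id"]:
            if int(row["wm_id"]) not in deleted:
                plan["update"].append(row)
        elif row["wm_name"].strip():
            plan["create"].append(row)
    return plan


def missing_placeholders(source_sql: str, names: list[str]) -> set[str]:
    """{name} tokens in the source query that have no watermark with that name."""
    return set(re.findall(r"\{(\w+)\}", source_sql)) - set(names)
```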
## Merge Key
`merge_key` is stored as a comma-separated string (e.g., `"col1, col2"`). The engine parses it and generates a multi-column DELETE predicate for incremental strategy.
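
Parsing `merge_key` and building the per-strategy statements might look like the sketch below. It assumes staging and dest share the same quoted column names. It expresses the multi-column DELETE as a correlated `EXISTS`. The generated SQL is ANSI-flavoured and may need dialect tweaks. `build_merge_sql` is a hypothetical name, and the real `engine/merge.py` may differ.

```python
from __future__ import annotations


def quote(name: str) -> str:
    return '"' + name.replace('"', '""') + '"'


def parse_merge_key(merge_key: str) -> list[str]:
    """Split "col1, col2" into ["col1", "col2"], ignoring blanks."""
    return [k.strip() for k in merge_key.split(",") if k.strip()]


def build_merge_sql(
    strategy: str, dest: str, staging: str, columns: list[str], merge_key: str = ""
) -> list[str]:
    """Statements for one merge; dest and staging are already-qualified table names."""
    cols = ", ".join(quote(c) for c in columns)
    insert = f"INSERT INTO {dest} ({cols}) SELECT {cols} FROM {staging}"
    if strategy == "full":
        return [f"TRUNCATE TABLE {dest}", insert]
    if strategy == "append":
        return [insert]
    if strategy == "incremental":
        keys = parse_merge_key(merge_key)
        if not keys:
            raise ValueError("incremental strategy needs a merge_key")
        # Delete every dest row whose full key appears in staging, then insert.
        match = " AND ".join(f"s.{quote(k)} = {dest}.{quote(k)}" for k in keys)
        delete = f"DELETE FROM {dest} WHERE EXISTS (SELECT 1 FROM {staging} s WHERE {match})"
        return [delete, insert]
    raise ValueError(f"unknown merge strategy: {strategy!r}")
```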
## Staging Table
Recreated on every run as `pipekit_staging.{module_name}` (DROP + CREATE, not IF NOT EXISTS). Ephemeral — exists only during the run.
## Scheduler
`pipekit/scheduler.py` runs a single daemon thread (started via FastAPI lifespan in `api/app.py`). It wakes every 60 s, reads all enabled schedules, and fires `run_group` for any whose next cron occurrence has passed since `last_fired_at`. `last_fired_at` is written to the DB before the run thread is spawned. This prevents a double fire if a run is slow or pipekit restarts mid-run. Cron expressions are evaluated with `croniter` against the server's local time, not UTC, so schedules fire at local clock time.
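
The fire-once logic can be sketched independently of the cron library. This is a hypothetical sketch, not `scheduler.py`. Four parts are assumptions:

- `next_after` stands in for croniter.
- The `schedule` table layout, with ISO-format `last_fired_at`, is assumed.
- A never-fired schedule starts counting from the first tick.
- `spawn` stands in for starting the run thread.

The point it illustrates is that `last_fired_at` is committed before the run is handed off.

```python
from __future__ import annotations

import sqlite3
from datetime import datetime
from typing import Callable

# (cron_expr, base) -> next occurrence after base; a stand-in for croniter
NextAfter = Callable[[str, datetime], datetime]


def _set_last_fired(conn: sqlite3.Connection, sched_id: int, when: datetime) -> None:
    conn.execute(
        "UPDATE schedule SET last_fired_at = ? WHERE id = ?", (when.isoformat(), sched_id)
    )
    conn.commit()


def tick(
    conn: sqlite3.Connection,
    now: datetime,
    next_after: NextAfter,
    spawn: Callable[[int, int], None],
) -> list[int]:
    """One wake-up: fire each enabled schedule whose next occurrence is <= now."""
    fired: list[int] = []
    rows = conn.execute(
        "SELECT id, group_id, cron, last_fired_at FROM schedule"
        " WHERE enabled = 1 ORDER BY id"
    ).fetchall()
    for sched_id, group_id, cron, last in rows:
        if last is None:
            # Assumption: a never-fired schedule starts counting from the first tick.
            _set_last_fired(conn, sched_id, now)
            continue
        if next_after(cron, datetime.fromisoformat(last)) <= now:
            # Persist first, so a slow run or a restart cannot fire the same slot twice.
            _set_last_fired(conn, sched_id, now)
            spawn(group_id, sched_id)  # e.g. start a thread that calls run_group
            fired.append(sched_id)
    return fired
```

With croniter installed, `next_after` could be `lambda expr, base: croniter(expr, base).get_next(datetime)`. Passing naive local datetimes gives local-clock scheduling.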
## API vs. Web
- `/api/*` — JSON REST, HTTP Basic Auth, consumed by HTMX fragments and external callers
- `/` and other bare paths — full HTML pages (Jinja2), no auth currently
- `POST /modules/{id}/run` returns `{run_id}` immediately (both API and web); run is async via BackgroundTasks
- `GET /runs/{id}/live` — HTML fragment endpoint; HTMX polls this every 2 s while status=running to show live_log + status
- `/groups/*` — group CRUD, run, live fragment; `/group-runs/{id}` — group run detail with per-module run links
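
The writer side of `run_log.live_log`, which the `/runs/{id}/live` fragment polls, can be sketched with `subprocess.Popen`. This sketch makes three assumptions:

- The command is a placeholder, since jrunner's flags are not documented here.
- The `run_log(id, live_log)` columns are assumed.
- stderr is merged into the same stream, which avoids a pipe deadlock but differs from the separate stdout/stderr fields in `run_log`.

Each line is committed so a concurrent poll can see it.

```python
from __future__ import annotations

import sqlite3
import subprocess


def stream_to_live_log(conn: sqlite3.Connection, run_id: int, cmd: list[str]) -> int:
    """Run cmd, appending each output line to run_log.live_log as it arrives."""
    with subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # assumption: interleave stderr into the live log
        text=True,
        bufsize=1,  # line-buffered on the reading side
    ) as proc:
        assert proc.stdout is not None
        for line in proc.stdout:
            conn.execute(
                "UPDATE run_log SET live_log = COALESCE(live_log, '') || ? WHERE id = ?",
                (line, run_id),
            )
            conn.commit()  # commit per line so pollers reading the DB see progress
        return proc.wait()
```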
## Tech Stack
- Python 3.10+, FastAPI, Uvicorn, Jinja2, PyYAML, SQLite3 (stdlib), croniter
- `python-multipart` required for HTML form POSTs (not auto-installed as a FastAPI transitive dep)
- Frontend: HTMX (CDN) + vanilla JS; Alpine.js is NOT loaded despite being listed in older docs
- jrunner: separate Java tool, must be on PATH
- Runs as the `pipekit` system user; venv at `/opt/pipekit/.venv` owned by that user; secrets at `/etc/pipekit/secrets.env` (mode 0640, group pipekit)
## Full Spec
`/opt/pipekit/SPEC.md` is the authoritative design document. Read it for deep rationale on any architectural decision.