Scheduling: cron-based group runs via a daemon thread (scheduler.py) started at API startup. Schedules managed inline on the group edit form. last_fired_at persisted before run to prevent double-fire on restart. Requires croniter (added to requirements.txt); DB migration adds last_fired_at column to schedule table. Deploy: deploy.sh now creates the pipekit system user, chowns the repo, builds the venv as pipekit, and installs/enables the systemd unit. systemd/pipekit.service is now a production-ready unit (User=pipekit uncommented). pipekit secrets set preserves existing file permissions instead of resetting to 0600. Driver registration is now idempotent (upsert via get_driver_by_name + update_driver). Docs: CLAUDE.md and SPEC.md updated to reflect groups, scheduling, scheduler-in-API-process architecture, TUI deferred (not dropped), stop-on-failure tradeoff, jrunner as prerequisite, and deploy flow. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>

# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Running the Server

```bash
pipekit serve                                      # default host/port from config.yaml
pipekit serve --host 0.0.0.0 --port 8080 --reload  # dev mode with auto-reload
```

The user runs the server in their own terminal; do not nohup or background it.

## Other CLI Commands

```bash
pipekit init                       # create/upgrade SQLite schema
pipekit doctor                     # health check (config, jrunner, DB)
pipekit run <module_name>          # run a module synchronously (manual test)
pipekit set-password <username>    # set HTTP Basic Auth credentials
pipekit secrets set KEY [VALUE]    # add/update a secret (prompts if value omitted)
pipekit secrets list               # show stored secret keys (no values)
pipekit drivers list               # show registered driver kinds
./deploy.sh                        # full idempotent deploy: user, venv, deps, launcher,
                                   # secrets.env, schema init, driver registration, systemd unit
```

## Architecture

Pipekit is a database sync tool. A **module** defines a source query → dest table sync job. The engine runs a module by resolving watermarks → materializing source SQL → staging data via jrunner → merging into dest.

**Layers (bottom to top):**

1. **SQLite** (`pipekit.db`): single file, all state
2. **`repo.py`**: all CRUD for every table; ~725 LOC, the only layer that touches the DB
3. **`engine/`**: orchestrates a module run (lock acquisition, watermark resolution, jrunner calls, merge SQL, post-run hooks, run_log write, lock release)
4. **jrunner**: external Java CLI that handles all JDBC access. Python never talks to remote DBs directly; it shells out to jrunner
5. **`api/`**: FastAPI REST endpoints under `/api/*`, HTTP Basic Auth (except `/health`)
6. **`web/`**: HTML pages (Jinja2 templates) at `/`; HTMX + vanilla JS for interactivity (Alpine.js is not loaded)

## Key Data Model

- **driver**: JDBC driver registration (kind, jar, class, url_template)
- **connection**: named DB connection (jdbc_url, username, password as a `$ENV_VAR` reference)
- **module**: sync job (source/dest connection, source query, merge strategy, merge key, enabled, running lock)
- **watermark**: named placeholder with resolver SQL; the first column of the first row is used as an opaque string and replaces `{watermark_name}` in the source query
- **hook**: post-merge SQL (run_order, run_on: success/failure/always)
- **run_log**: immutable history record with resolved SQL, merge SQL, watermark values, stdout/stderr, timing, and live_log (streamed jrunner output written during the run)
- **grp**: named group of modules; modules belong to a group via `group_member` (with run_order)
- **group_member**: join table mapping group → module with run_order; group runs skip disabled modules
- **group_run**: one execution of a group: status, timing, triggered_by (`manual` | `schedule:{id}`)
- **schedule**: cron expression tied to a group; the scheduler fires `run_group` when due and tracks `last_fired_at` so restarts do not cause double-firing
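
The watermark contract (first cell of the resolver result, kept as an opaque string, substituted for `{watermark_name}`) can be sketched as below. This is a minimal illustration, not pipekit's engine code: `resolve_first_cell` and `materialize` are hypothetical names, and an in-memory SQLite query stands in for the jrunner call that resolves watermarks in practice.

```python
import sqlite3
from typing import Optional


def resolve_first_cell(conn: sqlite3.Connection, resolver_sql: str) -> Optional[str]:
    """Return the first column of the first row as an opaque string (None if absent).

    Hypothetical helper: pipekit runs resolver SQL through jrunner, not sqlite3.
    """
    row = conn.execute(resolver_sql).fetchone()
    if row is None or row[0] is None:
        return None  # caller falls back to the watermark's default value (assumed)
    return str(row[0])  # no parsing, no quoting: the value is spliced in verbatim


def materialize(source_sql: str, values: dict[str, str]) -> str:
    """Replace every {name} token with its resolved value using plain str.replace."""
    for name, value in values.items():
        source_sql = source_sql.replace("{" + name + "}", value)
    return source_sql


# Usage: a high-water mark read from the destination drives the next source query.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dest_orders (updated_at TEXT)")
conn.executemany(
    "INSERT INTO dest_orders VALUES (?)", [("2024-01-01",), ("2024-03-15",)]
)
hwm = resolve_first_cell(conn, "SELECT MAX(updated_at) FROM dest_orders") or "1900-01-01"
sql = materialize("SELECT * FROM orders WHERE updated_at > '{hwm}'", {"hwm": hwm})
print(sql)  # SELECT * FROM orders WHERE updated_at > '2024-03-15'
```

Because the value is spliced in verbatim, any quoting the source dialect needs has to be written around the placeholder in the source query itself, as with `'{hwm}'` here.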

## Engine Flow

```
run_module(module_id)
  → atomic UPDATE module SET running=1 WHERE running=0   # fail if already locked
  → for each watermark: run resolver SQL via jrunner → capture first cell   # always live, even dry runs
  → materialize source query (simple string replace {name} → value)
  → build merge SQL (engine/merge.py: full=truncate+insert, incremental=delete by key+insert, append=insert)
  → if dry_run: write run_log (status='dry_run'), return early (no data movement)
  → DROP + CREATE staging table (pipekit_staging.{module_name})   # self-healing schema drift
  → jrunner migrate source → staging   # uses Popen; each stdout line appended to run_log.live_log
  → run merge SQL via jrunner
  → run hooks in order
  → write run_log entry
  → UPDATE module SET running=0 (in finally)

run_group(group_id)
  → create group_run row (status=running)
  → for each enabled group_member in run_order: call run_module(group_run_id=...)
  → continues past individual module failures (all members run)
  → final status: dry_run if all dry_run, error if any errored, success otherwise
  → finish_group_run(status)
```
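
The lock step in `run_module` relies on one conditional UPDATE: SQLite applies it atomically, so the affected row count tells the caller whether it won the lock. A minimal sketch with the stdlib `sqlite3` module, assuming a simplified `module` table that has only `id` and `running`; `acquire_lock`, `release_lock`, and `run_with_lock` are hypothetical names, not pipekit's engine API.

```python
import sqlite3
from typing import Callable


class ModuleLocked(Exception):
    """Raised when the module is already running (or the id does not exist)."""


def acquire_lock(conn: sqlite3.Connection, module_id: int) -> None:
    # The WHERE clause only matches an unlocked row, so check and set are one
    # statement; a rowcount of 0 means another run holds the lock.
    cur = conn.execute(
        "UPDATE module SET running = 1 WHERE id = ? AND running = 0", (module_id,)
    )
    locked = cur.rowcount == 1
    conn.commit()
    if not locked:
        raise ModuleLocked(f"module {module_id} is already running")


def release_lock(conn: sqlite3.Connection, module_id: int) -> None:
    conn.execute("UPDATE module SET running = 0 WHERE id = ?", (module_id,))
    conn.commit()


def run_with_lock(conn: sqlite3.Connection, module_id: int, body: Callable[[], str]) -> str:
    acquire_lock(conn, module_id)  # fail fast, before any watermark or jrunner work
    try:
        return body()  # stands in for watermarks, staging, merge, hooks, run_log
    finally:
        release_lock(conn, module_id)  # runs on success, error, and the dry-run early return


# Usage: a simplified module table with only the columns the lock needs.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE module (id INTEGER PRIMARY KEY, running INTEGER NOT NULL DEFAULT 0)")
conn.execute("INSERT INTO module (id) VALUES (1)")
conn.commit()


def second_attempt() -> str:
    # While the outer run holds the lock, a second acquire must be rejected.
    try:
        acquire_lock(conn, 1)
    except ModuleLocked:
        return "rejected"
    return "acquired"


outcome = run_with_lock(conn, 1, second_attempt)
print(outcome)  # rejected
```

Releasing in `finally` is what keeps a failed or dry run from leaving the module stuck at `running=1`.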

## Run Statuses

- `running`: in progress
- `success`: completed, data moved
- `dry_run`: resolved watermarks + built SQL, no data movement
- `error`: failed
- `cancelled`: cancelled mid-run
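
`run_group` collapses its members' statuses into one: `dry_run` if every member was a dry run, `error` if any member errored, `success` otherwise. A sketch of that rule; `group_status` is a hypothetical name, and the results for an empty group and for `cancelled` members (which the rule does not mention) are placeholder assumptions.

```python
from typing import Iterable


def group_status(member_statuses: Iterable[str]) -> str:
    """Collapse per-module run statuses into a single group_run status.

    Assumption: an empty group, or any mix the rule does not name (for example
    one containing "cancelled"), falls through to "success".
    """
    statuses = list(member_statuses)
    if statuses and all(s == "dry_run" for s in statuses):
        return "dry_run"
    if any(s == "error" for s in statuses):
        return "error"
    return "success"


print(group_status(["success", "error", "dry_run"]))  # error
```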

## Credentials Pattern

Passwords in connections are stored as `$VAR_NAME` references and resolved at run time from `/etc/pipekit/secrets.env` (override the path with the `PIPEKIT_SECRETS` env var). The config file path can be overridden with `PIPEKIT_CONFIG`.
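
Resolving a `$VAR_NAME` reference can be sketched as follows. Assumptions: `secrets.env` holds plain `KEY=VALUE` lines where blank lines and `#` comments are ignored and quotes are not stripped (the exact file format is not specified here), a missing key is an error, and `load_secrets` / `resolve_password` are hypothetical names.

```python
import os
import tempfile
from pathlib import Path
from typing import Optional

DEFAULT_SECRETS = "/etc/pipekit/secrets.env"


def load_secrets(path: Optional[str] = None) -> dict[str, str]:
    """Parse KEY=VALUE lines; an explicit path wins, then PIPEKIT_SECRETS, then the default."""
    path = path or os.environ.get("PIPEKIT_SECRETS", DEFAULT_SECRETS)
    secrets: dict[str, str] = {}
    for raw in Path(path).read_text().splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue  # skip blanks, comments, and malformed lines
        key, _, value = line.partition("=")
        secrets[key.strip()] = value.strip()
    return secrets


def resolve_password(stored: str, secrets: dict[str, str]) -> str:
    """Turn a stored "$VAR_NAME" reference into the secret value."""
    if not stored.startswith("$"):
        return stored  # assumption: non-reference values pass through unchanged
    name = stored[1:]
    if name not in secrets:
        raise KeyError(f"secret {name!r} is not defined in the secrets file")
    return secrets[name]


# Usage with a throwaway file instead of /etc/pipekit/secrets.env.
with tempfile.NamedTemporaryFile("w", suffix=".env", delete=False) as f:
    f.write("# pipekit secrets\nWAREHOUSE_PW=s3cret\n")
secrets = load_secrets(f.name)
os.unlink(f.name)
print(resolve_password("$WAREHOUSE_PW", secrets))  # s3cret
```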

## Driver Abstraction

Each driver in `pipekit/drivers/` inherits from `base.py::Driver` and implements: `browse_fields`, `list_tables`, `list_schemas`, `get_columns`, `map_type`, `default_expression`, `quote_identifier`. The wizard UI calls `/api/introspect/*`, which dispatches to the appropriate driver.
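
The driver contract maps naturally onto an abstract base class. This sketch is illustrative only: the method names come from the list above, but every parameter list and return type is an assumption, what `browse_fields` and `default_expression` return is guessed, and `ToyDriver` and `introspect_tables` are hypothetical. The quoting rule (double quotes, embedded quotes doubled) is the ANSI SQL convention.

```python
from abc import ABC, abstractmethod


class Driver(ABC):
    """Hypothetical shape of pipekit/drivers/base.py::Driver; signatures are assumed."""

    kind: str = ""

    @abstractmethod
    def list_schemas(self) -> list[str]: ...

    @abstractmethod
    def list_tables(self, schema: str) -> list[str]: ...

    @abstractmethod
    def get_columns(self, schema: str, table: str) -> list[dict[str, str]]: ...

    @abstractmethod
    def browse_fields(self, schema: str, table: str) -> list[str]: ...

    @abstractmethod
    def map_type(self, source_type: str) -> str: ...

    @abstractmethod
    def default_expression(self, dest_type: str) -> str: ...

    @abstractmethod
    def quote_identifier(self, name: str) -> str: ...


class ToyDriver(Driver):
    """In-memory stand-in used only to exercise the interface."""

    kind = "toy"

    def __init__(self, catalog: dict[str, dict[str, dict[str, str]]]):
        self.catalog = catalog  # schema -> table -> column name -> source type

    def list_schemas(self) -> list[str]:
        return sorted(self.catalog)

    def list_tables(self, schema: str) -> list[str]:
        return sorted(self.catalog[schema])

    def get_columns(self, schema: str, table: str) -> list[dict[str, str]]:
        return [{"name": n, "type": t} for n, t in self.catalog[schema][table].items()]

    def browse_fields(self, schema: str, table: str) -> list[str]:
        return list(self.catalog[schema][table])

    def map_type(self, source_type: str) -> str:
        return {"int": "INTEGER", "varchar": "TEXT"}.get(source_type.lower(), "TEXT")

    def default_expression(self, dest_type: str) -> str:
        return "0" if dest_type == "INTEGER" else "''"

    def quote_identifier(self, name: str) -> str:
        return '"' + name.replace('"', '""') + '"'


def introspect_tables(drivers: dict[str, Driver], kind: str, schema: str) -> list[str]:
    # Dispatch by driver kind, loosely mirroring what an /api/introspect/* handler might do.
    return drivers[kind].list_tables(schema)


toy = ToyDriver({"sales": {"orders": {"id": "int", "note": "varchar"}}})
print(introspect_tables({"toy": toy}, "toy", "sales"))  # ['orders']
```

Because `Driver` is an ABC, a subclass that forgets one of the methods fails at instantiation time rather than at the first introspection call.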

## Module Columns

Modules store their column mapping as `columns_json`, a JSON list of dicts with keys `source_name`, `source_type`, `dest_name`, `dest_type`. The engine uses this to build the staging CREATE TABLE and the merge INSERT column lists.
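
How `columns_json` can drive both the staging DDL and the merge column list, as a sketch. Assumptions: the staging table is built from the `dest_name`/`dest_type` pairs, identifiers use ANSI double quotes (a real run would presumably use the destination driver's `quote_identifier`), the DROP carries `IF EXISTS` so a first run does not fail, and `build_staging_ddl` / `build_insert` are hypothetical names.

```python
import json


def _q(name: str) -> str:
    # ANSI-style identifier quoting; stands in for a driver's quote_identifier.
    return '"' + name.replace('"', '""') + '"'


def build_staging_ddl(module_name: str, columns_json: str) -> list[str]:
    cols = json.loads(columns_json)
    table = f"pipekit_staging.{_q(module_name)}"
    col_defs = ", ".join(f"{_q(c['dest_name'])} {c['dest_type']}" for c in cols)
    # DROP + CREATE (never CREATE IF NOT EXISTS) so column changes take effect every run.
    return [f"DROP TABLE IF EXISTS {table}", f"CREATE TABLE {table} ({col_defs})"]


def build_insert(dest_table: str, module_name: str, columns_json: str) -> str:
    names = ", ".join(_q(c["dest_name"]) for c in json.loads(columns_json))
    return (
        f"INSERT INTO {dest_table} ({names}) "
        f"SELECT {names} FROM pipekit_staging.{_q(module_name)}"
    )


cols = json.dumps([
    {"source_name": "ID", "source_type": "NUMBER", "dest_name": "id", "dest_type": "INTEGER"},
    {"source_name": "AMT", "source_type": "DECIMAL", "dest_name": "amount", "dest_type": "NUMERIC(12,2)"},
])
for stmt in build_staging_ddl("orders", cols):
    print(stmt)
print(build_insert("public.orders", "orders", cols))
```

Listing columns explicitly on both sides of the INSERT keeps the merge correct even if the destination table has extra columns or a different column order.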

## Inline Watermark Editing

Watermarks are managed inline on both the module edit form and wizard step 3, in addition to the standalone `/watermarks/{id}/edit` page. The module edit form (`module_form.html`) renders existing watermarks as editable rows and submits them as parallel arrays (`wm_id[]`, `wm_name[]`, `wm_connection_id[]`, `wm_resolver_sql[]`, `wm_default_value[]`, `wm_deleted_id[]`). The `_save_inline_watermarks(form, module_id)` helper in `web/app.py` processes these arrays: it updates existing rows, creates new ones, and deletes removed ones. Both `module_update` and `wizard_create` call it after saving the module. A client-side placeholder warning checks that the `{name}` tokens in the source query match the watermark names on the page.
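
The parallel-array protocol can be illustrated with a plain dict standing in for the submitted form (in a FastAPI handler each list would come from Starlette's `FormData.getlist`). This is a hypothetical sketch of the partitioning step, not the code of `_save_inline_watermarks`: it assumes an empty `wm_id[]` entry marks a new row, rows named in `wm_deleted_id[]` are dropped, rows with a blank name are ignored, and the result is returned instead of being written through `repo.py`.

```python
from typing import NamedTuple

FIELDS = ("wm_id[]", "wm_name[]", "wm_connection_id[]", "wm_resolver_sql[]", "wm_default_value[]")


class WatermarkChanges(NamedTuple):
    updates: list[dict[str, str]]
    creates: list[dict[str, str]]
    deletes: list[str]


def partition_watermark_rows(form: dict[str, list[str]]) -> WatermarkChanges:
    columns = [form.get(f, []) for f in FIELDS]
    if len({len(c) for c in columns}) > 1:
        raise ValueError("parallel watermark arrays have different lengths")
    deleted = set(form.get("wm_deleted_id[]", []))
    updates: list[dict[str, str]] = []
    creates: list[dict[str, str]] = []
    # zip(*columns) turns the per-field arrays back into one tuple per row.
    for wm_id, name, conn_id, sql, default in zip(*columns):
        if not name.strip():
            continue  # assumption: blank rows are ignored
        row = {"name": name.strip(), "connection_id": conn_id,
               "resolver_sql": sql, "default_value": default}
        if not wm_id:
            creates.append(row)
        elif wm_id not in deleted:
            updates.append({"id": wm_id, **row})
    return WatermarkChanges(updates, creates, sorted(deleted))


form = {
    "wm_id[]": ["7", ""],
    "wm_name[]": ["last_id", "since"],
    "wm_connection_id[]": ["2", "2"],
    "wm_resolver_sql[]": ["SELECT MAX(id) FROM t", "SELECT MAX(ts) FROM t"],
    "wm_default_value[]": ["0", "1970-01-01"],
    "wm_deleted_id[]": ["9"],
}
changes = partition_watermark_rows(form)
print(changes.deletes)  # ['9']
```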

## Merge Key

`merge_key` is stored as a comma-separated string (e.g., `"col1, col2"`). The engine parses it and generates a multi-column DELETE predicate for the incremental strategy.
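
Parsing the stored key and building the incremental delete-then-insert might look like this. The statement shape (a correlated `EXISTS` against the staging table) and the names `parse_merge_key` / `build_incremental_delete` are assumptions for illustration; the engine's actual SQL lives in `engine/merge.py` and may differ. SQLite stands in for the destination database.

```python
import sqlite3


def parse_merge_key(merge_key: str) -> list[str]:
    """Split "col1, col2" into ["col1", "col2"], dropping blanks from stray commas."""
    cols = [c.strip() for c in merge_key.split(",")]
    cols = [c for c in cols if c]
    if not cols:
        raise ValueError("incremental strategy requires a non-empty merge_key")
    return cols


def build_incremental_delete(dest_table: str, staging_table: str, merge_key: str) -> str:
    # One equality per key column, ANDed: a dest row is deleted only if every key column matches.
    predicate = " AND ".join(f"s.{c} = {dest_table}.{c}" for c in parse_merge_key(merge_key))
    return f"DELETE FROM {dest_table} WHERE EXISTS (SELECT 1 FROM {staging_table} s WHERE {predicate})"


# Usage: delete matching keys, then insert the staged rows (incremental = delete by key + insert).
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE dest (region TEXT, id INTEGER, v TEXT);
CREATE TABLE stg (region TEXT, id INTEGER, v TEXT);
INSERT INTO dest VALUES ('eu', 1, 'old'), ('us', 1, 'keep');
INSERT INTO stg VALUES ('eu', 1, 'new');
""")
sql = build_incremental_delete("dest", "stg", "region, id")
conn.execute(sql)
conn.execute("INSERT INTO dest SELECT * FROM stg")
rows = sorted(conn.execute("SELECT region, id, v FROM dest").fetchall())
print(rows)  # [('eu', 1, 'new'), ('us', 1, 'keep')]
```

With a single-column key, `('us', 1)` would also have matched on `id = 1` and been deleted; ANDing every key column is what keeps it. Note that `=` never matches NULL, so rows with NULL key columns are never replaced by this predicate.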

## Staging Table

Recreated on every run as `pipekit_staging.{module_name}` (DROP + CREATE, not IF NOT EXISTS). Ephemeral: it exists only during the run.

## Scheduler

`pipekit/scheduler.py` runs a single daemon thread, started via the FastAPI lifespan in `api/app.py`. It wakes every 60 s, reads all enabled schedules, and fires `run_group` for any schedule whose next cron occurrence after `last_fired_at` has already passed. `last_fired_at` is written to the DB before the run thread is spawned, which prevents a double-fire if a run is slow or pipekit restarts mid-run. Cron expressions are evaluated with `croniter`.
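
One scheduler tick can be sketched without a live database or thread. Assumptions: schedules are plain dicts whose `last_fired_at` is already seeded (for example with the creation time), `next_after(expr, base)` is injected so the example runs without `croniter` (with croniter it would be `croniter(expr, base).get_next(datetime)`), and `save_last_fired` / `start_group_run` are hypothetical callbacks standing in for the DB write and the run thread. The point is the ordering: `last_fired_at` is persisted before the run starts.

```python
from datetime import datetime, timedelta
from typing import Callable


def tick(
    schedules: list[dict],
    now: datetime,
    next_after: Callable[[str, datetime], datetime],
    save_last_fired: Callable[[int, datetime], None],
    start_group_run: Callable[[int, int], None],
) -> list[int]:
    """Fire every enabled schedule whose next occurrence after last_fired_at is <= now."""
    fired = []
    for s in schedules:
        if not s["enabled"]:
            continue
        if next_after(s["cron"], s["last_fired_at"]) > now:
            continue  # not due yet
        save_last_fired(s["id"], now)  # persist first so a restart cannot re-fire it
        s["last_fired_at"] = now
        start_group_run(s["group_id"], s["id"])  # e.g. a daemon threading.Thread
        fired.append(s["id"])
    return fired


def every_minute(expr: str, base: datetime) -> datetime:
    # Stand-in for croniter("* * * * *", base).get_next(datetime); ignores expr.
    return base.replace(second=0, microsecond=0) + timedelta(minutes=1)


log: list[tuple] = []
schedules = [
    {"id": 1, "group_id": 10, "cron": "* * * * *", "enabled": True,
     "last_fired_at": datetime(2024, 1, 1, 12, 0, 30)},
    {"id": 2, "group_id": 20, "cron": "* * * * *", "enabled": False,
     "last_fired_at": datetime(2024, 1, 1, 11, 0)},
]
now = datetime(2024, 1, 1, 12, 1, 5)
fired = tick(schedules, now, every_minute,
             lambda sid, ts: log.append(("saved", sid)),
             lambda gid, sid: log.append(("run", gid, f"schedule:{sid}")))
print(fired, log)  # [1] [('saved', 1), ('run', 10, 'schedule:1')]
```

Setting `last_fired_at` to the tick time means a schedule that missed several occurrences (for example while pipekit was down) fires once, not once per missed occurrence; whether pipekit behaves the same way is not stated here.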

## API vs. Web

- `/api/*`: JSON REST, HTTP Basic Auth, consumed by HTMX fragments and external callers
- `/` and other bare paths: full HTML pages (Jinja2), no auth currently
- `POST /modules/{id}/run` returns `{run_id}` immediately (both API and web); the run is async via BackgroundTasks
- `GET /runs/{id}/live`: HTML fragment endpoint; HTMX polls it every 2 s while status=running to show live_log + status
- `/groups/*`: group CRUD, run, live fragment; `/group-runs/{id}`: group run detail with per-module run links

## Tech Stack

- Python 3.10+, FastAPI, Uvicorn, Jinja2, PyYAML, SQLite3 (stdlib), croniter
- `python-multipart` required for HTML form POSTs (not auto-installed as a FastAPI transitive dep)
- Frontend: HTMX (CDN) + vanilla JS; Alpine.js is NOT loaded despite being listed in older docs
- jrunner: separate Java tool, must be on PATH
- Runs as the `pipekit` system user; venv at `/opt/pipekit/.venv` owned by that user; secrets at `/etc/pipekit/secrets.env` (mode 0640, group pipekit)

## Full Spec

`/opt/pipekit/SPEC.md` is the authoritative design document. Read it for deep rationale on any architectural decision.