# Pipekit — Spec

This spec was built from a clean-slate conversation that rederived the
design from first principles. The previous version is archived at
`SPEC_v1_archive.md` for reference.

## Status

**Implementation is underway.** Core module sync, wizard, web UI, groups,
scheduling, and deploy tooling are built and running. The TUI is deferred,
not dropped: the web UI was built first (see the Architecture note below).

One other item is intentionally deferred: the **migration plan** for bringing
over the ~90 existing modules from `/opt/sync`. Not needed to start
implementation — Pipekit can be built and tested against new modules
first, and migration can happen later (likely via a parser that walks
`/opt/sync/*/`, extracts `pull.sql` / `insert.sql` / shell wrapper,
infers merge strategy and key, and creates module rows).

## How we got here

Started by asking what was painful about the existing shell-script-based
sync setup. Three things surfaced: authoring new modules is tedious,
observability is poor (no easy way to see what ran, how long, how many
rows, any errors), and there's no central management UI. That framed
Pipekit as an orchestration layer on top of the existing `jrunner` JDBC
tool — not replacing jrunner, wrapping it with the state and UI that
shell scripts can't provide.

Everything in this document was worked out by walking through concrete
examples from the current `/opt/sync` modules (`code`, `qcrh`,
`ffsbglr1`) and asking "what would this look like under the new system?"
When the original spec proposed something that didn't fit (like
"watermark is a single column name"), we redesigned it. The result is a
spec that reflects the actual complexity of real modules, not an
idealized simple-sync model.

---

## Motivation

User has ~90 sync modules in `/opt/sync` today, organized as shell scripts
that wrap `jrunner` (a JDBC bulk-transfer CLI at `/opt/jrunner`). Pain points
that drove this redesign:

- **Authoring is tedious.** Building SQL for new sync modules takes too long —
  hand-writing pull.sql, insert.sql, the .sh wrapper, the import table DDL.
- **No observability.** Hard to answer: how often does each module run, how
  many rows transfer, what SQL was used, when's the next run, how long does
  it take, are tables in a good state, were there errors on the last run and
  for which modules.
- **No central management.** Want a TUI like lazygit for browsing, inspecting,
  running, configuring modules. User browses with nvim today and wants the TUI
  to feel as spatial and navigable as a file tree.

## What jrunner does (and doesn't)

`jrunner` (at `/opt/jrunner`) is a Java CLI that does two things:

1. **Migration mode** — given source connection (`-scu/-scn/-scp`), dest
   connection (`-dcu/-dcn/-dcp`), a SQL file (`-sq`), and a dest table (`-dt`),
   it streams rows from source to dest with batched INSERTs.
2. **Query mode** — same source flags but no dest flags, outputs query results
   to stdout in CSV/TSV. Useful for piping to visidata, less, etc.

It has no merge logic, no scheduling, no state, no awareness of incremental
syncs. It's a dumb pipe. That's the right shape — Pipekit wraps it with the
orchestration layer.

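A minimal sketch of how the engine might assemble jrunner argument lists for the two modes. The flag names come from the list above; reading the `u`/`n`/`p` suffixes as JDBC URL, user name and password, the `Conn` type, and the `build_*_args` helper names are assumptions for illustration. Building an argv list for `subprocess` (rather than a shell string) avoids shell-quoting problems with passwords and file paths.

```python
from dataclasses import dataclass


@dataclass
class Conn:
    # Hypothetical connection shape. Assumes -?cu = JDBC URL, -?cn = user name,
    # -?cp = password (already resolved from its env var reference).
    url: str
    user: str
    password: str


def build_query_args(src: Conn, sql_file: str, jrunner: str = "jrunner") -> list[str]:
    """Query mode: source flags plus the SQL file, no dest flags."""
    return [jrunner, "-scu", src.url, "-scn", src.user, "-scp", src.password,
            "-sq", sql_file]


def build_migrate_args(src: Conn, dest: Conn, sql_file: str, dest_table: str,
                       jrunner: str = "jrunner") -> list[str]:
    """Migration mode: the query-mode args plus dest connection and dest table."""
    return build_query_args(src, sql_file, jrunner) + [
        "-dcu", dest.url, "-dcn", dest.user, "-dcp", dest.password,
        "-dt", dest_table,
    ]
```

Migration mode is written as "query mode plus dest flags", which mirrors how the two modes are described above.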
## Architecture

```
jrunner (Java CLI — bulk JDBC transfer + query mode)
    ↑
engine (Python — orchestrates jrunner, watermarks, merge, hooks, run log)
    ↑
API process (FastAPI — REST, Basic Auth)
    ├── scheduler thread (daemon thread; fires group runs from cron schedules)
    └── web UI (Jinja2 templates, HTMX, vanilla JS — served alongside /api/*)
    ↑
TUI (deferred) / curl
```

The engine shells out to jrunner for **everything that touches a database** —
bulk transfers, watermark resolver queries, hooks. No separate JDBC layer in
Python. One driver-loading code path, one set of bugs.

The scheduler is a daemon thread started inside the API process via FastAPI
lifespan — not a separate process or service. It shares the same SQLite
connection pool and imports.

The FastAPI app serves both a JSON REST API (`/api/*`) and full HTML pages
(`/`, `/modules/*`, `/groups/*`, etc.) using Jinja2 templates, HTMX for
in-place updates, and vanilla JS. The web UI covers all management: modules,
connections, groups, schedules, run history.

**TUI is deferred, not dropped.** The original TUI design remains valid — it
would be an HTTP client against the API, never touching SQLite directly. Build
it when the web UI's limitations become friction (keyboard-heavy workflows,
SSH-only environments). The API design already accommodates it.

## Storage: SQLite

Everything lives in one SQLite file (`pipekit.db`). Why:

- ~90+ modules already exist; flat files don't scale to "show me all modules
  that errored last night" type queries.
- The SQL itself belongs in the database, not as file references — a module is
  a self-contained unit and splitting it across rows + files means two things
  to keep in sync.
- Single file, copy with `cp`, no server. Schema translates to PostgreSQL later
  if ever needed.

User was uneasy about losing filesystem browsing. Resolution: the **TUI is the
file browser**. Inspecting a module feels like `cat`, editing opens `$EDITOR`,
the module list feels like `ls`. For raw access, `sqlite3 pipekit.db` works.

## Module model

A module = one sync job. Fields:

- `name`
- `source_connection_id`, `dest_connection_id`
- `dest_table`
- `staging_table` (auto-managed: `pipekit_staging.{name}`)
- `source_query` — full SQL text with `{watermark_name}` placeholders. Free-form.
- `merge_strategy` — `full` / `incremental` / `append`
- `merge_key` — destination column(s) used in DELETE before INSERT
- `enabled`
- `running` (lock flag — see locking section)

The source query is **a text blob**. Not split into structured columns. The
TUI offers a column-editor mode that *parses* the SELECT list out of the
stored query, lets you edit it as a table, and *splices the new SELECT list
back in* (preserving CTEs, FROM, WHERE). For queries the parser can't handle
(too complex), the TUI falls back to raw `$EDITOR`. **Raw editing always
works.**

### Merge strategies

Three strategies; `full` and `incremental` correspond to the two patterns in
the existing scripts:

- **full** — TRUNCATE dest, INSERT all from staging
- **incremental** — pull delta via watermark, DELETE rows in dest matching
  merge_key, INSERT from staging
- **append** — INSERT only, no deletes

**No upsert.** The DELETE+INSERT approach already handles row-level changes
without needing column-by-column ON CONFLICT UPDATE SET clauses.

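A minimal sketch of how the three strategies could be turned into merge SQL, assuming a PostgreSQL-style destination and that staging has the same column order as dest (it is recreated `LIKE dest` each run). `build_merge_sql` is a hypothetical helper name; a real driver would also pass identifiers through `quote_identifier()`.

```python
from __future__ import annotations


def build_merge_sql(strategy: str, dest: str, staging: str,
                    merge_key: list[str] | None = None) -> list[str]:
    """Hypothetical helper: statements to run against dest, in order."""
    insert = f"INSERT INTO {dest} SELECT * FROM {staging}"
    if strategy == "full":
        # Replace everything: empty dest, then copy all of staging.
        return [f"TRUNCATE {dest}", insert]
    if strategy == "append":
        # Insert only; never delete.
        return [insert]
    if strategy == "incremental":
        if not merge_key:
            raise ValueError("incremental merge needs a merge_key")
        # Delete every dest row whose key appears in the delta, then re-insert.
        match = " AND ".join(f"s.{col} = d.{col}" for col in merge_key)
        delete = (f"DELETE FROM {dest} AS d WHERE EXISTS "
                  f"(SELECT 1 FROM {staging} AS s WHERE {match})")
        return [delete, insert]
    raise ValueError(f"unknown merge strategy: {strategy!r}")
```

`EXISTS` with a correlated match handles composite merge keys without relying on row-value `IN (...)`, which not every dialect supports.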
### Watermarks (multi, type-agnostic, resolver SQL)

A module can have **multiple named watermarks**. Real example from user: a
query that needs both `{date}` (max modified-timestamp from one table) and
`{number}` (max order number from another) to build a list of changed orders
to repull.

A watermark =

- `name` — placeholder name in the source query
- `connection_id` — which connection runs the resolver (could be dest, source,
  or a third)
- `resolver_sql` — free-form SQL. Engine runs it via jrunner query mode, takes
  first row's first column as a string.
- `default_value` — used if resolver returns NULL or zero rows

**Type-agnostic.** The engine reads the resolver result as an opaque string and
substitutes it literally. No type coercion. The user controls quoting in the
resolver SQL itself (e.g. wrap in `quote_literal()` if you want `'2610'`,
return raw if you want `2610`).

**Dialect-aware by user.** The user writes the resolver in the connection's
dialect. Engine doesn't translate. Same as today — they already write DB2 in
pull.sql and PG in insert.sql.

**No hidden generation.** Resolved SQL gets **materialized** before each run
and stored on the module record (`next_resolved_query` or similar) so the TUI
can always show "here's exactly what would run next." After the run, the
exact resolved SQL goes into the run_log.

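The substitution rule can be sketched as below, assuming each resolver result arrives as `None` (NULL or zero rows) or a string. Only `{name}` tokens are replaced and unknown names are left alone, so other braces in the SQL survive; `str.format()` is avoided for that reason. `materialize_query` is a hypothetical name, not the engine's actual function.

```python
from __future__ import annotations

import re


def materialize_query(source_query: str, resolved: dict[str, str | None],
                      defaults: dict[str, str]) -> tuple[str, dict[str, str]]:
    """Hypothetical helper: substitute {name} placeholders; return SQL + values used."""
    values = {}
    for name, raw in resolved.items():
        # NULL, zero rows, or an empty string all fall back to default_value.
        values[name] = raw if raw not in (None, "") else defaults[name]

    def swap(m: re.Match) -> str:
        # Unknown placeholder names are left untouched rather than guessed at.
        return values.get(m.group(1), m.group(0))

    # Literal, type-agnostic substitution: each value goes in exactly as returned.
    # (A function replacement is used so backslashes in values are not interpreted.)
    return re.sub(r"\{(\w+)\}", swap, source_query), values
```

The returned `values` dict is the natural content for `run_log.watermark_values_json`.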
### Hooks

A module can have post-execution hooks for things like
`REFRESH MATERIALIZED VIEW rlarp.cust` or `CALL rlarp.osm_stack_refresh()`.

A hook =

- `module_id`, `run_order`
- `connection_id` — usually dest, but can be any connection
- `sql`
- `run_on` — `success` / `failure` / `always`

Hooks run sequentially after the merge. Failures get logged but don't roll
back the merge (it's already committed).

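A minimal sketch of hook selection and execution. The `Hook` dataclass is trimmed to the fields the logic needs, and `execute` stands in for a jrunner query-mode call on the hook's connection; both are assumptions. The spec does not say whether a failed hook stops later hooks, so this sketch logs the failure and keeps going.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class Hook:
    # Hypothetical, trimmed shape (connection_id omitted for brevity).
    run_order: int
    sql: str
    run_on: str  # "success" | "failure" | "always"


def run_hooks(hooks: list[Hook], module_succeeded: bool,
              execute: Callable[[str], None]) -> list[str]:
    """Run matching hooks in run_order; return one hook_log line per hook run."""
    wanted = {"always", "success" if module_succeeded else "failure"}
    log = []
    for hook in sorted(hooks, key=lambda h: h.run_order):
        if hook.run_on not in wanted:
            continue
        try:
            execute(hook.sql)  # in Pipekit: jrunner on the hook's connection
            log.append(f"ok: {hook.sql}")
        except Exception as exc:
            # Logged only; the merge is already committed and stays committed.
            log.append(f"error: {hook.sql}: {exc}")
    return log
```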
**No group-level hooks for now.** Decision deferred. The
`REFRESH MATERIALIZED VIEW rlarp.cust` at the end of `codes.sh` would attach to whichever module
logically owns that data, even if not strictly the last in order. Add group
hooks later if it gets painful.

## Engine flow (per module run)

1. **Acquire lock** atomically: `UPDATE module SET running=1 WHERE id=? AND running=0`. If row count is 0, bail with "already running."
2. **Resolve watermarks.** For each watermark: shell out to jrunner query mode against the watermark's connection with its resolver SQL. Take first row's first column as a string. Fall back to `default_value` on NULL/empty. Always runs live — even for dry runs.
3. **Materialize the resolved source query.** Substitute `{name}` placeholders in `source_query`. Store on the module record so the TUI can preview.
4. **Materialize the merge SQL** based on strategy + merge_key.
5. **Dry-run exit.** If `dry_run=True`, write `run_log` with status `dry_run` and return — no data movement.
6. **Recreate staging** (DROP + CREATE … LIKE dest). Self-healing against schema drift.
7. **Run jrunner** (migration mode) with the resolved query, target = staging.
8. **Run merge** against dest connection via jrunner.
9. **Run hooks** in order, respecting `run_on`.
10. **Write `run_log` entry** with everything (see below).
11. **Release lock** in a `finally` block — always runs, even on error.

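Steps 1 and 11 can be sketched with the standard `sqlite3` module as a context manager. The table here is reduced to `id` and `running` (the PID bookkeeping from the Locking section is left out), and `module_lock` / `AlreadyRunning` are hypothetical names. Because the check and the set happen in one conditional `UPDATE`, there is no SELECT-then-UPDATE window for two callers to race through.

```python
import sqlite3
from contextlib import contextmanager


class AlreadyRunning(Exception):
    pass


@contextmanager
def module_lock(db: sqlite3.Connection, module_id: int):
    """Hypothetical helper: atomic acquire (step 1), release in finally (step 11)."""
    cur = db.execute(
        "UPDATE module SET running = 1 WHERE id = ? AND running = 0", (module_id,))
    db.commit()
    if cur.rowcount == 0:
        # Another run holds the lock (or the module id does not exist).
        raise AlreadyRunning(f"module {module_id} is already running")
    try:
        yield
    finally:
        # Always runs, even if the sync inside the with-block raised.
        db.execute("UPDATE module SET running = 0 WHERE id = ?", (module_id,))
        db.commit()
```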
## Locking

The `running` flag on the module is the lock. The atomic UPDATE-with-WHERE
above ensures no race window. Belt-and-suspenders for stuck locks:

- **PID-based.** Store the API process PID/UUID on the lock. On API startup,
  clear locks owned by PIDs that no longer exist.
- **Time-based backstop.** On startup, also clear locks held longer than some
  absurd threshold (e.g. 24h).

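A sketch of the startup cleanup, assuming two hypothetical columns, `lock_pid` and `lock_acquired_at` (epoch seconds), alongside `running`. The liveness check uses POSIX `os.kill(pid, 0)`, which probes a PID without sending a signal; it is injectable so the logic can be tested. The age backstop also covers PID reuse after a reboot, where a stale PID may belong to an unrelated process.

```python
from __future__ import annotations

import os
import sqlite3
import time
from typing import Callable


def pid_alive(pid: int) -> bool:
    """POSIX check: signal 0 tests whether the PID exists."""
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        return True  # exists, but belongs to another user
    return True


def clear_stale_locks(db: sqlite3.Connection, max_age_s: float = 24 * 3600,
                      alive: Callable[[int], bool] = pid_alive,
                      now: float | None = None) -> list[int]:
    """Run at API startup; returns the ids of modules that were unlocked."""
    now = time.time() if now is None else now
    rows = db.execute(
        "SELECT id, lock_pid, lock_acquired_at FROM module WHERE running = 1").fetchall()
    stale = [mid for mid, pid, acquired in rows
             if pid is None or not alive(pid)
             or acquired is None or now - acquired > max_age_s]
    db.executemany(
        "UPDATE module SET running = 0, lock_pid = NULL, lock_acquired_at = NULL "
        "WHERE id = ?", [(mid,) for mid in stale])
    db.commit()
    return stale
```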
Lock is enforced regardless of trigger source — scheduler, group runner,
ad-hoc single-module, ad-hoc group run. All paths hit the same atomic check.

**No separate group lock needed.** If a group runner tries to start a module
that's already locked, that member's run fails and the group carries on with
the next member (per the continue-past-failures rule in Groups and scheduling).

## Run log / observability

Two tables:

```
group_run(
  id, group_id, started_at, finished_at, status, triggered_by
  -- triggered_by: schedule:{id} | manual | null
)

run_log(
  id,
  module_id,
  group_run_id,           -- nullable; set when run as part of a group
  started_at, finished_at,
  row_count,
  status,                 -- running | success | error | cancelled | dry_run
  error,
  resolved_source_sql,    -- exact SQL that ran on source
  merge_sql,              -- exact merge SQL that ran on dest
  watermark_values_json,  -- {prev_period: "'2610'", ...}
  jrunner_stdout,
  jrunner_stderr,
  hook_log,
  live_log                -- jrunner stdout lines appended in real time during migrate
)
```

Module history is **independent of group context** — `WHERE module_id=?` shows
every run, scheduled or manual, group or standalone. The `group_run_id` is
just an annotation.

**Run detail screen** (in TUI) shows: timing, status, row count, trigger
context, watermark values, plus keys to open in `$EDITOR`:

- `s` — resolved source SQL
- `m` — merge SQL
- `h` — hook output
- `o` — jrunner stdout/stderr

**Global run log** (`L` from main screen) — sortable, filterable across all
modules and groups. Answer "show me everything that errored in the last 24
hours" in two keystrokes.

## Groups and scheduling

```
grp(id, name)

group_member(id, group_id, module_id, run_order)
  -- many-to-many; same module can live in multiple groups with different run_orders

schedule(id, group_id, cron_expr, enabled, last_fired_at)
  -- a group can have 0..N schedules
```

**Sequential execution, continue past failures.** Each enabled member runs in
`run_order` regardless of whether prior members errored. Final group_run status
is `error` if any member errored, `success` if all succeeded, `dry_run` if all
were dry runs. Disabled modules are skipped entirely.

This was a deliberate change from the original "stop on failure" design. The
case for stopping: mirrors `set -e` shell behaviour; avoids running downstream
modules against stale data if an upstream one failed. The case for continuing:
full visibility into all failures in one run rather than discovering them
one-at-a-time; individual module locks already prevent unsafe concurrency.
A per-group `stop_on_failure` flag would satisfy both and is worth adding.

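The continue-past-failures loop and status roll-up can be sketched as follows. Members are assumed to arrive as `(run_order, module_id, enabled)` tuples and `run_module` is an injected stand-in that returns a run status or raises (for example when the module lock is already held); both shapes are assumptions. The spec does not cover mixed outcomes such as a cancelled member, so this sketch reports those as `error`.

```python
from typing import Callable, Iterable


def group_status(statuses: list[str]) -> str:
    """Fold member run statuses into the group_run status."""
    if any(s == "error" for s in statuses):
        return "error"
    if statuses and all(s == "dry_run" for s in statuses):
        return "dry_run"
    if all(s == "success" for s in statuses):
        return "success"
    # Mixed results are not specified; this sketch treats them as errors.
    return "error"


def run_group(members: Iterable[tuple[int, int, bool]],
              run_module: Callable[[int], str]) -> tuple[str, dict[int, str]]:
    """Run enabled members in run_order, continuing past failures."""
    results: dict[int, str] = {}
    for _order, module_id, enabled in sorted(members):
        if not enabled:
            continue  # disabled modules are skipped entirely
        try:
            results[module_id] = run_module(module_id)
        except Exception:
            # e.g. module already locked: record the failure, move to the next one
            results[module_id] = "error"
    return group_status(list(results.values())), results
```

A per-group `stop_on_failure` flag would amount to a `break` after the first `error` in this loop.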
**Many-to-many membership.** Junction table is needed anyway for `run_order`,
so many-to-many costs nothing extra. Unique constraint can be added later if
ever needed.

**Schedule attaches to groups, not modules.** Matches the user's mental model
and avoids a huge cron-list. Individual modules can still be run ad-hoc.

**Scheduler.** `pipekit/scheduler.py` — a single daemon thread started via
FastAPI lifespan on server startup. Wakes every 60 s. For each enabled
schedule, uses `croniter` to find the next occurrence after `last_fired_at`
(or 2 minutes ago if never fired); fires if that occurrence has passed.
`last_fired_at` is written to the DB *before* the run thread is spawned —
prevents double-fire if a run is slow or pipekit restarts mid-run. A scheduled
fire and a manual fire use the same `run_group` code path; only `triggered_by`
differs (`schedule:{id}` vs `manual`).

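One scheduler wake-up might look like the sketch below. `croniter(expr, base).get_next(datetime)` is croniter's way of getting the next fire time; it is imported lazily inside the default helper so the due-check can be exercised with a stand-in. The `Schedule` shape and the `fire` / `save_last_fired` callbacks are assumptions. This sketch stores wall-clock `now` as `last_fired_at` (the spec does not say which value is stored), so after long downtime a schedule fires once to catch up instead of replaying every missed slot.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Callable, Optional


@dataclass
class Schedule:
    # Hypothetical in-memory mirror of a schedule row.
    id: int
    group_id: int
    cron_expr: str
    enabled: bool
    last_fired_at: Optional[datetime]


def croniter_next(expr: str, after: datetime) -> datetime:
    from croniter import croniter  # third-party dependency named in the spec
    return croniter(expr, after).get_next(datetime)


def tick(schedules: list[Schedule], now: datetime,
         fire: Callable[[int, str], None],
         save_last_fired: Callable[[int, datetime], None],
         next_after: Callable[[str, datetime], datetime] = croniter_next) -> list[int]:
    """One 60 s wake-up; returns the ids of schedules that fired."""
    fired = []
    for s in schedules:
        if not s.enabled:
            continue
        base = s.last_fired_at or now - timedelta(minutes=2)
        if next_after(s.cron_expr, base) <= now:
            # Persist first, so a slow run or a restart cannot fire this slot twice.
            save_last_fired(s.id, now)
            s.last_fired_at = now
            # Expected to start run_group on its own thread and return quickly.
            fire(s.group_id, f"schedule:{s.id}")
            fired.append(s.id)
    return fired
```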
**Ad-hoc runs:**

- `POST /modules/{id}/run` — single module
- `POST /groups/{id}/run` — whole group sequentially

Both create normal run_log entries.

## Connections and credentials

```
driver(id, name, jar_file, class_name, url_template)

connection(
  id,
  name,
  driver_id,
  jdbc_url,
  username,
  password,
  default_dest_connection_id,  -- nullable; wizard default when this is source
  default_dest_schema,         -- nullable; wizard default when this is source
  notes,
  created_at, updated_at
)
```

**Credentials = env var references.** The `password` column stores something
like `$DB2PW`. Engine resolves at runtime by reading the env var. Passwords
never live in the database. Matches existing setup
(`/opt/sync/.env` + shell scripts) and keeps `pipekit.db` safe to copy/back-up.

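A minimal sketch of the runtime lookup. `resolve_password` is a hypothetical name, and rejecting values that lack a leading `$` (instead of treating them as literal passwords) is an assumption that enforces "passwords never live in the database".

```python
import os
from typing import Mapping, Optional


def resolve_password(stored: str, env: Optional[Mapping[str, str]] = None) -> str:
    """Hypothetical helper: turn a stored reference like "$DB2PW" into its secret."""
    env = os.environ if env is None else env
    if not stored.startswith("$"):
        # Assumption: refuse literals so real secrets cannot creep into pipekit.db.
        raise ValueError("password column must hold an env var reference like $NAME")
    name = stored[1:]
    try:
        return env[name]
    except KeyError:
        raise LookupError(
            f"env var {name} is not set (check /etc/pipekit/secrets.env)") from None
```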
Test-connection: engine runs a trivial query (`SELECT 1`, or the dialect's
equivalent such as `SELECT 1 FROM SYSIBM.SYSDUMMY1` on DB2) via jrunner
against the connection. Confirms that the URL, credentials, and driver all work.

**jrunner handles all SQL execution** — bulk transfers (migration mode) and
single-value queries for watermark resolvers / hooks (query mode). Trade-off:
~100ms JVM spawn per resolver call, but one tool, one set of bugs, one
driver-loading path.

## Bootstrap / install hygiene

Pipekit verifies jrunner exists on startup (configurable path in
`config.yaml`). If missing, surfaces a clear error pointing at
`/opt/jrunner/deploy.sh`.

**`pipekit doctor`** CLI command — checks jrunner present, jrunner version,
drivers loadable, database accessible, all configured connections testable.
First thing to run after a `git pull`.

**`./deploy.sh`** — idempotent full-system deploy. Run once from a fresh clone
or re-run after any code update. Does:

1. Creates the `pipekit` system user (`useradd --system`) if absent
2. Chowns `/opt/pipekit` to `pipekit:pipekit`
3. Creates/repairs the Python venv at `.venv` owned by `pipekit`
4. Installs deps as `pipekit` user (`pip install -r requirements.txt`)
5. Installs `/usr/local/bin/pipekit` launcher symlink
6. Creates `/etc/pipekit/secrets.env` (mode 0640, group pipekit) if absent
7. Runs `pipekit init` (schema creation + migrations) as `pipekit`
8. Upserts JDBC driver rows for each jar found in jrunner's lib dir
9. Installs and enables `systemd/pipekit.service`

**Secrets** live in `/etc/pipekit/secrets.env` as `KEY=VALUE` lines. The
`password` field on a connection row stores `$KEY`; the engine resolves it at
runtime from the environment. The systemd unit loads this file as
`EnvironmentFile=`. Mode 0640, group pipekit — the service account can read it
but it is not world-readable. Managed with `pipekit secrets set/list/unset`;
`pipekit secrets set` preserves the file's existing permissions instead of
resetting them to 0600.

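A sketch of how `pipekit secrets set` could keep the 0640 / group-pipekit permissions: rewriting the existing file in place leaves its inode, mode bits and ownership untouched. The function name and the key/value validation rules are assumptions.

```python
import os
import re

_KEY = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def secrets_set(path: str, key: str, value: str) -> None:
    """Hypothetical helper: add or replace KEY=VALUE, keeping mode and owner."""
    if not _KEY.fullmatch(key):
        raise ValueError(f"invalid key: {key!r}")
    if "\n" in value:
        raise ValueError("value must be a single line")
    try:
        with open(path) as f:
            lines = f.read().splitlines()
    except FileNotFoundError:
        lines = []  # a new file gets whatever mode the process umask allows
    lines = [ln for ln in lines if not ln.startswith(f"{key}=")]
    lines.append(f"{key}={value}")
    # Opening an existing file with "w" truncates its contents but keeps the
    # same inode, so permission bits and ownership are not reset.
    with open(path, "w") as f:
        f.write("\n".join(lines) + "\n")
```

The trade-off is atomicity: a crash mid-write can leave a truncated file, whereas a temp-file-plus-rename would have to copy the mode and owner across explicitly.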
**jrunner is a prerequisite, not managed by deploy.sh.** `deploy.sh` checks
that `jrunner` is on PATH and aborts with a clear message if not. Installing
jrunner (`/opt/jrunner/deploy.sh`) is a separate step the operator does first.
Keeping them decoupled means jrunner can be updated independently. Bundle later
if/when the two-step gets annoying.

## New module wizard

The centerpiece for fixing the authoring pain. Goal: from "I want to sync
table X from connection Y" to "module created, query previewed, ready to
test-run" in under a minute.

### Step 1 — Source

Pick source connection. Filter by schema. Search tables incrementally. The
engine runs jrunner in query mode against the source's INFORMATION_SCHEMA
equivalent (DB2: `SYSIBM.SYSTABLES`, SQL Server / PG: `INFORMATION_SCHEMA.TABLES`).

### Step 2 — Columns

The engine introspects the chosen table and proposes one row per column with:

- **In/out toggle** (default all on; toggle off the noise like `dcfut*` futures)
- **Default alias** — lowercase, special chars stripped: `DCORD#` → `dcord`
- **Default source expression** — bare column for most types; `RTRIM(col)` for
  char/varchar; `CASE WHEN col IN ('0001-01-01','9999-12-31') THEN NULL ELSE col END`
  for date (sentinel-NULL pattern from existing modules)
- **Default dest type** — mapped from source: `INT`→`integer`, `DECIMAL(15,4)`→`numeric(15,4)`,
  `CHAR(40)`→`text`, `DATE`→`date`, etc.

`e` opens an edit modal for one row to override alias / expression / type.
Most of the time you accept defaults.

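The Step 2 defaults can be sketched as DB2-driver helpers. The mappings below only cover the examples listed above; the fallback of unknown types to `text` and all function names are assumptions, not the real `engine/drivers/db2.py`.

```python
import re

# Hypothetical DB2 defaults; only the spec's own examples are mapped.
_TYPE_MAP = {"INT": "integer", "INTEGER": "integer", "DATE": "date",
             "CHAR": "text", "VARCHAR": "text"}


def default_alias(column: str) -> str:
    """Lowercase, strip special characters: DCORD# -> dcord."""
    return re.sub(r"[^a-z0-9_]", "", column.lower())


def quote_identifier(name: str) -> str:
    """DB2/PG double-quoted identifier; embedded quotes are doubled."""
    return '"' + name.replace('"', '""') + '"'


def map_type(source_type: str) -> str:
    base, _, args = source_type.upper().partition("(")
    base = base.strip()
    if base in ("DECIMAL", "NUMERIC") and args:
        # Keep precision and scale: DECIMAL(15,4) -> numeric(15,4)
        return f"numeric({args.rstrip(')').replace(' ', '')})"
    return _TYPE_MAP.get(base, "text")  # assumption: unknown types become text


def default_expression(source_type: str, column: str) -> str:
    col = quote_identifier(column)
    base = source_type.upper().split("(")[0].strip()
    if base in ("CHAR", "VARCHAR"):
        return f"RTRIM({col})"
    if base == "DATE":
        # Sentinel-NULL pattern from the existing modules.
        return f"CASE WHEN {col} IN ('0001-01-01','9999-12-31') THEN NULL ELSE {col} END"
    return col
```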
### Step 3 — Destination & merge

Pick dest connection. Dest table defaults to
`{source_conn.default_dest_schema}.{lowercase_source_table_name}`. Pick
merge strategy. Pick merge key. When strategy is `incremental`, a
**Watermarks** panel and an editable **Source query** textarea appear
inline. The source query is pre-populated from column picks; the user
edits it to add the WHERE clause with `{placeholder}` references before
creating the module. Zero or more watermarks can be added in step 3;
they are created atomically with the module on submit.

**Multiple destinations are real** (e.g. PG → SQL Server). The wizard
doesn't assume one dest. Each source connection has a
`default_dest_connection_id` + `default_dest_schema` pair that
pre-populates Step 3. Both are nullable; fallback is last-used dest.

### Step 1 (detail): driver-dependent browse form

Different drivers need different scope fields ("qualifiers") to identify a
table. DB2 needs just `schema`. SQL Server can need up to three:
`linked_server`, `database`, `schema` (any combination — linked server
optional, database optional, schema defaults to `dbo`). This is because
SQL Server can reference tables in other databases on the same server, or
tables on entirely different servers via linked servers — and the FROM
clause syntax changes (`schema.table`, `db.schema.table`,
`[linked].[db].[schema].[table]`).

Each driver exposes:

```python
from __future__ import annotations  # lets hints name BrowseField/Table/Column without importing them here


class Driver:
    """Per-driver hooks used by the wizard and the introspection endpoints."""

    def browse_fields(self) -> list[BrowseField]:
        """Qualifier fields for the wizard's Step 1 form."""
        raise NotImplementedError

    def list_tables(self, **qualifiers: str) -> list[Table]:
        """INFORMATION_SCHEMA query using whatever qualifiers are set."""
        raise NotImplementedError

    def get_columns(self, table_name: str, **qualifiers: str) -> list[Column]:
        """Column lookup for a specific table."""
        raise NotImplementedError

    def qualified_table_name(self, table_name: str, **qualifiers: str) -> str:
        """FROM-clause identifier. Wizard-time only."""
        raise NotImplementedError

    def map_type(self, source_type: str) -> str: ...
    def default_expression(self, source_type: str, column_name: str) -> str: ...
    def quote_identifier(self, name: str) -> str: ...
```

The wizard renders its Step 1 form dynamically from `browse_fields()`. The wizard
calls `qualified_table_name()` once to bake the FROM clause into the
stored source query. **Linked servers / qualifiers are not first-class in
Pipekit** — they exist only as syntax inside the generated FROM. Nothing
is persisted on the module about how the table was qualified at author
time. If you later need to add a column, you type the expression and
alias by hand in the column editor — no re-browsing needed.

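A sketch of `qualified_table_name()` for SQL Server, written as a standalone function with hypothetical names. It bracket-quotes every part (doubling any literal `]`), defaults the schema to `dbo`, and, as a choice of this sketch, requires `database` whenever `linked_server` is set so the result is always a full four-part name.

```python
from __future__ import annotations


def quote_mssql(name: str) -> str:
    """Bracket-quote a SQL Server identifier; a literal ] is doubled."""
    return "[" + name.replace("]", "]]") + "]"


def mssql_qualified_table_name(table_name: str, linked_server: str | None = None,
                               database: str | None = None,
                               schema: str | None = None) -> str:
    """Hypothetical helper: FROM-clause name from whichever qualifiers are set."""
    parts = [schema or "dbo", table_name]  # schema defaults to dbo
    if database:
        parts.insert(0, database)
    if linked_server:
        # Linked-server references are four-part names; this sketch insists on
        # the database part rather than emitting an elided name.
        if not database:
            raise ValueError("linked_server needs database as well")
        parts.insert(0, linked_server)
    return ".".join(quote_mssql(p) for p in parts)
```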
### Step 4 — Preview

Show the generated source query, generated staging DDL, generated merge SQL.
Everything visible. `e` to drop into `$EDITOR` for free-form fixes. `c` to
create — writes the module row, creates the staging table on dest, offers a
test-run.

### Per-driver capability needed

Each driver module (`engine/drivers/db2.py`, etc.) implements the `Driver`
interface above:

- `browse_fields()` — qualifier fields for the Step 1 form
- `list_tables(**qualifiers)` — SQL template for INFORMATION_SCHEMA
- `get_columns(table_name, **qualifiers)` — column name, type, length, nullable
- `qualified_table_name(table_name, **qualifiers)` — FROM-clause identifier
- `map_type(source_type)` → dest type
- `default_expression(source_type, column_name)` → wrap in RTRIM, CASE, etc.
- `quote_identifier(name)` — `"DCORD#"` (DB2/PG) vs `[DCORD#]` (MSSQL)

Defaults are **opinions hardcoded in driver modules** for now. Lift to a
`driver_default` table later if configurability is ever needed.

### Wizard scope

- **No CTE-based queries.** Wizard generates a simple `SELECT cols FROM table` skeleton; the user adds WHERE clauses (including watermark placeholders) in the editable source query textarea in step 3, or post-creation via the module edit form.
- **Multi-watermark supported in wizard.** Add as many watermarks as needed in step 3; they're created atomically with the module.
- **No hooks in the wizard.** Add hooks from the module detail screen.
- **No group assignment in the wizard.** Assign separately.

The limits are intentional. The wizard handles the 80% case fast. The 20% cases
are post-creation edits where you already have a working module to start from.

## TUI — main screen sketch

```
Pipekit
─────────────────────────────────────────────────
▼ s7830956 (AS/400 DB2)
    ✔ code          full   2m ago   1,204 rows   0.8s
    ✔ name          full   2m ago     892 rows   0.6s
    ✔ qcrh          incr   2m ago   1,031 rows   3.2s
    ✗ qcri          incr   2m ago   —            err
    ○ cust          full   disabled
▼ usmidsql01 (SQL Server)
    ✔ live_quotes   full   2m ago     340 rows   1.1s

Groups
    pricing    9 modules   cron 20 2 * * *   next: 2:20am
    codes     26 modules   cron 0 2 * * *    next: 2:00am
```

Modules grouped by source connection (mirrors today's directory layout).
Status / strategy / last-run / row-count / duration on each line. Groups at
the bottom with schedules and next-fire times.

`i` inspect, `r` run, `l` history, `L` global log, `n` new module, `c`
connections, `/` search, `j/k` navigate, `q` quit. Should feel like lazygit /
nvim file tree.

### Module detail (i)

Top: module info (strategy, merge key, watermark, dest table, staging table,
enabled, last/next run). Middle: column table (parsed from source query).
Bottom: keybindings.

Keys (the SQL views open read-only in `$EDITOR`; `e` opens the base query for editing):

- `q` — next resolved source SQL
- `m` — merge SQL
- `b` — base query template (with placeholders)
- `e` — edit base query (writable)
- `w` — watermarks
- `h` — hooks
- `c` — column editor (parsed from query)
- `r` — run
- `l` — history

## API surface

**REST over HTTP**, FastAPI, HTTP Basic Auth on all endpoints except
`/health`. In practice the API only uses **GET (reads) and POST
(writes)** — PUT/DELETE avoided to keep the mental model simple.

### Resource CRUD

Every core table (connection, driver, module, watermark, hook, group,
group_member, schedule) gets the same URL pattern:

```
GET    /things                 list (with filter query params)
GET    /things/{id}            read one
POST   /things                 create
POST   /things/{id}            update
POST   /things/{id}/delete     delete
```

JSON shape = snake_case matching database columns. ISO 8601 timestamps.
Integer IDs. No transformation layer between SQL and JSON.

### Operation endpoints

Anything with side effects or that composes multiple steps:

```
POST /connections/{id}/test    run SELECT 1 via jrunner; return ok/fail/elapsed
GET  /modules/{id}/preview     return next resolved source SQL + merge SQL
                               (resolves watermarks live, does NOT sync)
GET  /modules/{id}/columns     parse source query, return column list

POST /modules/{id}/run         start async run; return {run_id} immediately
POST /groups/{id}/run          start async group run; return {group_run_id}
POST /modules/{id}/cancel      cancel running module (release lock, kill jrunner)
POST /groups/{id}/cancel       cancel running group

GET  /runs                     list runs (filter: ?module_id= ?status= ?since=)
GET  /runs/{id}                run detail (SQL, stdout/stderr, hook output)
GET  /runs/{id}/stream         Server-Sent Events: live log + status (deferred; polling is used today)
GET  /group-runs               list group runs
GET  /group-runs/{id}          group run with child module runs
GET  /modules/{id}/runs        shortcut: runs for one module
```

### Introspection endpoints (wizard backend)

```
POST /introspect/tables     body: {connection_id, qualifiers: {...}}
POST /introspect/columns    body: {connection_id, table_name, qualifiers}
POST /introspect/propose    body: {connection_id, table_name, qualifiers}
                            returns a ready-to-POST module JSON
```

`propose` is curl-able — you can generate a module proposal, tweak the
JSON, then POST it to `/modules` to create. No TUI required.

### System endpoints

```
GET  /health            liveness only, no auth required
GET  /doctor            full check (jrunner, drivers, db, connections, scheduler)
                        powers `pipekit doctor` CLI
GET  /settings
POST /settings/{key}
```

### Async runs + live progress

`POST /modules/{id}/run` does NOT block. It atomically acquires the
module lock, kicks off the sync in a background task, and returns
`{"run_id": 4892}` immediately (for both the JSON API and the web form POST).

Two ways to watch a run after that:

1. **Polling** — `GET /runs/{id}` returns the run_log row; keep hitting
   it until `status != running`. Simple, works anywhere.
2. **Live fragment polling** — the web run detail page (`/runs/{id}`)
   embeds an HTMX fragment that polls `GET /runs/{id}/live` every 2 s
   while `status == running`. The fragment shows current status,
   row count, and the `live_log` field as it grows. Polling stops
   automatically once status leaves `running`.

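A client-side sketch of option 1 using only the standard library. The base URL (including whether the `/api` prefix applies), the credentials, and the helper names are assumptions; the fetcher is injectable so the loop can run without a server.

```python
import base64
import json
import time
import urllib.request
from typing import Callable


def http_fetch(base_url: str, user: str, password: str) -> Callable[[int], dict]:
    """Hypothetical fetcher for GET {base_url}/runs/{id} with HTTP Basic auth."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode()

    def fetch(run_id: int) -> dict:
        req = urllib.request.Request(f"{base_url}/runs/{run_id}",
                                     headers={"Authorization": f"Basic {token}"})
        with urllib.request.urlopen(req) as resp:
            return json.load(resp)

    return fetch


def wait_for_run(fetch: Callable[[int], dict], run_id: int,
                 interval_s: float = 2.0, timeout_s: float = 3600.0) -> dict:
    """Poll until status != running; return the final run_log row."""
    deadline = time.monotonic() + timeout_s
    while True:
        row = fetch(run_id)
        if row["status"] != "running":
            return row
        if time.monotonic() > deadline:
            raise TimeoutError(f"run {run_id} still running after {timeout_s}s")
        time.sleep(interval_s)
```

Usage would be along the lines of `wait_for_run(http_fetch("http://localhost:8000/api", "admin", "..."), 4892)`, with host, port and prefix adjusted to the actual deployment.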
**Live log** — during `jrunner.migrate()` the engine uses `subprocess.Popen`
(not `subprocess.run`) so stdout can be read line-by-line as jrunner
emits it. Each non-empty line is appended to `run_log.live_log` via
`repo.append_run_live_log()`. How frequently lines appear depends on
jrunner's output flushing behaviour; at minimum the final row-count
summary line appears when the transfer completes.

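The line-by-line read can be sketched with `subprocess.Popen` as below. `stream_lines` is a hypothetical name and `on_line` stands in for `repo.append_run_live_log()` (whose signature is not given here). Stderr is drained on a helper thread, because reading only stdout while stderr fills its pipe buffer can block the child; both streams are still kept, matching the separate `jrunner_stdout` / `jrunner_stderr` columns.

```python
import subprocess
import threading
from typing import Callable


def stream_lines(argv: list[str], on_line: Callable[[str], None]) -> tuple[int, str]:
    """Hand each non-empty stdout line to on_line as it arrives.

    Returns (exit code, captured stderr)."""
    err_chunks: list[str] = []
    with subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                          text=True, bufsize=1) as proc:
        # Drain stderr concurrently so a full stderr pipe cannot stall jrunner.
        drain = threading.Thread(target=lambda: err_chunks.append(proc.stderr.read()))
        drain.start()
        for line in proc.stdout:  # yields each line as soon as the child flushes it
            line = line.rstrip("\n")
            if line.strip():
                on_line(line)  # e.g. append to run_log.live_log
        drain.join()
    # Leaving the with-block waits for the process, so returncode is set.
    return proc.returncode, "".join(err_chunks)
```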
Splitting `start` from `watch` means:

- Cron-triggered runs don't have to watch
- Curl scripting can fire-and-forget
- A browser navigating directly to `/runs/{id}` mid-run picks up the
  live panel automatically

### Auth

HTTP Basic. Username/password in the `settings` table. Single-user tool
for now; swap to JWT later if multi-user is ever needed, without
breaking URL structure.

### TUI = HTTP client

The TUI never touches SQLite directly. Every screen reads from an
endpoint. This guarantees zero behavioral drift between the TUI and the
web UI, and makes the API the single source of truth for behavior.

## Open questions still to answer

1. ~~**Wizard defaults match user's mental model?**~~ Confirmed — RTRIM,
   sentinel-date NULL, lowercased aliases are fine for now.
2. ~~**Dest table default?**~~ Resolved — per-source connection
   `default_dest_connection_id` + `default_dest_schema`.
3. ~~**API surface.**~~ Resolved — REST, GET/POST only, async runs, live
   output via polling (SSE deferred), CRUD + operations + introspection mix.
4. **Migration plan.** Deferred. Would involve a parser that walks
   `/opt/sync/*/`, extracts pull.sql / insert.sql / sh wrapper, infers
   merge strategy and key, creates module rows.

## Decisions log (fast reference)

| Decision | Choice |
|---|---|
| Storage | SQLite, single file |
| Where SQL lives | In the database (text blobs), not files |
| Source query shape | Free text with `{watermark}` placeholders |
| Columns | Parsed from query; not separate rows; wizard auto-introspects on create |
| Watermarks | Multiple per module, type-agnostic, free-form resolver SQL |
| Merge strategies | full / incremental / append (no upsert) |
| Hooks | Per-module, post-merge, run_on success/failure/always |
| Group hooks | Deferred — not needed yet |
| Group membership | Many-to-many (junction table for run_order anyway) |
| Group execution | Sequential, continue past failures; final status = worst of members |
| Schedules | Attach to groups; multiple schedules per group allowed |
| Locking | Atomic UPDATE on `module.running`; PID + time-based stale clearing |
| Credentials | Env var references (`$DB2PW`); resolved at runtime |
| SQL execution | Everything via jrunner (migration + query mode) |
| Materialized SQL | Always — resolved source SQL stored before run + after run |
| Install | Loosely coupled to jrunner for now; bundle later |
| TUI feel | Like lazygit / nvim file tree; spatial, keyboard-driven |
| Authoring | Wizard handles 80% case; post-creation editing handles the rest |
| Multiple destinations | Supported. Source conn holds `default_dest_connection_id` + `default_dest_schema` for wizard prepopulation |
| Driver browse fields | Per-driver qualifier set (`schema` for DB2/PG, up to `linked_server`/`database`/`schema` for MSSQL) |
| Linked servers | Not first-class; only affect FROM-clause syntax at author time; not persisted on module |
| API style | REST, GET for reads, POST for writes, no PUT/DELETE |
| Run model | Async — POST /run returns run_id immediately; watch via polling (SSE stream deferred) |
| Live output | HTMX polling every 2 s (SSE deferred — polling sufficient for current scale) |
| Auth | HTTP Basic, single user, creds in settings table |
| TUI | Deferred — web UI built first; TUI remains valid future work as an API client |
| Scheduler | croniter for cron evaluation; last_fired_at persisted before run to prevent double-fire |
| Service account | Runs as pipekit system user; venv + db owned by that user; secrets 0640 group pipekit |