diff --git a/CLAUDE.md b/CLAUDE.md
index 9361e52..9a9d38a 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -106,7 +106,9 @@ Step 1 offers two paths. **Browse a table** (`/wizard/tables` → `/wizard/colum
 
 ## Module Columns
 
-Modules store their column mapping as `columns_json` — a JSON list of dicts with keys `source_name`, `source_type`, `dest_name`, `dest_type`. The engine uses this to build the staging CREATE TABLE and the merge INSERT column lists.
+Modules store their column mapping as `columns_json` — a JSON list of dicts with keys `source_name`, `source_type`, `dest_name`, `dest_type` (and a stable `id`). It drives the dest `CREATE TABLE`; the engine itself doesn't read it at run time (it does `CREATE staging LIKE dest` + `SELECT *`).
+
+**Post-creation column add.** The module detail page's Schema panel has a "+ add column" action (`/modules/{id}/columns/new` → `POST /modules/{id}/columns`) that appends a column to an existing module: it `ALTER TABLE … ADD COLUMN`s the dest (always at the tail — the only position ALTER supports, which keeps the positional load aligned), applies a column comment where supported, and appends to `columns_json`. Each column row carries a stable `id` (`c1`, `c2`, …) — the data-movement identity for future schema reconciliation. Add is the only mutation for now; reorder/retype/drop (which can require a table rebuild) are not implemented. Driver methods: `build_add_column_sql`, `column_inventory`.
 
 ## Inline Watermark Editing
 
@@ -120,6 +122,10 @@ Watermarks are managed inline on both the module edit form and wizard step 3 (no
 
 Recreated on every run as `pipekit_staging.{module_name}` (DROP + CREATE, not IF NOT EXISTS). Ephemeral — exists only during the run.
 
+## Bulk Copy (SQL Server dest)
+
+`jrunner.migrate` passes jrunner's `-b` flag when the dest JDBC URL starts with `jdbc:sqlserver:`, so SQL Server loads stream via `SQLServerBulkCopy` (TDS bulk-load) instead of batched INSERTs — dramatically faster on large/wide tables (a 1.27M-row load went ~111 min → ~4 min). DB2/PG dests keep the INSERT path. This is automatic per-dest; no module config. (Requires jrunner with `-b` support.) Note: jrunner only streams the **Postgres source** without buffering it all into memory because it sets `autoCommit(false)` on the source connection in migration mode — a PG-driver requirement for `setFetchSize` to take effect.
+
 ## Scheduler
 
 `pipekit/scheduler.py` runs a single daemon thread (started via FastAPI lifespan in `api/app.py`). It wakes every 60 s, reads all enabled schedules, and fires `run_group` for any whose next cron occurrence has passed since `last_fired_at`. `last_fired_at` is written to the DB before the run thread is spawned — prevents double-fire if a run is slow or pipekit restarts mid-run. Uses `croniter` for cron expression evaluation. Cron expressions are evaluated in **local server time** (not UTC) — `0 4 * * *` fires at 04:00 local.
@@ -132,6 +138,14 @@ Recreated on every run as `pipekit_staging.{module_name}` (DROP + CREATE, not IF
 - `GET /runs/{id}/live` — HTML fragment endpoint; HTMX polls this every 2 s while status=running to show live_log + status
 - `/groups/*` — group CRUD, run, live fragment; `/group-runs/{id}` — group run detail with per-module run links
 
+## Logging
+
+`api/app.py` configures a `pipekit` logger to stderr (→ journal) and registers a `StarletteHTTPException` handler that logs 5xx at ERROR (with traceback) and 4xx at WARNING — because FastAPI otherwise turns `HTTPException` into a response and logs nothing (failures like wizard dest-provisioning errors were invisible). A small ASGI middleware (`_RequestBodyCapture`) buffers each request body and replays it downstream so the handler can log the submitted payload on failure, with secret-looking fields (password/pwd/secret/token) masked. Covers *raised* HTTPExceptions; handlers that *return* an error response aren't body-logged.
+
+## Live-log progress collapse
+
+jrunner prints an in-place progress counter (a bare number) per batch; read with universal newlines each tick is its own line, so a big load would stack thousands of numbers in `live_log`. `repo.append_run_live_log` collapses them: a bare-number tick overwrites a trailing bare-number line instead of appending, keeping a single current count. Real lines (headers, "N rows written", timestamps) are preserved. (Bulk copy emits a throttled counter every 10k rows, displayed the same way.)
+
 ## Tech Stack
 
 - Python 3.10+, FastAPI, Uvicorn, Jinja2, PyYAML, SQLite3 (stdlib), croniter
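
Note on the "Live-log progress collapse" section added in the last hunk: the rule it describes (a bare-number tick overwrites a trailing bare-number line, everything else is appended) can be sketched in a few lines of Python. This is only an illustration under stated assumptions, not the code in `repo.append_run_live_log`: the function name `collapse_append` is hypothetical, and it works on an in-memory string, whereas the real function presumably reads and writes the run's `live_log` in the database.

```python
def collapse_append(log: str, line: str) -> str:
    """Append `line` to `log`, collapsing consecutive bare-number ticks.

    Hypothetical helper for illustration; the real logic lives in
    `repo.append_run_live_log` and may differ in detail.
    """
    line = line.rstrip("\r\n")
    is_tick = line.strip().isdigit()  # a bare number such as "2000"

    if is_tick and log:
        # Look at the last line already in the log (ignoring the final newline).
        body = log[:-1] if log.endswith("\n") else log
        head, sep, last = body.rpartition("\n")
        if last.strip().isdigit():
            # Previous line was also a tick: overwrite it instead of appending.
            return head + sep + line + "\n"

    # Real lines (headers, "N rows written", timestamps) are always appended.
    return log + line + "\n"


if __name__ == "__main__":
    log = ""
    for chunk in ["Migrating table", "1000", "2000", "3000", "3000 rows written"]:
        log = collapse_append(log, chunk)
    print(log, end="")
```

Only one tick line survives between real lines, so the stored log grows with the number of real lines rather than the number of batches.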