fix incremental watermark wiring and dry_run status

- Wire watermark WHERE clause into GL20000 source query ({dex_row_id} placeholder was present but query had no WHERE clause)
- Fix watermark resolver connection for GL20000 (was pointing at AS400, should be postgres dest)
- Resolve watermarks live on dry runs and module detail page load instead of using defaults
- Use status='dry_run' (not 'success') for dry runs so they can be filtered from recent runs UI
- Add exclude_status param to repo.list_runs; module detail excludes dry_run rows
- Expand run_log CHECK constraint to include 'dry_run'; backfill 16 historical records
- Delete SPEC_v1_archive.md (obsolete v1 design doc)
- Update SPEC.md and CLAUDE.md to reflect current engine flow and status values

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Paul Trowbridge 2026-05-07 19:03:38 -04:00
parent dfc76a96d8
commit 760d4e7fec
7 changed files with 32 additions and 505 deletions

View File

@ -50,17 +50,26 @@ Pipekit is a database sync tool. A **module** defines a source query → dest ta
``` ```
run_module(module_id) run_module(module_id)
→ atomic UPDATE module SET running=1 WHERE running=0 # fail if already locked → atomic UPDATE module SET running=1 WHERE running=0 # fail if already locked
→ for each watermark: run resolver SQL via jrunner → capture first cell → for each watermark: run resolver SQL via jrunner → capture first cell # always live, even dry runs
→ materialize source query (simple string replace {name} → value) → materialize source query (simple string replace {name} → value)
→ jrunner create staging table (pipekit_staging.{module_name})
→ jrunner migrate source → staging
→ build merge SQL (engine/merge.py: full=truncate+insert, incremental=delete by key+insert, append=insert) → build merge SQL (engine/merge.py: full=truncate+insert, incremental=delete by key+insert, append=insert)
→ if dry_run: write run_log (status='dry_run'), return early — no data movement
→ DROP + CREATE staging table (pipekit_staging.{module_name}) # self-healing schema drift
→ jrunner migrate source → staging
→ run merge SQL via jrunner → run merge SQL via jrunner
→ run hooks in order → run hooks in order
→ write run_log entry → write run_log entry
→ UPDATE module SET running=0 (in finally) → UPDATE module SET running=0 (in finally)
``` ```
## Run Statuses
- `running` — in progress
- `success` — completed, data moved
- `dry_run` — resolved watermarks + built SQL, no data movement
- `error` — failed
- `cancelled` — cancelled mid-run
## Credentials Pattern ## Credentials Pattern
Passwords in connections are stored as `$VAR_NAME` references. At run time they are resolved from `/etc/pipekit/secrets.env` (override with `PIPEKIT_SECRETS` env var). Config path override: `PIPEKIT_CONFIG`. Passwords in connections are stored as `$VAR_NAME` references. At run time they are resolved from `/etc/pipekit/secrets.env` (override with `PIPEKIT_SECRETS` env var). Config path override: `PIPEKIT_CONFIG`.

21
SPEC.md
View File

@ -185,15 +185,16 @@ hooks later if it gets painful.
## Engine flow (per module run) ## Engine flow (per module run)
1. **Acquire lock** atomically: `UPDATE module SET running=1 WHERE id=? AND running=0`. If row count is 0, bail with "already running." 1. **Acquire lock** atomically: `UPDATE module SET running=1 WHERE id=? AND running=0`. If row count is 0, bail with "already running."
2. **Resolve watermarks.** For each watermark: shell out to jrunner query mode against the watermark's connection with its resolver SQL. Take first row's first column as a string. Fall back to `default_value` on NULL/empty. 2. **Resolve watermarks.** For each watermark: shell out to jrunner query mode against the watermark's connection with its resolver SQL. Take first row's first column as a string. Fall back to `default_value` on NULL/empty. Always runs live — even for dry runs.
3. **Materialize the resolved source query.** Substitute `{name}` placeholders in `source_query`. Store on the module record so the TUI can preview. 3. **Materialize the resolved source query.** Substitute `{name}` placeholders in `source_query`. Store on the module record so the TUI can preview.
4. **Truncate staging** (`TRUNCATE pipekit_staging.{module_name}`). 4. **Materialize the merge SQL** based on strategy + merge_key.
5. **Run jrunner** (migration mode) with the resolved query, target = staging. 5. **Dry-run exit.** If `dry_run=True`, write `run_log` with status `dry_run` and return — no data movement.
6. **Materialize the merge SQL** based on strategy + merge_key. 6. **Recreate staging** (DROP + CREATE … LIKE dest). Self-healing against schema drift.
7. **Run merge** against dest connection (also via jrunner, or whatever path the engine uses for SQL execution). 7. **Run jrunner** (migration mode) with the resolved query, target = staging.
8. **Run hooks** in order, respecting `run_on`. 8. **Run merge** against dest connection via jrunner.
9. **Write `run_log` entry** with everything (see below). 9. **Run hooks** in order, respecting `run_on`.
10. **Release lock** in a `finally` block — always runs, even on error. 10. **Write `run_log` entry** with everything (see below).
11. **Release lock** in a `finally` block — always runs, even on error.
## Locking ## Locking
@ -228,7 +229,7 @@ run_log(
group_run_id, -- nullable; set when run as part of a group group_run_id, -- nullable; set when run as part of a group
started_at, finished_at, started_at, finished_at,
row_count, row_count,
status, -- running | success | error | cancelled status, -- running | success | error | cancelled | dry_run
error, error,
resolved_source_sql, -- exact SQL that ran on source resolved_source_sql, -- exact SQL that ran on source
merge_sql, -- exact merge SQL that ran on dest merge_sql, -- exact merge SQL that ran on dest
@ -516,7 +517,7 @@ Anything with side effects or that composes multiple steps:
``` ```
POST /connections/{id}/test run SELECT 1 via jrunner; return ok/fail/elapsed POST /connections/{id}/test run SELECT 1 via jrunner; return ok/fail/elapsed
GET /modules/{id}/preview return next resolved source SQL + merge SQL GET /modules/{id}/preview return next resolved source SQL + merge SQL
(runs watermark resolvers but does NOT sync) (resolves watermarks live, does NOT sync)
GET /modules/{id}/columns parse source query, return column list GET /modules/{id}/columns parse source query, return column list
POST /modules/{id}/run start async run; return {run_id} immediately POST /modules/{id}/run start async run; return {run_id} immediately

View File

@ -1,485 +0,0 @@
# Pipekit — ETL Tool Specification
## Overview
A lightweight, JDBC-based ETL tool for syncing tables between source systems and a PostgreSQL destination (or other JDBC destinations). Config-driven, no boilerplate scripts. Managed via TUI, API, or future web UI.
## Architecture
```
jrunner (JDBC transfer engine — existing Java app)
^
engine (Python — orchestrates jrunner, manages staging, merge, DDL, logging)
^
API (FastAPI — REST interface, Basic Auth)
^
TUI / Web UI / external callers
```
## Core Concepts
| Concept | Description |
|----------------|-----------------------------------------------------------------------------|
| **Connection** | A JDBC source or destination — URL, driver class, credentials |
| **Driver** | A JDBC driver jar registered with the system |
| **Module** | A sync job — source query + destination table + merge strategy |
| **Hook** | Post-sync SQL action run against the destination (e.g. refresh mat view) |
| **Group** | An ordered list of modules that run together |
| **Schedule** | A cron expression tied to a group |
| **Run** | A single execution — tracked with timing, row count, status, error, SQL |
## Bootstrap Config (only file on disk)
```yaml
# /opt/pipekit/config.yaml
database: /opt/pipekit/pipekit.db # SQLite — self-contained, no external DB required
jrunner_path: /usr/local/bin/jrunner
driver_dir: /opt/pipekit/drivers/
api_port: 8100
smtp: # optional, for failure notifications
host: smtp.example.com
port: 587
from: etl@example.com
to: admin@example.com
```
Everything else lives in SQLite (`pipekit.db`). No external database dependency for config — destinations can be PostgreSQL, SQL Server, or anything with a JDBC driver.
## Column Identity Model
A module's source query defines column mappings from source to destination. This is the central design constraint — every column has two identities:
| Context | Name | Example | Where used |
|---------|------|---------|------------|
| **Source column** | The original column name in the source system | `DCORD#`, `DCODAT` | Source query SELECT, WHERE clauses against the source |
| **Destination column** | The alias in the SELECT, which becomes the column name in staging and dest tables | `dcord`, `dcodat` | Staging table DDL, merge SQL, destination queries |
### Rules
1. The **source query** maps source → destination: `SELECT "DCORD#" AS dcord ...`
2. **`merge_key`** references the **destination column name** — it's used in merge SQL that runs against PostgreSQL (e.g. `DELETE FROM dest WHERE dcord IN (SELECT dcord FROM staging)`)
3. **`watermark_column`** references the **destination column name** — the engine looks up `MAX(watermark_column)` in the destination table, then must translate it back to the source column name to build the WHERE clause against the source
4. The **watermark WHERE clause** must use the **source column name** — e.g. `WHERE "DCORD#" > 12345`, not `WHERE dcord > 12345` (the source system doesn't know the alias)
5. The engine maintains a **column mapping** (alias → source expression) parsed from the source query to perform this translation
### Column Mapping Derivation
The source query is parsed to extract the mapping:
```sql
SELECT
"DCORD#" AS dcord -- source: "DCORD#", dest: dcord
,RTRIM(DCOTYP) AS dcotyp -- source: DCOTYP, dest: dcotyp (trimmed)
,DCODAT AS dcodat -- source: DCODAT, dest: dcodat
FROM LGDAT.QCRH
```
From this, the engine derives:
- `dcord``"DCORD#"` (used for WHERE clause on source)
- `dcotyp``DCOTYP` (the unwrapped column, without RTRIM)
- `dcodat``DCODAT`
When building an incremental WHERE clause for watermark column `dcord`:
1. Query dest: `SELECT MAX(dcord) FROM sync.qcrh``12345`
2. Look up source expression for `dcord``"DCORD#"`
3. Build: `WHERE "DCORD#" > 12345`
### Special Character Handling
Source columns with special characters (`#`, `@`, `$`, spaces) are:
- **Quoted in the source query** using platform-appropriate syntax: `[DCORD#]` (SQL Server), `"DCORD#"` (DB2/PostgreSQL)
- **Aliased to safe names** that are valid unquoted PostgreSQL identifiers: `dcord`, `company_name`
- The alias generation (`_safe_alias`) strips special characters, lowercases, and replaces non-alphanumeric chars with underscores
## Database Schema
All tables in SQLite (`pipekit.db`). Same schema works if migrated to PostgreSQL later.
### connection
| Column | Type | Description |
|------------------|---------|--------------------------------------------------|
| id | integer PK | Auto-increment |
| name | text | Human-readable label |
| jdbc_url | text | JDBC connection string |
| driver_id | integer | FK to driver |
| username | text | |
| password | text | Env var reference (e.g. `$DB2PW`) resolved at runtime |
| supports_deletes | boolean | Whether destination supports DELETE/UPDATE |
| created_at | text | ISO datetime |
| updated_at | text | ISO datetime |
### driver
| Column | Type | Description |
|--------------|---------|--------------------------------------------------|
| id | integer PK | Auto-increment |
| name | text | e.g. "SQL Server", "AS/400 DB2" |
| jar_file | text | Filename in driver_dir |
| class_name | text | JDBC driver class |
| url_template | text | e.g. `jdbc:sqlserver://{host};databaseName={db}` |
### module
| Column | Type | Description |
|---------------------|---------|-------------------------------------------------|
| id | integer PK | Auto-increment |
| name | text | Module identifier (unique) |
| source_connection_id| integer | FK to connection |
| dest_connection_id | integer | FK to connection |
| dest_table | text | Fully qualified destination (schema.table) |
| source_query | text | The SELECT query to run against the source |
| merge_strategy | text | `full`, `incremental`, `append`, `upsert` |
| merge_key | text | **Destination** column name for merge operations |
| watermark_column | text | **Destination** column name for incremental watermark. If null, falls back to merge_key |
| key_sync | boolean | After incremental, reconcile keys and delete orphans |
| key_sync_query | text | Optional custom query to fetch source keys |
| full_refresh_cron | text | Optional cron for periodic full refresh |
| enabled | boolean | Whether the module is active |
| running | boolean | Lock flag — set during execution |
| created_at | text | ISO datetime |
| updated_at | text | ISO datetime |
### hook
| Column | Type | Description |
|-----------|---------|------------------------------------------------------|
| id | integer PK | Auto-increment |
| module_id | integer | FK to module (CASCADE delete) |
| run_order | integer | Execution order |
| sql | text | SQL to execute against destination |
| run_on | text | `success`, `failure`, `always` |
### grp (group)
| Column | Type | Description |
|--------|---------|--------------------|
| id | integer PK | Auto-increment |
| name | text | e.g. "pricing" |
### group_member
| Column | Type | Description |
|-----------|---------|----------------------------|
| id | integer PK | Auto-increment |
| group_id | integer | FK to grp (CASCADE) |
| module_id | integer | FK to module (CASCADE) |
| run_order | integer | Execution order in group |
### schedule
| Column | Type | Description |
|-----------|---------|-------------------------------------|
| id | integer PK | Auto-increment |
| group_id | integer | FK to grp (CASCADE) |
| cron_expr | text | Cron expression (e.g. `0 2 * * *`) |
| enabled | boolean | |
### run_log
| Column | Type | Description |
|--------------|---------|----------------------------------------------------------|
| id | integer PK | Auto-increment |
| module_id | integer | FK to module |
| group_id | integer | FK to grp (nullable — null if run manually) |
| started_at | text | ISO datetime |
| finished_at | text | ISO datetime |
| row_count | integer | |
| status | text | `running`, `success`, `error`, `cancelled` |
| error | text | Error message if failed |
| source_query | text | The exact source SQL executed (with resolved WHERE) |
| merge_sql | text | The exact merge SQL executed against destination |
### module_history
| Column | Type | Description |
|-------------|---------|-------------------------------------|
| id | integer PK | Auto-increment |
| module_id | integer | FK to module (CASCADE) |
| source_query| text | Previous query text |
| changed_at | text | ISO datetime |
### settings
| Column | Type | Description |
|--------|------|-------------------------------|
| key | text PK | e.g. `smtp_host` |
| value | text | |
## Merge Strategies
| Strategy | Behavior |
|---------------|-----------------------------------------------------------------------|
| `full` | Transfer all rows to staging, TRUNCATE dest, INSERT from staging |
| `incremental` | Query dest for MAX(watermark), build WHERE clause using source column name, transfer delta, DELETE matching rows by merge_key, INSERT from staging |
| `append` | Transfer, INSERT into dest, no deletes |
| `upsert` | Transfer, INSERT ON CONFLICT(merge_key) DO UPDATE |
### Incremental Sync Flow (detailed)
1. Resolve watermark column: use `watermark_column`, fall back to `merge_key`
2. Query destination: `SELECT MAX({watermark_col}) FROM {dest_table}`
3. Parse the result — handle NULL (empty table), numeric values, date/text values
4. Parse source query to find the source expression for the watermark alias
5. Build WHERE clause using the **source expression** (not the alias):
- Numeric watermark: `WHERE "DCORD#" > 12345`
- Date/text watermark: `WHERE DEX_ROW_TS >= '2026-04-01 00:00:00'`
6. Append WHERE clause to the base source query
7. Transfer delta rows to staging
8. Merge: DELETE from dest WHERE merge_key IN (SELECT merge_key FROM staging), then INSERT
9. Run hooks
**NULL watermark handling**: If `MAX(watermark)` returns NULL (empty dest table or psql null representation like `∅`), skip the WHERE clause entirely — pull all rows.
### Handling Source Deletes
Incremental strategies only detect new/changed rows — not rows deleted from the source. Two mechanisms address this:
**1. Key reconciliation (`key_sync`)** — optional per module. After the incremental load, pull all primary key values from the source (lightweight query), compare against destination, and delete any destination rows whose key is not in the source.
**2. Periodic full refresh (`full_refresh_cron`)** — optional per module. A cron expression that triggers a full refresh on a different cadence than the incremental schedule.
### Destination-Aware Merge
The engine checks `connection.supports_deletes`:
- If true: DELETE + INSERT merge works normally
- If false: incremental/upsert fall back to insert-only, relying on the destination's dedup engine (e.g. ClickHouse ReplacingMergeTree)
## Staging Table Management
- Named `pipekit_staging.{module_name}` (persistent across runs)
- If table exists: TRUNCATE before transfer
- If table doesn't exist: probe source for column metadata (0-row jrunner transfer), create table with mapped PostgreSQL types
- Probe always uses the **base source query** (no WHERE clause) to avoid comment/subquery issues
- Left in place after runs (success or failure) for debugging
- Schemas `pipekit_staging` and destination schema auto-created if missing
## Source Introspection
The engine can browse source systems via jrunner query mode against INFORMATION_SCHEMA (or equivalent):
- **Table browsing**: list tables/views filtered by schema
- **Column metadata**: column names, types, positions
- **Linked server support** (SQL Server): query tables on linked servers via 4-part naming
- **Cross-database** (SQL Server): specify a different database than the connection default
- **Auto-propose**: given a source table, generate complete module config:
- SELECT query with RTRIM on text columns, safe aliases for special characters
- Platform-aware identifier quoting (`[brackets]` for SQL Server, `"double quotes"` for DB2/others)
- Destination DDL with mapped PostgreSQL types
- Suggested merge strategy, key, and watermark column
### Source Type Detection
Detected from JDBC URL: `as400`, `sqlserver`, `postgresql`, `clickhouse`, `mysql`
### Type Mapping (source → PostgreSQL)
varchar/char/nvarchar/nchar/text → text, int/integer → integer, bigint → bigint, decimal/numeric → numeric, float/double → double precision, date → date, datetime/timestamp → timestamp, bit → boolean, binary/varbinary → bytea, uniqueidentifier → uuid
## API Endpoints
```
# Auth: HTTP Basic Auth on all endpoints
# Connections
GET /connections
POST /connections
GET /connections/{id}
PUT /connections/{id}
DELETE /connections/{id}
POST /connections/{id}/test
GET /connections/{id}/tables?schema=
GET /connections/{id}/tables/{schema}.{table}/columns
GET /connections/{id}/tables/{schema}.{table}/propose
# Modules
GET /modules
POST /modules
GET /modules/{id}
PUT /modules/{id}
DELETE /modules/{id}
GET /modules/{id}/preview
GET /modules/{id}/dest-columns
POST /modules/{id}/run
POST /modules/{id}/run/stream
GET /modules/{id}/history
# Hooks
GET /modules/{module_id}/hooks
POST /hooks
DELETE /hooks/{id}
# Groups
GET /groups
POST /groups
GET /groups/{id}
DELETE /groups/{id}
POST /groups/{id}/members
DELETE /groups/members/{id}
POST /groups/{id}/run
# Runs
GET /runs
GET /runs/{id}
# Drivers
GET /drivers
POST /drivers
DELETE /drivers/{id}
# Schedules
GET /schedules
POST /schedules
PUT /schedules/{id}
DELETE /schedules/{id}
```
## TUI
### Main Screen
Module tree grouped by source connection. Icons: `✔` enabled, `○` disabled, `▶` running.
| Key | Action |
|-----|--------|
| `i` | Inspect module |
| `r` | Run selected module |
| `l` | Module run history |
| `L` | Global run log (all modules) |
| `n` | New module wizard |
| `c` | Manage connections |
| `/` | Search modules |
| `j/k` | Navigate |
| `g/G` | Top/bottom |
| `F5` | Refresh |
| `q` | Quit |
### Module Detail Screen (i)
Top section: module info (strategy, merge key, watermark, dest table, staging table, enabled, updated).
Middle section: column table showing source column, destination alias, and whether RTRIM is applied.
Bottom: footer with keybindings. **No SQL visible by default** — all SQL opens in `$EDITOR` (read-only) via keybindings:
| Key | Opens in editor |
|-----|-----------------|
| `q` | Next source SQL — the resolved query that would execute on next run (with WHERE clause) |
| `m` | Merge SQL — the staging-to-dest merge statements |
| `h` | Post-merge hooks |
| `b` | Base query template — the stored SELECT before watermark WHERE is appended |
| `e` | Edit base query (writable) |
| `s` | Module settings (opens edit screen) |
| `r` | Run sync |
| `l` | Run history |
### Module Settings Screen (s)
Full edit form matching the new module wizard layout:
- Module name, source/dest connections, dest table
- Merge strategy (radio buttons)
- Merge key and watermark column (searchable dropdowns populated from source query aliases = destination column names)
- Enabled toggle
Source query is **not** on this screen — use `e` from the detail screen to edit it in `$EDITOR`.
### New Module Wizard (n)
- Source/destination connection selection
- Table browser: linked server, database, schema filter fields + Load button
- Real-time search/filter over loaded tables (DataTable)
- Auto-propose on table selection (generates query, DDL, strategy suggestions)
- Merge strategy, key, watermark, dest table fields
### History Screens (l, L)
Run table with status, rows, timing, error. Below: **separate** panels for source query and merge SQL (not combined). Error shown as red text. `v` opens selected run's SQL in editor. `esc` closes.
### Run Screen (r)
Streaming jrunner output via SSE. Shows real-time transfer progress.
## Concurrency Control
Each module has a `running` flag. Before starting a sync:
1. Check if module is already running — reject if so
2. Set `running = true`
3. Execute sync
4. Set `running = false` on success or failure
## Error Handling
- On module failure: log error to run_log, stop group execution
- No automatic retries
- Staging tables preserved for debugging
- Generated SQL logged to run_log for post-mortem analysis
## Security
- API: HTTP Basic Auth (username/password stored in settings table)
- Connection passwords: stored as env var references (e.g. `$DB2PW`) resolved at runtime
## Deployment
- Single directory install (`/opt/pipekit/`)
- Bootstrap config file (`config.yaml`)
- SQLite database (`pipekit.db`) — created on first run
- JDBC drivers directory
- Python dependencies via pip/venv
- Portable: copy the directory and you've moved the whole install
## Directory Structure
```
/opt/pipekit/
config.yaml # bootstrap config (only file-based config)
pipekit.db # SQLite — all config, queries, run history
drivers/ # JDBC .jar files
engine/
db.py # SQLite schema + CRUD operations
runner.py # Sync orchestration (staging, transfer, merge, hooks)
introspect.py # Source browsing, query generation, type mapping
api/
main.py # FastAPI app
tui/
app.py # Textual TUI
client.py # HTTP client for API
requirements.txt
```
## jrunner Fixes
- **NVARCHAR/NCHAR/NTEXT/NCLOB quoting** — added case labels to jrunner's INSERT builder type switch so Unicode string types get quoted correctly.
## Migration Path from Current Setup
1. Create connections for s7830956, usmidsql01, gpserver, localhost PostgreSQL
2. Import existing modules — parse shell scripts to extract query, dest table, strategy
3. Import orchestrators as groups
4. Set up schedules to match current crontab
5. Verify runs produce same results
6. Decommission shell scripts and cron entries
## TODO
- [ ] **Implement column mapping for watermark WHERE clause** — parse source query to build alias → source expression map, use source expression (not alias) in incremental WHERE clauses
- [ ] **Cancel running sync** — track PID, add cancel endpoint + TUI binding
- [ ] **Scheduler** — background thread in the API process evaluating cron expressions every minute
- [ ] **Email notifications** — SMTP on failure
- [ ] **Upsert + incremental combo** — pull only changed rows, then INSERT ON CONFLICT UPDATE
- [ ] **Module history — full audit** — expand module_history to track all field changes, store as JSON diff
### Resolved
- **Persistent staging tables**`pipekit_staging.{name}`, truncated before each run, left in place after
- **Global run log in TUI**`L` from main screen
- **Connection pooling** — not needed at current scale
- **Scheduler location** — built into the API process (background thread)
- **module_history scope** — track all field changes
- **`timestamp_column` renamed to `watermark_column`** — reflects actual purpose (any monotonic value, not just timestamps)
## Known Issues
- **Watermark WHERE clause uses alias instead of source column name**`WHERE dcord > 12345` should be `WHERE "DCORD#" > 12345`. Blocked on implementing the column mapping (top TODO item).
- **psql null display**`MAX()` on empty table can render as `∅` depending on locale. The null check must handle this.
- **Merge key stored as `dcord#` vs alias `dcord`** — historical data may have source column names stored where alias was intended. Merge key should always be the destination column name.

View File

@ -72,7 +72,7 @@ def run_module(module_id: int, *, group_run_id: int | None = None,
raise ValueError("source or dest connection missing") raise ValueError("source or dest connection missing")
# 23. watermarks + materialised source query # 23. watermarks + materialised source query
wm_values = watermark.resolve_watermarks(module, use_defaults_only=dry_run) wm_values = watermark.resolve_watermarks(module, use_defaults_only=False)
resolved_sql = watermark.materialise(module["source_query"], wm_values) resolved_sql = watermark.materialise(module["source_query"], wm_values)
repo.set_next_resolved_query(module_id, resolved_sql) repo.set_next_resolved_query(module_id, resolved_sql)
repo.log_run_sql(run_id, resolved_source_sql=resolved_sql, repo.log_run_sql(run_id, resolved_source_sql=resolved_sql,
@ -88,7 +88,7 @@ def run_module(module_id: int, *, group_run_id: int | None = None,
repo.log_run_sql(run_id, merge_sql=merge_sql) repo.log_run_sql(run_id, merge_sql=merge_sql)
if dry_run: if dry_run:
status = "success" status = "dry_run"
return RunOutcome(run_id, status, None, None, resolved_sql, merge_sql) return RunOutcome(run_id, status, None, None, resolved_sql, merge_sql)
# 4. (re)create staging from dest. DROP+CREATE (not IF NOT EXISTS) so # 4. (re)create staging from dest. DROP+CREATE (not IF NOT EXISTS) so

View File

@ -474,12 +474,14 @@ def set_setting(key: str, value: str) -> None:
def list_runs(*, module_id: int | None = None, status: str | None = None, def list_runs(*, module_id: int | None = None, status: str | None = None,
limit: int = 50) -> list[dict]: exclude_status: str | None = None, limit: int = 50) -> list[dict]:
where, params = [], [] where, params = [], []
if module_id is not None: if module_id is not None:
where.append("r.module_id=?"); params.append(module_id) where.append("r.module_id=?"); params.append(module_id)
if status is not None: if status is not None:
where.append("r.status=?"); params.append(status) where.append("r.status=?"); params.append(status)
if exclude_status is not None:
where.append("r.status!=?"); params.append(exclude_status)
clause = ("WHERE " + " AND ".join(where)) if where else "" clause = ("WHERE " + " AND ".join(where)) if where else ""
params.append(limit) params.append(limit)
with db.connect() as c: with db.connect() as c:

View File

@ -90,7 +90,7 @@ CREATE TABLE IF NOT EXISTS group_run (
group_id INTEGER NOT NULL REFERENCES grp(id), group_id INTEGER NOT NULL REFERENCES grp(id),
started_at TEXT DEFAULT (datetime('now')), started_at TEXT DEFAULT (datetime('now')),
finished_at TEXT, finished_at TEXT,
status TEXT NOT NULL DEFAULT 'running' CHECK (status IN ('running','success','error','cancelled')), status TEXT NOT NULL DEFAULT 'running' CHECK (status IN ('running','success','error','cancelled','dry_run')),
triggered_by TEXT -- schedule | manual | null triggered_by TEXT -- schedule | manual | null
); );
@ -101,7 +101,7 @@ CREATE TABLE IF NOT EXISTS run_log (
started_at TEXT DEFAULT (datetime('now')), started_at TEXT DEFAULT (datetime('now')),
finished_at TEXT, finished_at TEXT,
row_count INTEGER, row_count INTEGER,
status TEXT NOT NULL DEFAULT 'running' CHECK (status IN ('running','success','error','cancelled')), status TEXT NOT NULL DEFAULT 'running' CHECK (status IN ('running','success','error','cancelled','dry_run')),
error TEXT, error TEXT,
resolved_source_sql TEXT, resolved_source_sql TEXT,
merge_sql TEXT, merge_sql TEXT,

View File

@ -94,7 +94,7 @@ def module_detail(request: Request, module_id: int):
dest = repo.get_connection(module["dest_connection_id"]) dest = repo.get_connection(module["dest_connection_id"])
watermarks = repo.list_watermarks(module_id) watermarks = repo.list_watermarks(module_id)
hooks = repo.list_hooks(module_id) hooks = repo.list_hooks(module_id)
recent_runs = repo.list_runs(module_id=module_id, limit=10) recent_runs = repo.list_runs(module_id=module_id, limit=10, exclude_status='dry_run')
schema_cols: list[dict] = [] schema_cols: list[dict] = []
if module.get("columns_json"): if module.get("columns_json"):
try: try:
@ -105,7 +105,7 @@ def module_detail(request: Request, module_id: int):
preview = None preview = None
preview_error: str | None = None preview_error: str | None = None
try: try:
wm_values = watermark.resolve_watermarks(module, use_defaults_only=True) wm_values = watermark.resolve_watermarks(module, use_defaults_only=False)
merge_sql = build_merge_sql( merge_sql = build_merge_sql(
strategy=module["merge_strategy"], strategy=module["merge_strategy"],
dest_table=module["dest_table"], dest_table=module["dest_table"],