Pipekit — ETL Tool Specification

Overview

A lightweight, JDBC-based ETL tool for syncing tables between source systems and a PostgreSQL destination (or other JDBC destinations). Config-driven, no boilerplate scripts. Managed via TUI, API, or future web UI.

Architecture

jrunner (JDBC transfer engine — existing Java app)
    ^
engine (Python — orchestrates jrunner, manages staging, merge, DDL, logging)
    ^
API (FastAPI — REST interface, Basic Auth)
    ^
TUI / Web UI / external callers

Core Concepts

| Concept | Description |
|---|---|
| Connection | A JDBC source or destination: URL, driver class, credentials |
| Driver | A JDBC driver jar registered with the system |
| Module | A sync job: source query + destination table + merge strategy |
| Hook | Post-sync SQL action run against the destination (e.g. refresh mat view) |
| Group | An ordered list of modules that run together |
| Schedule | A cron expression tied to a group |
| Run | A single execution, tracked with timing, row count, status, error, SQL |

Bootstrap Config (only file on disk)

# /opt/pipekit/config.yaml
database: /opt/pipekit/pipekit.db    # SQLite — self-contained, no external DB required
jrunner_path: /usr/local/bin/jrunner
driver_dir: /opt/pipekit/drivers/
api_port: 8100
smtp:                                # optional, for failure notifications
  host: smtp.example.com
  port: 587
  from: etl@example.com
  to: admin@example.com

Everything else lives in SQLite (pipekit.db). No external database dependency for config — destinations can be PostgreSQL, SQL Server, or anything with a JDBC driver.
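
For illustration, a minimal loader for this bootstrap file (assumes PyYAML is installed; the BootstrapConfig dataclass is a sketch, not the actual engine type):

```python
from dataclasses import dataclass, field

import yaml


@dataclass
class BootstrapConfig:
    """Sketch of the bootstrap config; fields mirror the example above."""
    database: str
    jrunner_path: str
    driver_dir: str
    api_port: int = 8100
    smtp: dict = field(default_factory=dict)  # optional failure notifications


def load_config(path: str = "/opt/pipekit/config.yaml") -> BootstrapConfig:
    with open(path) as f:
        raw = yaml.safe_load(f)
    return BootstrapConfig(
        database=raw["database"],
        jrunner_path=raw["jrunner_path"],
        driver_dir=raw["driver_dir"],
        api_port=raw.get("api_port", 8100),
        smtp=raw.get("smtp") or {},
    )
```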

Column Identity Model

A module's source query defines column mappings from source to destination. This is the central design constraint — every column has two identities:

| Context | Name | Example | Where used |
|---|---|---|---|
| Source column | The original column name in the source system | DCORD#, DCODAT | Source query SELECT, WHERE clauses against the source |
| Destination column | The alias in the SELECT, which becomes the column name in staging and dest tables | dcord, dcodat | Staging table DDL, merge SQL, destination queries |

Rules

  1. The source query maps source → destination: SELECT "DCORD#" AS dcord ...
  2. merge_key references the destination column name — it's used in merge SQL that runs against PostgreSQL (e.g. DELETE FROM dest WHERE dcord IN (SELECT dcord FROM staging))
  3. watermark_column references the destination column name — the engine looks up MAX(watermark_column) in the destination table, then must translate it back to the source column name to build the WHERE clause against the source
  4. The watermark WHERE clause must use the source column name — e.g. WHERE "DCORD#" > 12345, not WHERE dcord > 12345 (the source system doesn't know the alias)
  5. The engine maintains a column mapping (alias → source expression) parsed from the source query to perform this translation

Column Mapping Derivation

The source query is parsed to extract the mapping:

SELECT
     "DCORD#"                AS dcord      -- source: "DCORD#",  dest: dcord
    ,RTRIM(DCOTYP)           AS dcotyp     -- source: DCOTYP,    dest: dcotyp (trimmed)
    ,DCODAT                  AS dcodat     -- source: DCODAT,    dest: dcodat
FROM LGDAT.QCRH

From this, the engine derives:

  • dcord → "DCORD#" (used for WHERE clause on source)
  • dcotyp → DCOTYP (the unwrapped column, without RTRIM)
  • dcodat → DCODAT

When building an incremental WHERE clause for watermark column dcord:

  1. Query dest: SELECT MAX(dcord) FROM sync.qcrh → 12345
  2. Look up the source expression for dcord → "DCORD#"
  3. Build: WHERE "DCORD#" > 12345
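
A sketch of this translation in Python (regex-based and deliberately simplified; a real parser must also handle commas inside function calls, expressions without aliases, subqueries, and so on):

```python
import re

# Match "expr AS alias" on each SELECT-list line, tolerating a leading comma.
ALIAS_RE = re.compile(r'^\s*,?\s*(?P<expr>.+?)\s+AS\s+(?P<alias>\w+)', re.IGNORECASE)
# Match a single-argument function wrapper like RTRIM(DCOTYP).
WRAP_RE = re.compile(r'^\s*\w+\(\s*(?P<inner>.+?)\s*\)\s*$')


def parse_column_map(source_query: str) -> dict[str, str]:
    """Derive {dest alias -> source expression} from the SELECT list."""
    mapping = {}
    select_body = re.split(r'\bFROM\b', source_query, flags=re.IGNORECASE)[0]
    for line in select_body.splitlines():
        m = ALIAS_RE.match(line)
        if not m:
            continue
        expr = re.sub(r'^\s*SELECT\s+', '', m.group("expr"), flags=re.IGNORECASE)
        # Unwrap functions like RTRIM(col) to the bare column for WHERE use.
        w = WRAP_RE.match(expr)
        mapping[m.group("alias").lower()] = w.group("inner") if w else expr
    return mapping


# parse_column_map('''SELECT
#      "DCORD#"      AS dcord
#     ,RTRIM(DCOTYP) AS dcotyp
# FROM LGDAT.QCRH''')
# -> {'dcord': '"DCORD#"', 'dcotyp': 'DCOTYP'}
```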

Special Character Handling

Source columns with special characters (#, @, $, spaces) are:

  • Quoted in the source query using platform-appropriate syntax: [DCORD#] (SQL Server), "DCORD#" (DB2/PostgreSQL)
  • Aliased to safe names that are valid unquoted PostgreSQL identifiers: dcord, company_name
  • The alias generation (_safe_alias) strips special characters, lowercases, and replaces non-alphanumeric chars with underscores
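
A sketch of what _safe_alias could look like under the rules above (the exact stripping and collision rules are an assumption):

```python
import re


def _safe_alias(source_column: str) -> str:
    """Turn a source column name into a valid unquoted PostgreSQL identifier."""
    alias = source_column.strip().lower()
    alias = re.sub(r'[#@$]+$', '', alias)       # "DCORD#" -> "dcord"
    alias = re.sub(r'[^a-z0-9]+', '_', alias)   # "Company Name" -> "company_name"
    alias = alias.strip('_')
    if alias and alias[0].isdigit():            # identifiers can't start with a digit
        alias = f"c_{alias}"
    return alias or "col"
```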

Database Schema

All tables in SQLite (pipekit.db). Same schema works if migrated to PostgreSQL later.

connection

| Column | Type | Description |
|---|---|---|
| id | integer | PK, auto-increment |
| name | text | Human-readable label |
| jdbc_url | text | JDBC connection string |
| driver_id | integer | FK to driver |
| username | text | |
| password | text | Env var reference (e.g. $DB2PW) resolved at runtime |
| supports_deletes | boolean | Whether destination supports DELETE/UPDATE |
| created_at | text | ISO datetime |
| updated_at | text | ISO datetime |

driver

| Column | Type | Description |
|---|---|---|
| id | integer | PK, auto-increment |
| name | text | e.g. "SQL Server", "AS/400 DB2" |
| jar_file | text | Filename in driver_dir |
| class_name | text | JDBC driver class |
| url_template | text | e.g. jdbc:sqlserver://{host};databaseName={db} |

module

| Column | Type | Description |
|---|---|---|
| id | integer | PK, auto-increment |
| name | text | Module identifier (unique) |
| source_connection_id | integer | FK to connection |
| dest_connection_id | integer | FK to connection |
| dest_table | text | Fully qualified destination (schema.table) |
| source_query | text | The SELECT query to run against the source |
| merge_strategy | text | full, incremental, append, upsert |
| merge_key | text | Destination column name for merge operations |
| watermark_column | text | Destination column name for incremental watermark; if null, falls back to merge_key |
| key_sync | boolean | After incremental, reconcile keys and delete orphans |
| key_sync_query | text | Optional custom query to fetch source keys |
| full_refresh_cron | text | Optional cron for periodic full refresh |
| enabled | boolean | Whether the module is active |
| running | boolean | Lock flag, set during execution |
| created_at | text | ISO datetime |
| updated_at | text | ISO datetime |

hook

| Column | Type | Description |
|---|---|---|
| id | integer | PK, auto-increment |
| module_id | integer | FK to module (CASCADE delete) |
| run_order | integer | Execution order |
| sql | text | SQL to execute against destination |
| run_on | text | success, failure, always |

grp (group)

| Column | Type | Description |
|---|---|---|
| id | integer | PK, auto-increment |
| name | text | e.g. "pricing" |

group_member

| Column | Type | Description |
|---|---|---|
| id | integer | PK, auto-increment |
| group_id | integer | FK to grp (CASCADE) |
| module_id | integer | FK to module (CASCADE) |
| run_order | integer | Execution order in group |

schedule

| Column | Type | Description |
|---|---|---|
| id | integer | PK, auto-increment |
| group_id | integer | FK to grp (CASCADE) |
| cron_expr | text | Cron expression (e.g. `0 2 * * *`) |
| enabled | boolean | |

run_log

| Column | Type | Description |
|---|---|---|
| id | integer | PK, auto-increment |
| module_id | integer | FK to module |
| group_id | integer | FK to grp (nullable; null if run manually) |
| started_at | text | ISO datetime |
| finished_at | text | ISO datetime |
| row_count | integer | |
| status | text | running, success, error, cancelled |
| error | text | Error message if failed |
| source_query | text | The exact source SQL executed (with resolved WHERE) |
| merge_sql | text | The exact merge SQL executed against destination |

module_history

| Column | Type | Description |
|---|---|---|
| id | integer | PK, auto-increment |
| module_id | integer | FK to module (CASCADE) |
| source_query | text | Previous query text |
| changed_at | text | ISO datetime |

settings

| Column | Type | Description |
|---|---|---|
| key | text | PK; e.g. smtp_host |
| value | text | |
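
For illustration, a fragment of the first-run schema bootstrap covering two of the tables above (stdlib sqlite3; the DDL mirrors the column lists in this section, while the real db.py creates all tables):

```python
import sqlite3

SCHEMA = """
CREATE TABLE IF NOT EXISTS module (
    id                   INTEGER PRIMARY KEY AUTOINCREMENT,
    name                 TEXT UNIQUE NOT NULL,
    source_connection_id INTEGER REFERENCES connection(id),
    dest_connection_id   INTEGER REFERENCES connection(id),
    dest_table           TEXT,
    source_query         TEXT,
    merge_strategy       TEXT CHECK (merge_strategy IN ('full','incremental','append','upsert')),
    merge_key            TEXT,
    watermark_column     TEXT,
    key_sync             INTEGER DEFAULT 0,
    key_sync_query       TEXT,
    full_refresh_cron    TEXT,
    enabled              INTEGER DEFAULT 1,
    running              INTEGER DEFAULT 0,
    created_at           TEXT,
    updated_at           TEXT
);
CREATE TABLE IF NOT EXISTS hook (
    id        INTEGER PRIMARY KEY AUTOINCREMENT,
    module_id INTEGER NOT NULL REFERENCES module(id) ON DELETE CASCADE,
    run_order INTEGER DEFAULT 0,
    sql       TEXT NOT NULL,
    run_on    TEXT CHECK (run_on IN ('success','failure','always'))
);
"""


def init_db(path: str) -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    conn.execute("PRAGMA foreign_keys = ON")  # SQLite FKs are off by default
    conn.executescript(SCHEMA)
    return conn
```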

Merge Strategies

| Strategy | Behavior |
|---|---|
| full | Transfer all rows to staging, TRUNCATE dest, INSERT from staging |
| incremental | Query dest for MAX(watermark), build WHERE clause using source column name, transfer delta, DELETE matching rows by merge_key, INSERT from staging |
| append | Transfer, INSERT into dest, no deletes |
| upsert | Transfer, INSERT ON CONFLICT(merge_key) DO UPDATE |
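
A sketch of the merge SQL each strategy could generate (dest, staging, key, and cols are illustrative parameters; the supports_deletes handling covered below is omitted):

```python
def merge_sql(strategy: str, dest: str, staging: str,
              key: str, cols: list[str]) -> list[str]:
    """Return the staging-to-dest statements for one strategy (sketch)."""
    col_list = ", ".join(cols)
    if strategy == "full":
        return [f"TRUNCATE {dest}",
                f"INSERT INTO {dest} ({col_list}) SELECT {col_list} FROM {staging}"]
    if strategy == "incremental":
        return [f"DELETE FROM {dest} WHERE {key} IN (SELECT {key} FROM {staging})",
                f"INSERT INTO {dest} ({col_list}) SELECT {col_list} FROM {staging}"]
    if strategy == "append":
        return [f"INSERT INTO {dest} ({col_list}) SELECT {col_list} FROM {staging}"]
    if strategy == "upsert":
        updates = ", ".join(f"{c} = EXCLUDED.{c}" for c in cols if c != key)
        return [f"INSERT INTO {dest} ({col_list}) SELECT {col_list} FROM {staging} "
                f"ON CONFLICT ({key}) DO UPDATE SET {updates}"]
    raise ValueError(f"unknown merge strategy: {strategy}")
```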

Incremental Sync Flow (detailed)

  1. Resolve watermark column: use watermark_column, fall back to merge_key
  2. Query destination: SELECT MAX({watermark_col}) FROM {dest_table}
  3. Parse the result — handle NULL (empty table), numeric values, date/text values
  4. Parse source query to find the source expression for the watermark alias
  5. Build WHERE clause using the source expression (not the alias):
    • Numeric watermark: WHERE "DCORD#" > 12345
    • Date/text watermark: WHERE DEX_ROW_TS >= '2026-04-01 00:00:00'
  6. Append WHERE clause to the base source query
  7. Transfer delta rows to staging
  8. Merge: DELETE from dest WHERE merge_key IN (SELECT merge_key FROM staging), then INSERT
  9. Run hooks

NULL watermark handling: if MAX(watermark) returns NULL (an empty dest table, or psql's textual null representation), skip the WHERE clause entirely and pull all rows.
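
A sketch of steps 1–6, reusing the column map from the parser sketch earlier (max_value stands in for the result of the destination MAX query; quoting of date/text values is simplified):

```python
def build_incremental_query(base_query: str, watermark_alias: str,
                            column_map: dict[str, str], max_value) -> str:
    """Append a watermark WHERE clause in source-column terms (sketch)."""
    if max_value is None or str(max_value).strip() == "":
        # NULL watermark (empty dest table): pull all rows, no WHERE clause.
        return base_query
    source_expr = column_map[watermark_alias]          # e.g. dcord -> "DCORD#"
    if isinstance(max_value, (int, float)):
        predicate = f"{source_expr} > {max_value}"     # numeric: strictly greater
    else:
        predicate = f"{source_expr} >= '{max_value}'"  # date/text: >= last seen
    return f"{base_query}\nWHERE {predicate}"
```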

Handling Source Deletes

Incremental strategies only detect new/changed rows — not rows deleted from the source. Two mechanisms address this:

1. Key reconciliation (key_sync) — optional per module. After the incremental load, pull all primary key values from the source (lightweight query), compare against destination, and delete any destination rows whose key is not in the source.

2. Periodic full refresh (full_refresh_cron) — optional per module. A cron expression that triggers a full refresh on a different cadence than the incremental schedule.
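
A sketch of the key reconciliation delete (assumes a psycopg-style cursor against the destination; source_keys stands in for the result of the lightweight source key query, and batching/quoting are simplified):

```python
def reconcile_keys(dest_cursor, dest_table: str, key: str,
                   source_keys: set) -> int:
    """Delete dest rows whose key no longer exists in the source (sketch)."""
    dest_cursor.execute(f"SELECT {key} FROM {dest_table}")
    dest_keys = {row[0] for row in dest_cursor.fetchall()}
    orphans = dest_keys - source_keys          # in dest but gone from source
    if orphans:
        placeholders = ", ".join(["%s"] * len(orphans))
        dest_cursor.execute(
            f"DELETE FROM {dest_table} WHERE {key} IN ({placeholders})",
            tuple(orphans),
        )
    return len(orphans)
```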

Destination-Aware Merge

The engine checks connection.supports_deletes:

  • If true: DELETE + INSERT merge works normally
  • If false: incremental/upsert fall back to insert-only, relying on the destination's dedup engine (e.g. ClickHouse ReplacingMergeTree)

Staging Table Management

  • Named pipekit_staging.{module_name} (persistent across runs)
  • If table exists: TRUNCATE before transfer
  • If table doesn't exist: probe source for column metadata (0-row jrunner transfer), create table with mapped PostgreSQL types
  • Probe always uses the base source query (no WHERE clause) to avoid comment/subquery issues
  • Left in place after runs (success or failure) for debugging
  • Schemas pipekit_staging and destination schema auto-created if missing
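
A sketch of the staging-table decision (probe_columns stands in for the 0-row jrunner probe and yields (alias, mapped PostgreSQL type) pairs; cursor semantics assume a psycopg-style PostgreSQL connection):

```python
def ensure_staging(cur, module_name: str, probe_columns) -> str:
    """Truncate the staging table if present, else create it (sketch)."""
    staging = f"pipekit_staging.{module_name}"
    cur.execute("CREATE SCHEMA IF NOT EXISTS pipekit_staging")
    cur.execute("SELECT to_regclass(%s)", (staging,))
    if cur.fetchone()[0] is not None:
        cur.execute(f"TRUNCATE {staging}")     # exists: clear before transfer
    else:
        cols = ", ".join(f"{name} {pg_type}" for name, pg_type in probe_columns())
        cur.execute(f"CREATE TABLE {staging} ({cols})")
    return staging
```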

Source Introspection

The engine can browse source systems via jrunner query mode against INFORMATION_SCHEMA (or equivalent):

  • Table browsing: list tables/views filtered by schema
  • Column metadata: column names, types, positions
  • Linked server support (SQL Server): query tables on linked servers via 4-part naming
  • Cross-database (SQL Server): specify a different database than the connection default
  • Auto-propose: given a source table, generate a complete module config (a sketch follows this list):
    • SELECT query with RTRIM on text columns, safe aliases for special characters
    • Platform-aware identifier quoting ([brackets] for SQL Server, "double quotes" for DB2/others)
    • Destination DDL with mapped PostgreSQL types
    • Suggested merge strategy, key, and watermark column
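
A sketch of the query-generation step (the (name, type) column-metadata shape is illustrative, and the alias logic inlines the _safe_alias rules from earlier):

```python
import re

TEXT_TYPES = {"varchar", "char", "nvarchar", "nchar"}


def propose_query(table: str, columns: list[tuple[str, str]],
                  platform: str) -> str:
    """Generate a SELECT with platform-aware quoting and safe aliases (sketch)."""
    out = []
    for name, source_type in columns:
        quoted = f"[{name}]" if platform == "sqlserver" else f'"{name}"'
        expr = quoted if re.search(r"[^A-Za-z0-9_]", name) else name
        if source_type.lower() in TEXT_TYPES:
            expr = f"RTRIM({expr})"            # trim padded text columns
        alias = re.sub(r"[^a-z0-9]+", "_", name.lower()).strip("_")
        out.append(f"{expr} AS {alias}")
    return "SELECT\n     " + "\n    ,".join(out) + f"\nFROM {table}"


# propose_query("LGDAT.QCRH", [("DCORD#", "decimal"), ("DCOTYP", "char")], "as400")
# -> SELECT
#         "DCORD#" AS dcord
#        ,RTRIM(DCOTYP) AS dcotyp
#    FROM LGDAT.QCRH
```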

Source Type Detection

Detected from JDBC URL: as400, sqlserver, postgresql, clickhouse, mysql

Type Mapping (source → PostgreSQL)

| Source type | PostgreSQL type |
|---|---|
| varchar, char, nvarchar, nchar, text | text |
| int, integer | integer |
| bigint | bigint |
| decimal, numeric | numeric |
| float, double | double precision |
| date | date |
| datetime, timestamp | timestamp |
| bit | boolean |
| binary, varbinary | bytea |
| uniqueidentifier | uuid |

API Endpoints

# Auth: HTTP Basic Auth on all endpoints

# Connections
GET    /connections
POST   /connections
GET    /connections/{id}
PUT    /connections/{id}
DELETE /connections/{id}
POST   /connections/{id}/test
GET    /connections/{id}/tables?schema=
GET    /connections/{id}/tables/{schema}.{table}/columns
GET    /connections/{id}/tables/{schema}.{table}/propose

# Modules
GET    /modules
POST   /modules
GET    /modules/{id}
PUT    /modules/{id}
DELETE /modules/{id}
GET    /modules/{id}/preview
GET    /modules/{id}/dest-columns
POST   /modules/{id}/run
POST   /modules/{id}/run/stream
GET    /modules/{id}/history

# Hooks
GET    /modules/{module_id}/hooks
POST   /hooks
DELETE /hooks/{id}

# Groups
GET    /groups
POST   /groups
GET    /groups/{id}
DELETE /groups/{id}
POST   /groups/{id}/members
DELETE /groups/members/{id}
POST   /groups/{id}/run

# Runs
GET    /runs
GET    /runs/{id}

# Drivers
GET    /drivers
POST   /drivers
DELETE /drivers/{id}

# Schedules
GET    /schedules
POST   /schedules
PUT    /schedules/{id}
DELETE /schedules/{id}
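
For illustration, driving the API from Python (host, port, and credentials are placeholders, and the JSON response shapes shown are assumptions):

```python
import requests

BASE = "http://localhost:8100"
AUTH = ("admin", "secret")  # placeholder Basic Auth credentials

# List modules, trigger a run of the first one, then inspect recent runs.
modules = requests.get(f"{BASE}/modules", auth=AUTH).json()
requests.post(f"{BASE}/modules/{modules[0]['id']}/run", auth=AUTH)
print(requests.get(f"{BASE}/runs", auth=AUTH).json())
```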

TUI

Main Screen

Module tree grouped by source connection. Icons: enabled, disabled, running.

| Key | Action |
|---|---|
| i | Inspect module |
| r | Run selected module |
| l | Module run history |
| L | Global run log (all modules) |
| n | New module wizard |
| c | Manage connections |
| / | Search modules |
| j/k | Navigate |
| g/G | Top/bottom |
| F5 | Refresh |
| q | Quit |

Module Detail Screen (i)

Top section: module info (strategy, merge key, watermark, dest table, staging table, enabled, updated).

Middle section: column table showing source column, destination alias, and whether RTRIM is applied.

Bottom: footer with keybindings. No SQL visible by default — all SQL opens in $EDITOR (read-only) via keybindings:

| Key | Opens in editor |
|---|---|
| q | Next source SQL: the resolved query that would execute on next run (with WHERE clause) |
| m | Merge SQL: the staging-to-dest merge statements |
| h | Post-merge hooks |
| b | Base query template: the stored SELECT before the watermark WHERE is appended |
| e | Edit base query (writable) |
| s | Module settings (opens edit screen) |
| r | Run sync |
| l | Run history |

Module Settings Screen (s)

Full edit form matching the new module wizard layout:

  • Module name, source/dest connections, dest table
  • Merge strategy (radio buttons)
  • Merge key and watermark column (searchable dropdowns populated from the source query aliases, i.e. the destination column names)
  • Enabled toggle

Source query is not on this screen — use e from the detail screen to edit it in $EDITOR.

New Module Wizard (n)

  • Source/destination connection selection
  • Table browser: linked server, database, schema filter fields + Load button
  • Real-time search/filter over loaded tables (DataTable)
  • Auto-propose on table selection (generates query, DDL, strategy suggestions)
  • Merge strategy, key, watermark, dest table fields

History Screens (l, L)

Run table with status, rows, timing, error. Below: separate panels for source query and merge SQL (not combined). Error shown as red text. v opens selected run's SQL in editor. esc closes.

Run Screen (r)

Streaming jrunner output via SSE. Shows real-time transfer progress.

Concurrency Control

Each module has a running flag. Before starting a sync:

  1. Check if module is already running — reject if so
  2. Set running = true
  3. Execute sync
  4. Set running = false on success or failure
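
A sketch of this lock discipline against the SQLite module table (the guarded UPDATE makes check-and-set a single statement rather than a read-then-write race):

```python
def acquire_run_lock(conn, module_id: int) -> bool:
    cur = conn.execute(
        "UPDATE module SET running = 1 WHERE id = ? AND running = 0", (module_id,)
    )
    conn.commit()
    return cur.rowcount == 1      # 0 rows updated -> already running, reject


def release_run_lock(conn, module_id: int) -> None:
    conn.execute("UPDATE module SET running = 0 WHERE id = ?", (module_id,))
    conn.commit()


def run_module(conn, module_id: int, do_sync) -> None:
    if not acquire_run_lock(conn, module_id):
        raise RuntimeError("module is already running")
    try:
        do_sync()
    finally:
        release_run_lock(conn, module_id)  # released on success or failure
```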

Error Handling

  • On module failure: log error to run_log, stop group execution
  • No automatic retries
  • Staging tables preserved for debugging
  • Generated SQL logged to run_log for post-mortem analysis

Security

  • API: HTTP Basic Auth (username/password stored in settings table)
  • Connection passwords: stored as env var references (e.g. $DB2PW) resolved at runtime
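
A sketch of the env var resolution (whether literal passwords are allowed to pass through unchanged is an assumption):

```python
import os


def resolve_password(stored: str) -> str:
    """Resolve "$DB2PW"-style references at runtime (sketch)."""
    if stored.startswith("$"):
        var = stored[1:]
        try:
            return os.environ[var]
        except KeyError:
            raise RuntimeError(f"password env var {var} is not set") from None
    return stored  # assumed: literal values pass through unchanged
```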

Deployment

  • Single directory install (/opt/pipekit/)
  • Bootstrap config file (config.yaml)
  • SQLite database (pipekit.db) — created on first run
  • JDBC drivers directory
  • Python dependencies via pip/venv
  • Portable: copy the directory and you've moved the whole install

Directory Structure

/opt/pipekit/
  config.yaml           # bootstrap config (only file-based config)
  pipekit.db            # SQLite — all config, queries, run history
  drivers/              # JDBC .jar files
  engine/
    db.py               # SQLite schema + CRUD operations
    runner.py           # Sync orchestration (staging, transfer, merge, hooks)
    introspect.py       # Source browsing, query generation, type mapping
  api/
    main.py             # FastAPI app
  tui/
    app.py              # Textual TUI
    client.py           # HTTP client for API
  requirements.txt

jrunner Fixes

  • NVARCHAR/NCHAR/NTEXT/NCLOB quoting — added case labels to jrunner's INSERT builder type switch so Unicode string types get quoted correctly.

Migration Path from Current Setup

  1. Create connections for s7830956, usmidsql01, gpserver, localhost PostgreSQL
  2. Import existing modules — parse shell scripts to extract query, dest table, strategy
  3. Import orchestrators as groups
  4. Set up schedules to match current crontab
  5. Verify runs produce same results
  6. Decommission shell scripts and cron entries

TODO

  • Implement column mapping for watermark WHERE clause — parse source query to build alias → source expression map, use source expression (not alias) in incremental WHERE clauses
  • Cancel running sync — track PID, add cancel endpoint + TUI binding
  • Scheduler — background thread in the API process evaluating cron expressions every minute
  • Email notifications — SMTP on failure
  • Upsert + incremental combo — pull only changed rows, then INSERT ON CONFLICT UPDATE
  • Module history — full audit — expand module_history to track all field changes, store as JSON diff

Resolved

  • Persistent staging tables: pipekit_staging.{name}, truncated before each run, left in place after
  • Global run log in TUI: L from the main screen
  • Connection pooling: not needed at current scale
  • Scheduler location: built into the API process (background thread)
  • module_history scope: track all field changes
  • timestamp_column renamed to watermark_column: reflects the actual purpose (any monotonic value, not just timestamps)

Known Issues

  • Watermark WHERE clause uses the alias instead of the source column name: WHERE dcord > 12345 should be WHERE "DCORD#" > 12345. Blocked on implementing the column mapping (top TODO item).
  • psql null display: MAX() on an empty table can render as a locale-dependent string rather than a literal NULL. The null check must handle this.
  • Merge key stored as dcord# vs alias dcord: historical data may have source column names stored where the alias was intended. The merge key should always be the destination column name.