pipekit/pipekit/schema.sql
Paul Trowbridge 99f75490c4 Add live progress to module runs: async web POST + HTMX polling
Web POST /modules/{id}/run now returns immediately (BackgroundTasks)
instead of blocking until the run completes. jrunner.migrate() switches
from subprocess.run to Popen so stdout lines are read as they arrive and
appended to run_log.live_log via repo.append_run_live_log(). The run
detail page embeds an HTMX fragment that polls /runs/{id}/live every 2s
while status=running, showing current status, row count, and live output;
polling stops automatically once the run finishes.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
2026-05-08 14:02:36 -04:00

123 lines
5.2 KiB
SQL

-- Pipekit schema. Single source of truth — read by pipekit.db.init_db().
-- See SPEC.md sections: "Module model", "Run log / observability",
-- "Groups and scheduling", "Connections and credentials".
-- Driver registry: one row per driver (jar + class) that `connection`
-- rows point at through connection.driver_id.
CREATE TABLE IF NOT EXISTS driver (
    id           INTEGER PRIMARY KEY AUTOINCREMENT,
    name         TEXT    NOT NULL UNIQUE,
    -- db2 | mssql | pg | ... (picks the Driver class)
    kind         TEXT    NOT NULL,
    jar_file     TEXT    NOT NULL,
    class_name   TEXT    NOT NULL,
    -- optional; may be left NULL
    url_template TEXT,
    -- filled in by SQLite at insert time when the caller omits it
    created_at   TEXT    DEFAULT (datetime('now'))
);
-- Named database endpoint: a JDBC URL bound to one `driver` row, plus
-- optional credentials and optional default-destination settings.
CREATE TABLE IF NOT EXISTS connection (
    id                         INTEGER PRIMARY KEY AUTOINCREMENT,
    name                       TEXT    NOT NULL UNIQUE,
    driver_id                  INTEGER NOT NULL REFERENCES driver(id),
    jdbc_url                   TEXT    NOT NULL,
    username                   TEXT,
    -- env-var reference, e.g. "$DB2PW"
    password                   TEXT,
    -- self-reference: another row of this table, or NULL
    default_dest_connection_id INTEGER REFERENCES connection(id),
    default_dest_schema        TEXT,
    notes                      TEXT,
    created_at                 TEXT    DEFAULT (datetime('now')),
    -- NOTE(review): only defaulted at insert; no trigger here refreshes it,
    -- so later updates presumably set it from application code — confirm.
    updated_at                 TEXT    DEFAULT (datetime('now'))
);
-- A module describes one source-query -> destination-table transfer
-- between two `connection` rows, together with its merge settings.
CREATE TABLE IF NOT EXISTS module (
    id                   INTEGER PRIMARY KEY AUTOINCREMENT,
    name                 TEXT    NOT NULL UNIQUE,
    source_connection_id INTEGER NOT NULL REFERENCES connection(id),
    dest_connection_id   INTEGER NOT NULL REFERENCES connection(id),
    dest_table           TEXT    NOT NULL,
    -- pipekit_staging.{name}
    staging_table        TEXT    NOT NULL,
    -- free text with {watermark} placeholders
    source_query         TEXT    NOT NULL,
    merge_strategy       TEXT    NOT NULL DEFAULT 'full'
        CHECK (merge_strategy IN ('full', 'incremental', 'append')),
    merge_key            TEXT,
    -- 0/1 flag stored as INTEGER; defaults to enabled
    enabled              INTEGER NOT NULL DEFAULT 1,
    -- NOTE(review): running / running_pid / running_since presumably record
    -- an in-flight run of this module — confirm against the runner code.
    running              INTEGER NOT NULL DEFAULT 0,
    running_pid          TEXT,
    running_since        TEXT,
    -- materialised before each run for TUI preview
    next_resolved_query  TEXT,
    -- [{source_name, source_type, dest_name, dest_type, description}, ...]
    columns_json         TEXT,
    -- COMMENT ON TABLE value, also shown in the UI
    dest_description     TEXT,
    created_at           TEXT    DEFAULT (datetime('now')),
    updated_at           TEXT    DEFAULT (datetime('now'))
);
-- Named watermarks owned by a module. A name may appear only once per
-- module, and the rows are deleted together with their module.
CREATE TABLE IF NOT EXISTS watermark (
    id            INTEGER PRIMARY KEY AUTOINCREMENT,
    module_id     INTEGER NOT NULL REFERENCES module(id) ON DELETE CASCADE,
    name          TEXT    NOT NULL,
    connection_id INTEGER NOT NULL REFERENCES connection(id),
    -- NOTE(review): presumably executed on connection_id to produce the
    -- value for the matching {name} placeholder — confirm with SPEC.md.
    resolver_sql  TEXT    NOT NULL,
    default_value TEXT,
    UNIQUE (module_id, name)
);
-- SQL hooks attached to a module; deleted together with the module.
CREATE TABLE IF NOT EXISTS hook (
    id            INTEGER PRIMARY KEY AUTOINCREMENT,
    module_id     INTEGER NOT NULL REFERENCES module(id) ON DELETE CASCADE,
    -- ordering key among a module's hooks; defaults to 0
    run_order     INTEGER NOT NULL DEFAULT 0,
    -- nullable: a hook does not have to name a connection
    connection_id INTEGER REFERENCES connection(id),
    sql           TEXT    NOT NULL,
    -- which run outcome the hook applies to
    run_on        TEXT    NOT NULL DEFAULT 'success'
        CHECK (run_on IN ('success', 'failure', 'always'))
);
-- A named group. Membership, schedules and group runs all reference it
-- through their group_id columns.
CREATE TABLE IF NOT EXISTS grp (
    id   INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT    NOT NULL UNIQUE
);
-- Junction table placing modules into groups. A row disappears when
-- either its group or its module is deleted.
CREATE TABLE IF NOT EXISTS group_member (
    id        INTEGER PRIMARY KEY AUTOINCREMENT,
    group_id  INTEGER NOT NULL REFERENCES grp(id)    ON DELETE CASCADE,
    module_id INTEGER NOT NULL REFERENCES module(id) ON DELETE CASCADE,
    -- ordering key among a group's members; defaults to 0
    run_order INTEGER NOT NULL DEFAULT 0
);
-- Cron schedules attached to a group; deleted together with the group.
CREATE TABLE IF NOT EXISTS schedule (
    id        INTEGER PRIMARY KEY AUTOINCREMENT,
    group_id  INTEGER NOT NULL REFERENCES grp(id) ON DELETE CASCADE,
    -- cron expression stored as text; its syntax is not validated here
    cron_expr TEXT    NOT NULL,
    -- 0/1 flag stored as INTEGER; defaults to enabled
    enabled   INTEGER NOT NULL DEFAULT 1
);
-- One row per execution of a group. The group reference has no ON DELETE
-- action, unlike schedule / group_member which cascade.
CREATE TABLE IF NOT EXISTS group_run (
    id           INTEGER PRIMARY KEY AUTOINCREMENT,
    group_id     INTEGER NOT NULL REFERENCES grp(id),
    started_at   TEXT    DEFAULT (datetime('now')),
    -- stays NULL unless a value is written explicitly
    finished_at  TEXT,
    -- a new row starts out as 'running'
    status       TEXT    NOT NULL DEFAULT 'running'
        CHECK (status IN ('running', 'success', 'error', 'cancelled', 'dry_run')),
    triggered_by TEXT -- schedule | manual | null
);
-- One row per module run, optionally tied to the group run it belongs to.
-- Holds the run's outcome plus the SQL and process output captured for it.
CREATE TABLE IF NOT EXISTS run_log (
    id                    INTEGER PRIMARY KEY AUTOINCREMENT,
    module_id             INTEGER NOT NULL REFERENCES module(id),
    -- NULL when the run is not attached to a group_run row
    group_run_id          INTEGER REFERENCES group_run(id),
    started_at            TEXT    DEFAULT (datetime('now')),
    finished_at           TEXT,
    row_count             INTEGER,
    -- same status vocabulary as group_run.status
    status                TEXT    NOT NULL DEFAULT 'running'
        CHECK (status IN ('running', 'success', 'error', 'cancelled', 'dry_run')),
    error                 TEXT,
    resolved_source_sql   TEXT,
    merge_sql             TEXT,
    watermark_values_json TEXT,
    jrunner_stdout        TEXT,
    jrunner_stderr        TEXT,
    hook_log              TEXT,
    -- NOTE(review): presumably the incremental output appended while a run
    -- is in progress (repo.append_run_live_log) — confirm against the repo layer.
    live_log              TEXT
);
-- Indexes for the two run-history tables (run_log, group_run).
-- All are IF NOT EXISTS, so re-running this script is a no-op for them.

-- Runs of one module, newest first (id DESC).
CREATE INDEX IF NOT EXISTS idx_run_log_module
    ON run_log (module_id, id DESC);

-- Runs filtered by status, most recently started first.
CREATE INDEX IF NOT EXISTS idx_run_log_status
    ON run_log (status, started_at DESC);

-- Module runs belonging to one group run; also the child-key index for
-- the run_log.group_run_id foreign key.
CREATE INDEX IF NOT EXISTS idx_run_log_group_run
    ON run_log (group_run_id);

-- Runs of one group, newest first. Mirrors idx_run_log_module for the
-- group-level history table. Without it, a per-group lookup and the
-- foreign-key check done when a grp row is deleted or updated both scan
-- all of group_run.
CREATE INDEX IF NOT EXISTS idx_group_run_group
    ON group_run (group_id, id DESC);
-- Key/value store: one row per key, with the value held as nullable text.
CREATE TABLE IF NOT EXISTS settings (
    key   TEXT PRIMARY KEY,
    value TEXT
);