pipekit/.archive/pre-rewrite/engine/db.py
Paul Trowbridge 574ada5258 Initial commit: Pipekit rewrite.
Orchestration layer around the jrunner Java JDBC CLI, replacing the
previous shell-based sync system in .archive/pre-rewrite. Includes
the FastAPI + Jinja web frontend, per-driver adapters (DB2, MSSQL,
PG), wizard-driven module creation with editable dest types and
source-sourced table/column descriptions, watermark/hook CRUD,
and the engine that runs modules end-to-end.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-22 00:38:26 -04:00

687 lines
24 KiB
Python

"""SQLite database layer for Pipekit."""
import sqlite3
from contextlib import contextmanager
from pathlib import Path
from config import get_config
# DDL for the whole Pipekit metadata store, executed as one script by init_db().
# Every statement is CREATE TABLE IF NOT EXISTS, so re-running the script
# against an existing database leaves already-created tables untouched.
#
# watermark, hook, group_member and schedule rows are declared
# ON DELETE CASCADE against their parent (module and/or grp); connection,
# group_run and run_log reference their parents without a cascade.
# NOTE(review): the group table is named "grp", presumably because GROUP is a
# reserved SQL word — confirm.
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS driver (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
jar_file TEXT NOT NULL,
class_name TEXT NOT NULL,
url_template TEXT,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS connection (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
driver_id INTEGER REFERENCES driver(id),
jdbc_url TEXT NOT NULL,
username TEXT,
password TEXT,
default_dest_connection_id INTEGER REFERENCES connection(id),
default_dest_schema TEXT,
notes TEXT,
created_at TEXT DEFAULT (datetime('now')),
updated_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS module (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
source_connection_id INTEGER NOT NULL REFERENCES connection(id),
dest_connection_id INTEGER NOT NULL REFERENCES connection(id),
dest_table TEXT NOT NULL,
source_query TEXT NOT NULL,
merge_strategy TEXT NOT NULL DEFAULT 'full',
merge_key TEXT,
enabled INTEGER DEFAULT 1,
running INTEGER DEFAULT 0,
running_pid TEXT,
running_since TEXT,
created_at TEXT DEFAULT (datetime('now')),
updated_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS watermark (
id INTEGER PRIMARY KEY AUTOINCREMENT,
module_id INTEGER NOT NULL REFERENCES module(id) ON DELETE CASCADE,
name TEXT NOT NULL,
connection_id INTEGER NOT NULL REFERENCES connection(id),
resolver_sql TEXT NOT NULL,
default_value TEXT,
UNIQUE(module_id, name)
);
CREATE TABLE IF NOT EXISTS hook (
id INTEGER PRIMARY KEY AUTOINCREMENT,
module_id INTEGER NOT NULL REFERENCES module(id) ON DELETE CASCADE,
run_order INTEGER NOT NULL DEFAULT 0,
connection_id INTEGER REFERENCES connection(id),
sql TEXT NOT NULL,
run_on TEXT NOT NULL DEFAULT 'success'
);
CREATE TABLE IF NOT EXISTS grp (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE
);
CREATE TABLE IF NOT EXISTS group_member (
id INTEGER PRIMARY KEY AUTOINCREMENT,
group_id INTEGER NOT NULL REFERENCES grp(id) ON DELETE CASCADE,
module_id INTEGER NOT NULL REFERENCES module(id) ON DELETE CASCADE,
run_order INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE IF NOT EXISTS schedule (
id INTEGER PRIMARY KEY AUTOINCREMENT,
group_id INTEGER NOT NULL REFERENCES grp(id) ON DELETE CASCADE,
cron_expr TEXT NOT NULL,
enabled INTEGER DEFAULT 1
);
CREATE TABLE IF NOT EXISTS group_run (
id INTEGER PRIMARY KEY AUTOINCREMENT,
group_id INTEGER NOT NULL REFERENCES grp(id),
started_at TEXT DEFAULT (datetime('now')),
finished_at TEXT,
status TEXT NOT NULL DEFAULT 'running',
triggered_by TEXT
);
CREATE TABLE IF NOT EXISTS run_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
module_id INTEGER NOT NULL REFERENCES module(id),
group_run_id INTEGER REFERENCES group_run(id),
started_at TEXT DEFAULT (datetime('now')),
finished_at TEXT,
row_count INTEGER,
status TEXT NOT NULL DEFAULT 'running',
error TEXT,
resolved_source_sql TEXT,
merge_sql TEXT,
watermark_values_json TEXT,
jrunner_stdout TEXT,
jrunner_stderr TEXT,
hook_log TEXT
);
CREATE TABLE IF NOT EXISTS settings (
key TEXT PRIMARY KEY,
value TEXT
);
"""
def get_db_path() -> str:
    """Return the value stored under the ``database`` key of the app config."""
    settings = get_config()
    return settings["database"]
def init_db():
    """Run the schema script; tables that already exist are left as they are."""
    with get_conn() as db:
        db.executescript(SCHEMA_SQL)
@contextmanager
def get_conn():
    """Yield a configured SQLite connection; commit on success, always close.

    The connection uses ``sqlite3.Row`` as its row factory and has foreign-key
    enforcement switched on. When the ``with`` body finishes normally the
    transaction is committed. If the body (or the commit itself) raises, the
    transaction is rolled back and the exception propagates to the caller.
    """
    conn = sqlite3.connect(get_db_path())
    try:
        # Setup happens inside the outer try so that a failure here still
        # closes the handle instead of leaking an open connection.
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA foreign_keys = ON")
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
    finally:
        conn.close()
# ---------------------------------------------------------------------------
# Drivers
# ---------------------------------------------------------------------------
def create_driver(name: str, jar_file: str, class_name: str,
                  url_template: str = None) -> dict:
    """Insert a driver row and return the stored record as a dict."""
    insert_sql = (
        "INSERT INTO driver (name, jar_file, class_name, url_template) "
        "VALUES (?, ?, ?, ?)"
    )
    with get_conn() as db:
        new_id = db.execute(
            insert_sql, (name, jar_file, class_name, url_template)
        ).lastrowid
        # Read the row back so defaults such as created_at are included.
        stored = db.execute(
            "SELECT * FROM driver WHERE id = ?", (new_id,)
        ).fetchone()
        return dict(stored)
def get_driver(driver_id: int) -> dict | None:
    """Look up one driver by id; return ``None`` when there is no such row."""
    with get_conn() as db:
        found = db.execute(
            "SELECT * FROM driver WHERE id = ?", (driver_id,)
        ).fetchone()
        if not found:
            return None
        return dict(found)
def list_drivers() -> list[dict]:
    """Return all drivers as dicts, sorted by name."""
    with get_conn() as db:
        rows = db.execute("SELECT * FROM driver ORDER BY name").fetchall()
        return list(map(dict, rows))
def delete_driver(driver_id: int):
    """Delete the driver with the given id (no error if it does not exist)."""
    delete_sql = "DELETE FROM driver WHERE id = ?"
    with get_conn() as db:
        db.execute(delete_sql, (driver_id,))
# ---------------------------------------------------------------------------
# Connections
# ---------------------------------------------------------------------------
def create_connection(name: str, jdbc_url: str, driver_id: int = None,
                      username: str = None, password: str = None,
                      default_dest_connection_id: int = None,
                      default_dest_schema: str = None,
                      notes: str = None) -> dict:
    """Insert a connection row and return the stored record as a dict."""
    insert_sql = (
        "INSERT INTO connection (name, jdbc_url, driver_id, username, password, "
        "default_dest_connection_id, default_dest_schema, notes) "
        "VALUES (?, ?, ?, ?, ?, ?, ?, ?)"
    )
    params = (
        name, jdbc_url, driver_id, username, password,
        default_dest_connection_id, default_dest_schema, notes,
    )
    with get_conn() as db:
        new_id = db.execute(insert_sql, params).lastrowid
        # Read the row back so column defaults (timestamps) are included.
        stored = db.execute(
            "SELECT * FROM connection WHERE id = ?", (new_id,)
        ).fetchone()
        return dict(stored)
def get_connection(conn_id: int) -> dict | None:
    """Look up one connection by id; return ``None`` when it is missing."""
    with get_conn() as db:
        found = db.execute(
            "SELECT * FROM connection WHERE id = ?", (conn_id,)
        ).fetchone()
        if not found:
            return None
        return dict(found)
def list_connections() -> list[dict]:
    """Return all connections as dicts, sorted by name."""
    with get_conn() as db:
        rows = db.execute("SELECT * FROM connection ORDER BY name").fetchall()
        return list(map(dict, rows))
def update_connection(conn_id: int, **kwargs) -> dict:
    """Update the given columns of a connection and return the current row.

    Only known column names are accepted, and keyword arguments whose value is
    ``None`` are ignored (so a column cannot be set to NULL through this call).
    When nothing is left to update the row is returned unchanged.
    """
    allowed = {"name", "jdbc_url", "driver_id", "username", "password",
               "default_dest_connection_id", "default_dest_schema", "notes"}
    assignments, params = [], []
    for column, value in kwargs.items():
        if column in allowed and value is not None:
            assignments.append(f"{column} = ?")
            params.append(value)
    if assignments:
        set_clause = ", ".join(assignments)
        params.append(conn_id)
        with get_conn() as db:
            db.execute(
                f"UPDATE connection SET {set_clause}, updated_at = datetime('now') WHERE id = ?",
                params,
            )
    # Re-read on a fresh connection, after the update has been committed.
    return get_connection(conn_id)
def delete_connection(conn_id: int):
    """Delete the connection with the given id (no error if it does not exist)."""
    delete_sql = "DELETE FROM connection WHERE id = ?"
    with get_conn() as db:
        db.execute(delete_sql, (conn_id,))
# ---------------------------------------------------------------------------
# Modules
# ---------------------------------------------------------------------------
def create_module(name: str, source_connection_id: int, dest_connection_id: int,
                  dest_table: str, source_query: str, merge_strategy: str = "full",
                  merge_key: str = None) -> dict:
    """Insert a module row and return the stored record as a dict."""
    insert_sql = (
        "INSERT INTO module (name, source_connection_id, dest_connection_id, "
        "dest_table, source_query, merge_strategy, merge_key) "
        "VALUES (?, ?, ?, ?, ?, ?, ?)"
    )
    params = (
        name, source_connection_id, dest_connection_id, dest_table,
        source_query, merge_strategy, merge_key,
    )
    with get_conn() as db:
        new_id = db.execute(insert_sql, params).lastrowid
        # Read the row back so column defaults (enabled, timestamps) are included.
        stored = db.execute(
            "SELECT * FROM module WHERE id = ?", (new_id,)
        ).fetchone()
        return dict(stored)
def get_module(module_id: int) -> dict | None:
    """Look up one module by id; return ``None`` when it is missing."""
    with get_conn() as db:
        found = db.execute(
            "SELECT * FROM module WHERE id = ?", (module_id,)
        ).fetchone()
        if not found:
            return None
        return dict(found)
def list_modules() -> list[dict]:
    """Return all modules as dicts, sorted by name."""
    with get_conn() as db:
        rows = db.execute("SELECT * FROM module ORDER BY name").fetchall()
        return list(map(dict, rows))
def update_module(module_id: int, **kwargs) -> dict:
    """Update the given columns of a module and return the current row.

    Only known column names are accepted, and keyword arguments whose value is
    ``None`` are ignored (so a column cannot be set to NULL through this call).
    When nothing is left to update the row is returned unchanged.
    """
    allowed = {"name", "source_connection_id", "dest_connection_id", "dest_table",
               "source_query", "merge_strategy", "merge_key", "enabled"}
    assignments, params = [], []
    for column, value in kwargs.items():
        if column in allowed and value is not None:
            assignments.append(f"{column} = ?")
            params.append(value)
    if assignments:
        set_clause = ", ".join(assignments)
        params.append(module_id)
        with get_conn() as db:
            db.execute(
                f"UPDATE module SET {set_clause}, updated_at = datetime('now') WHERE id = ?",
                params,
            )
    # Re-read on a fresh connection, after the update has been committed.
    return get_module(module_id)
def acquire_module_lock(module_id: int, pid: str) -> bool:
    """Try to take the module's run lock; return True only if it was taken.

    The lock is claimed with a single conditional UPDATE that matches only
    when ``running`` is 0, recording ``pid`` and the current time. A module
    that is already running (or does not exist) matches no row and yields False.
    """
    lock_sql = (
        "UPDATE module SET running = 1, running_pid = ?, "
        "running_since = datetime('now') "
        "WHERE id = ? AND running = 0"
    )
    with get_conn() as db:
        claimed_rows = db.execute(lock_sql, (pid, module_id)).rowcount
        return claimed_rows > 0
def release_module_lock(module_id: int):
    """Clear the module's run lock, resetting running, running_pid and running_since."""
    unlock_sql = (
        "UPDATE module SET running = 0, running_pid = NULL, "
        "running_since = NULL WHERE id = ?"
    )
    with get_conn() as db:
        db.execute(unlock_sql, (module_id,))
def clear_stale_locks(max_age_hours: int = 24):
    """Release every run lock that has been held longer than max_age_hours.

    Staleness is judged purely by ``running_since``; the recorded
    ``running_pid`` is not inspected.
    """
    age_modifier = f"-{max_age_hours} hours"
    reset_sql = (
        "UPDATE module SET running = 0, running_pid = NULL, running_since = NULL "
        "WHERE running = 1 AND running_since < datetime('now', ?)"
    )
    with get_conn() as db:
        db.execute(reset_sql, (age_modifier,))
def delete_module(module_id: int):
    """Delete the module with the given id (no error if it does not exist)."""
    delete_sql = "DELETE FROM module WHERE id = ?"
    with get_conn() as db:
        db.execute(delete_sql, (module_id,))
# ---------------------------------------------------------------------------
# Watermarks
# ---------------------------------------------------------------------------
def create_watermark(module_id: int, name: str, connection_id: int,
                     resolver_sql: str, default_value: str = None) -> dict:
    """Insert a watermark for a module and return the stored record as a dict."""
    insert_sql = (
        "INSERT INTO watermark (module_id, name, connection_id, resolver_sql, "
        "default_value) VALUES (?, ?, ?, ?, ?)"
    )
    params = (module_id, name, connection_id, resolver_sql, default_value)
    with get_conn() as db:
        new_id = db.execute(insert_sql, params).lastrowid
        stored = db.execute(
            "SELECT * FROM watermark WHERE id = ?", (new_id,)
        ).fetchone()
        return dict(stored)
def get_watermark(watermark_id: int) -> dict | None:
    """Look up one watermark by id; return ``None`` when it is missing."""
    lookup_sql = "SELECT * FROM watermark WHERE id = ?"
    with get_conn() as db:
        found = db.execute(lookup_sql, (watermark_id,)).fetchone()
        if not found:
            return None
        return dict(found)
def list_watermarks(module_id: int) -> list[dict]:
    """Return the watermarks belonging to one module, sorted by name."""
    with get_conn() as db:
        rows = db.execute(
            "SELECT * FROM watermark WHERE module_id = ? ORDER BY name",
            (module_id,),
        ).fetchall()
        return list(map(dict, rows))
def update_watermark(watermark_id: int, **kwargs) -> dict:
    """Update the given columns of a watermark and return the current row.

    Only known column names are accepted, and keyword arguments whose value is
    ``None`` are ignored. When nothing is left to update the row is returned
    unchanged.
    """
    allowed = {"name", "connection_id", "resolver_sql", "default_value"}
    assignments, params = [], []
    for column, value in kwargs.items():
        if column in allowed and value is not None:
            assignments.append(f"{column} = ?")
            params.append(value)
    if assignments:
        set_clause = ", ".join(assignments)
        params.append(watermark_id)
        with get_conn() as db:
            db.execute(f"UPDATE watermark SET {set_clause} WHERE id = ?", params)
    # Re-read on a fresh connection, after the update has been committed.
    return get_watermark(watermark_id)
def delete_watermark(watermark_id: int):
    """Delete the watermark with the given id (no error if it does not exist)."""
    delete_sql = "DELETE FROM watermark WHERE id = ?"
    with get_conn() as db:
        db.execute(delete_sql, (watermark_id,))
# ---------------------------------------------------------------------------
# Hooks
# ---------------------------------------------------------------------------
def create_hook(module_id: int, sql: str, run_order: int = 0,
                connection_id: int = None, run_on: str = "success") -> dict:
    """Insert a hook for a module and return the stored record as a dict."""
    insert_sql = (
        "INSERT INTO hook (module_id, run_order, connection_id, sql, run_on) "
        "VALUES (?, ?, ?, ?, ?)"
    )
    params = (module_id, run_order, connection_id, sql, run_on)
    with get_conn() as db:
        new_id = db.execute(insert_sql, params).lastrowid
        stored = db.execute(
            "SELECT * FROM hook WHERE id = ?", (new_id,)
        ).fetchone()
        return dict(stored)
def get_hook(hook_id: int) -> dict | None:
    """Look up one hook by id; return ``None`` when it is missing."""
    with get_conn() as db:
        found = db.execute(
            "SELECT * FROM hook WHERE id = ?", (hook_id,)
        ).fetchone()
        if not found:
            return None
        return dict(found)
def list_hooks(module_id: int) -> list[dict]:
    """Return the hooks belonging to one module, sorted by run_order."""
    with get_conn() as db:
        rows = db.execute(
            "SELECT * FROM hook WHERE module_id = ? ORDER BY run_order",
            (module_id,),
        ).fetchall()
        return list(map(dict, rows))
def update_hook(hook_id: int, **kwargs) -> dict:
    """Update the given columns of a hook and return the current row.

    Only known column names are accepted, and keyword arguments whose value is
    ``None`` are ignored. When nothing is left to update the row is returned
    unchanged.
    """
    allowed = {"run_order", "connection_id", "sql", "run_on"}
    assignments, params = [], []
    for column, value in kwargs.items():
        if column in allowed and value is not None:
            assignments.append(f"{column} = ?")
            params.append(value)
    if assignments:
        set_clause = ", ".join(assignments)
        params.append(hook_id)
        with get_conn() as db:
            db.execute(f"UPDATE hook SET {set_clause} WHERE id = ?", params)
    # Re-read on a fresh connection, after the update has been committed.
    return get_hook(hook_id)
def delete_hook(hook_id: int):
    """Delete the hook with the given id (no error if it does not exist)."""
    delete_sql = "DELETE FROM hook WHERE id = ?"
    with get_conn() as db:
        db.execute(delete_sql, (hook_id,))
# ---------------------------------------------------------------------------
# Groups
# ---------------------------------------------------------------------------
def create_group(name: str) -> dict:
    """Insert a group row and return the stored record as a dict."""
    with get_conn() as db:
        new_id = db.execute(
            "INSERT INTO grp (name) VALUES (?)", (name,)
        ).lastrowid
        stored = db.execute(
            "SELECT * FROM grp WHERE id = ?", (new_id,)
        ).fetchone()
        return dict(stored)
def get_group(group_id: int) -> dict | None:
    """Return one group with its members and schedules, or ``None`` if missing.

    The result holds the ``grp`` columns plus ``members`` (group_member rows
    joined with the module name, sorted by run_order) and ``schedules``
    (schedule rows sorted by id).
    """
    members_sql = (
        "SELECT gm.*, m.name AS module_name FROM group_member gm "
        "JOIN module m ON gm.module_id = m.id "
        "WHERE gm.group_id = ? ORDER BY gm.run_order"
    )
    schedules_sql = "SELECT * FROM schedule WHERE group_id = ? ORDER BY id"
    with get_conn() as db:
        head = db.execute(
            "SELECT * FROM grp WHERE id = ?", (group_id,)
        ).fetchone()
        if not head:
            return None
        member_rows = db.execute(members_sql, (group_id,)).fetchall()
        schedule_rows = db.execute(schedules_sql, (group_id,)).fetchall()
        group = dict(head)
        group["members"] = list(map(dict, member_rows))
        group["schedules"] = list(map(dict, schedule_rows))
        return group
def list_groups() -> list[dict]:
    """Return every group, sorted by name, with its members and schedules.

    Each dict holds the ``grp`` columns plus ``members`` (group_member rows
    joined with the module name, sorted by run_order) and ``schedules``
    (schedule rows sorted by id) — the same shape ``get_group`` produces.
    """
    members_sql = (
        "SELECT gm.*, m.name AS module_name FROM group_member gm "
        "JOIN module m ON gm.module_id = m.id "
        "WHERE gm.group_id = ? ORDER BY gm.run_order"
    )
    schedules_sql = "SELECT * FROM schedule WHERE group_id = ? ORDER BY id"
    with get_conn() as conn:
        groups = [dict(r) for r in conn.execute(
            "SELECT * FROM grp ORDER BY name"
        ).fetchall()]
        # Run the per-group lookups on this one connection instead of opening
        # a new connection (and re-selecting the group row) for every group.
        for g in groups:
            g["members"] = [dict(r) for r in conn.execute(
                members_sql, (g["id"],)
            ).fetchall()]
            g["schedules"] = [dict(r) for r in conn.execute(
                schedules_sql, (g["id"],)
            ).fetchall()]
        return groups
def delete_group(group_id: int):
    """Delete the group with the given id (no error if it does not exist)."""
    delete_sql = "DELETE FROM grp WHERE id = ?"
    with get_conn() as db:
        db.execute(delete_sql, (group_id,))
def add_group_member(group_id: int, module_id: int, run_order: int = 0) -> dict:
    """Attach a module to a group and return the new group_member row as a dict."""
    insert_sql = (
        "INSERT INTO group_member (group_id, module_id, run_order) "
        "VALUES (?, ?, ?)"
    )
    with get_conn() as db:
        new_id = db.execute(
            insert_sql, (group_id, module_id, run_order)
        ).lastrowid
        stored = db.execute(
            "SELECT * FROM group_member WHERE id = ?", (new_id,)
        ).fetchone()
        return dict(stored)
def remove_group_member(member_id: int):
    """Delete the group_member row with the given id (no error if missing)."""
    delete_sql = "DELETE FROM group_member WHERE id = ?"
    with get_conn() as db:
        db.execute(delete_sql, (member_id,))
# ---------------------------------------------------------------------------
# Schedules
# ---------------------------------------------------------------------------
def create_schedule(group_id: int, cron_expr: str, enabled: bool = True) -> dict:
    """Insert a schedule for a group and return the stored record as a dict.

    ``enabled`` is stored as an integer (1 or 0).
    """
    enabled_flag = int(enabled)
    with get_conn() as db:
        new_id = db.execute(
            "INSERT INTO schedule (group_id, cron_expr, enabled) VALUES (?, ?, ?)",
            (group_id, cron_expr, enabled_flag),
        ).lastrowid
        stored = db.execute(
            "SELECT * FROM schedule WHERE id = ?", (new_id,)
        ).fetchone()
        return dict(stored)
def get_schedule(schedule_id: int) -> dict | None:
    """Look up one schedule by id; return ``None`` when it is missing."""
    lookup_sql = "SELECT * FROM schedule WHERE id = ?"
    with get_conn() as db:
        found = db.execute(lookup_sql, (schedule_id,)).fetchone()
        if not found:
            return None
        return dict(found)
def list_schedules() -> list[dict]:
    """Return all schedules with their group's name, sorted by group name."""
    listing_sql = (
        "SELECT s.*, g.name AS group_name FROM schedule s "
        "JOIN grp g ON s.group_id = g.id ORDER BY g.name"
    )
    with get_conn() as db:
        rows = db.execute(listing_sql).fetchall()
        return list(map(dict, rows))
def update_schedule(schedule_id: int, **kwargs) -> dict:
    """Update cron_expr and/or enabled on a schedule and return the current row.

    Unknown keyword arguments and those whose value is ``None`` are ignored.
    When nothing is left to update the row is returned unchanged.
    """
    allowed = {"cron_expr", "enabled"}
    assignments, params = [], []
    for column, value in kwargs.items():
        if column in allowed and value is not None:
            assignments.append(f"{column} = ?")
            params.append(value)
    if assignments:
        set_clause = ", ".join(assignments)
        params.append(schedule_id)
        with get_conn() as db:
            db.execute(f"UPDATE schedule SET {set_clause} WHERE id = ?", params)
    # Re-read on a fresh connection, after the update has been committed.
    return get_schedule(schedule_id)
def delete_schedule(schedule_id: int):
    """Delete the schedule with the given id (no error if it does not exist)."""
    delete_sql = "DELETE FROM schedule WHERE id = ?"
    with get_conn() as db:
        db.execute(delete_sql, (schedule_id,))
# ---------------------------------------------------------------------------
# Group Runs
# ---------------------------------------------------------------------------
def create_group_run(group_id: int, triggered_by: str = None) -> dict:
    """Insert a group_run row and return the stored record as a dict.

    ``status`` and ``started_at`` are left to their column defaults.
    """
    with get_conn() as db:
        new_id = db.execute(
            "INSERT INTO group_run (group_id, triggered_by) VALUES (?, ?)",
            (group_id, triggered_by),
        ).lastrowid
        stored = db.execute(
            "SELECT * FROM group_run WHERE id = ?", (new_id,)
        ).fetchone()
        return dict(stored)
def finish_group_run(group_run_id: int, status: str):
    """Stamp a group run with the current time as finished_at and set its status."""
    finish_sql = (
        "UPDATE group_run SET finished_at = datetime('now'), status = ? "
        "WHERE id = ?"
    )
    with get_conn() as db:
        db.execute(finish_sql, (status, group_run_id))
def get_group_run(group_run_id: int) -> dict | None:
    """Return one group run with its module runs, or ``None`` if missing.

    The result holds the ``group_run`` columns plus ``runs``: the run_log rows
    of that group run joined with the module name, sorted by run_log id.
    """
    runs_sql = (
        "SELECT rl.*, m.name AS module_name FROM run_log rl "
        "JOIN module m ON rl.module_id = m.id "
        "WHERE rl.group_run_id = ? ORDER BY rl.id"
    )
    with get_conn() as db:
        head = db.execute(
            "SELECT * FROM group_run WHERE id = ?", (group_run_id,)
        ).fetchone()
        if not head:
            return None
        run_rows = db.execute(runs_sql, (group_run_id,)).fetchall()
        group_run = dict(head)
        group_run["runs"] = list(map(dict, run_rows))
        return group_run
def list_group_runs(group_id: int = None, limit: int = 50) -> list[dict]:
    """Return the most recent group runs (newest first) with their group name.

    A truthy ``group_id`` restricts the listing to that group; at most
    ``limit`` rows are returned.
    """
    query = (
        "SELECT gr.*, g.name AS group_name FROM group_run gr "
        "JOIN grp g ON gr.group_id = g.id "
    )
    params = []
    if group_id:
        query += "WHERE gr.group_id = ? "
        params.append(group_id)
    query += "ORDER BY gr.id DESC LIMIT ?"
    params.append(limit)
    with get_conn() as db:
        rows = db.execute(query, params).fetchall()
        return [dict(row) for row in rows]
# ---------------------------------------------------------------------------
# Run Log
# ---------------------------------------------------------------------------
def create_run(module_id: int, group_run_id: int = None) -> dict:
    """Insert a run_log row for a module and return the stored record as a dict.

    ``status`` and ``started_at`` are left to their column defaults.
    """
    with get_conn() as db:
        new_id = db.execute(
            "INSERT INTO run_log (module_id, group_run_id) VALUES (?, ?)",
            (module_id, group_run_id),
        ).lastrowid
        stored = db.execute(
            "SELECT * FROM run_log WHERE id = ?", (new_id,)
        ).fetchone()
        return dict(stored)
def log_run_sql(run_id: int, resolved_source_sql: str, merge_sql: str = None):
    """Store the resolved source SQL and merge SQL on a run_log row.

    Both columns are always written, so an omitted ``merge_sql`` stores NULL.
    """
    update_sql = (
        "UPDATE run_log SET resolved_source_sql = ?, merge_sql = ? WHERE id = ?"
    )
    with get_conn() as db:
        db.execute(update_sql, (resolved_source_sql, merge_sql, run_id))
def log_run_output(run_id: int, jrunner_stdout: str = None,
                   jrunner_stderr: str = None, hook_log: str = None,
                   watermark_values_json: str = None):
    """Store captured output on a run_log row, touching only the given columns.

    Arguments left as ``None`` are skipped; if all of them are ``None`` the
    function returns without opening a database connection.
    """
    candidates = (
        ("jrunner_stdout", jrunner_stdout),
        ("jrunner_stderr", jrunner_stderr),
        ("hook_log", hook_log),
        ("watermark_values_json", watermark_values_json),
    )
    provided = [(column, value) for column, value in candidates
                if value is not None]
    if not provided:
        return
    set_clause = ", ".join(f"{column} = ?" for column, _ in provided)
    params = [value for _, value in provided]
    params.append(run_id)
    with get_conn() as db:
        db.execute(f"UPDATE run_log SET {set_clause} WHERE id = ?", params)
def finish_run(run_id: int, status: str, row_count: int = None, error: str = None):
    """Stamp a run with the current time as finished_at and record its outcome.

    ``row_count`` and ``error`` are always written, so omitted values store NULL.
    """
    finish_sql = (
        "UPDATE run_log SET finished_at = datetime('now'), status = ?, "
        "row_count = ?, error = ? WHERE id = ?"
    )
    with get_conn() as db:
        db.execute(finish_sql, (status, row_count, error, run_id))
def get_run(run_id: int) -> dict | None:
    """Look up one run_log row by id; return ``None`` when it is missing."""
    with get_conn() as db:
        found = db.execute(
            "SELECT * FROM run_log WHERE id = ?", (run_id,)
        ).fetchone()
        if not found:
            return None
        return dict(found)
def list_runs(module_id: int = None, status: str = None,
              limit: int = 50) -> list[dict]:
    """Return the most recent runs (newest first) with their module name.

    A truthy ``module_id`` and/or ``status`` narrows the listing; at most
    ``limit`` rows are returned. The module name comes from a LEFT JOIN, so it
    is NULL for a run whose module row is not found.
    """
    filters = []
    if module_id:
        filters.append(("r.module_id = ?", module_id))
    if status:
        filters.append(("r.status = ?", status))
    if filters:
        where_sql = "WHERE " + " AND ".join(clause for clause, _ in filters)
    else:
        where_sql = ""
    params = [value for _, value in filters]
    params.append(limit)
    query = (
        f"SELECT r.*, m.name AS module_name FROM run_log r "
        f"LEFT JOIN module m ON r.module_id = m.id "
        f"{where_sql} ORDER BY r.id DESC LIMIT ?"
    )
    with get_conn() as db:
        rows = db.execute(query, params).fetchall()
        return [dict(row) for row in rows]
# ---------------------------------------------------------------------------
# Settings
# ---------------------------------------------------------------------------
def get_setting(key: str) -> str | None:
    """Return the stored value for a settings key, or ``None`` if the key is absent."""
    with get_conn() as db:
        found = db.execute(
            "SELECT value FROM settings WHERE key = ?", (key,)
        ).fetchone()
        if not found:
            return None
        return found["value"]
def set_setting(key: str, value: str):
    """Store a settings value, overwriting any value already saved for the key."""
    upsert_sql = (
        "INSERT INTO settings (key, value) VALUES (?, ?) "
        "ON CONFLICT(key) DO UPDATE SET value = excluded.value"
    )
    with get_conn() as db:
        db.execute(upsert_sql, (key, value))