Orchestration layer around the jrunner Java JDBC CLI, replacing the previous shell-based sync system in .archive/pre-rewrite. Includes the FastAPI + Jinja web frontend, per-driver adapters (DB2, MSSQL, PG), wizard-driven module creation with editable dest types and source-sourced table/column descriptions, watermark/hook CRUD, and the engine that runs modules end-to-end. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
687 lines
24 KiB
Python
687 lines
24 KiB
Python
"""SQLite database layer for Pipekit."""
|
|
|
|
import sqlite3
|
|
from contextlib import contextmanager
|
|
from pathlib import Path
|
|
|
|
from config import get_config
|
|
|
|
# DDL for the whole Pipekit metadata store. Every statement uses
# CREATE TABLE IF NOT EXISTS, so the script can be re-run against an
# existing database without error (it will not alter tables that already exist).
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS driver (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL UNIQUE,
    jar_file TEXT NOT NULL,
    class_name TEXT NOT NULL,
    url_template TEXT,
    created_at TEXT DEFAULT (datetime('now'))
);

CREATE TABLE IF NOT EXISTS connection (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL UNIQUE,
    driver_id INTEGER REFERENCES driver(id),
    jdbc_url TEXT NOT NULL,
    username TEXT,
    password TEXT,
    default_dest_connection_id INTEGER REFERENCES connection(id),
    default_dest_schema TEXT,
    notes TEXT,
    created_at TEXT DEFAULT (datetime('now')),
    updated_at TEXT DEFAULT (datetime('now'))
);

CREATE TABLE IF NOT EXISTS module (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL UNIQUE,
    source_connection_id INTEGER NOT NULL REFERENCES connection(id),
    dest_connection_id INTEGER NOT NULL REFERENCES connection(id),
    dest_table TEXT NOT NULL,
    source_query TEXT NOT NULL,
    merge_strategy TEXT NOT NULL DEFAULT 'full',
    merge_key TEXT,
    enabled INTEGER DEFAULT 1,
    running INTEGER DEFAULT 0,
    running_pid TEXT,
    running_since TEXT,
    created_at TEXT DEFAULT (datetime('now')),
    updated_at TEXT DEFAULT (datetime('now'))
);

CREATE TABLE IF NOT EXISTS watermark (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    module_id INTEGER NOT NULL REFERENCES module(id) ON DELETE CASCADE,
    name TEXT NOT NULL,
    connection_id INTEGER NOT NULL REFERENCES connection(id),
    resolver_sql TEXT NOT NULL,
    default_value TEXT,
    UNIQUE(module_id, name)
);

CREATE TABLE IF NOT EXISTS hook (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    module_id INTEGER NOT NULL REFERENCES module(id) ON DELETE CASCADE,
    run_order INTEGER NOT NULL DEFAULT 0,
    connection_id INTEGER REFERENCES connection(id),
    sql TEXT NOT NULL,
    run_on TEXT NOT NULL DEFAULT 'success'
);

CREATE TABLE IF NOT EXISTS grp (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL UNIQUE
);

CREATE TABLE IF NOT EXISTS group_member (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    group_id INTEGER NOT NULL REFERENCES grp(id) ON DELETE CASCADE,
    module_id INTEGER NOT NULL REFERENCES module(id) ON DELETE CASCADE,
    run_order INTEGER NOT NULL DEFAULT 0
);

CREATE TABLE IF NOT EXISTS schedule (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    group_id INTEGER NOT NULL REFERENCES grp(id) ON DELETE CASCADE,
    cron_expr TEXT NOT NULL,
    enabled INTEGER DEFAULT 1
);

CREATE TABLE IF NOT EXISTS group_run (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    group_id INTEGER NOT NULL REFERENCES grp(id),
    started_at TEXT DEFAULT (datetime('now')),
    finished_at TEXT,
    status TEXT NOT NULL DEFAULT 'running',
    triggered_by TEXT
);

CREATE TABLE IF NOT EXISTS run_log (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    module_id INTEGER NOT NULL REFERENCES module(id),
    group_run_id INTEGER REFERENCES group_run(id),
    started_at TEXT DEFAULT (datetime('now')),
    finished_at TEXT,
    row_count INTEGER,
    status TEXT NOT NULL DEFAULT 'running',
    error TEXT,
    resolved_source_sql TEXT,
    merge_sql TEXT,
    watermark_values_json TEXT,
    jrunner_stdout TEXT,
    jrunner_stderr TEXT,
    hook_log TEXT
);

CREATE TABLE IF NOT EXISTS settings (
    key TEXT PRIMARY KEY,
    value TEXT
);
"""
|
|
|
|
|
|
def get_db_path() -> str:
    """Return the value stored under the "database" key of the app config."""
    settings = get_config()
    return settings["database"]
|
|
|
|
|
|
def init_db():
    """Run SCHEMA_SQL so that any table not yet present gets created."""
    with get_conn() as db:
        db.executescript(SCHEMA_SQL)
|
|
|
|
|
|
@contextmanager
def get_conn():
    """Yield a SQLite connection; commit on success, roll back on error.

    The connection is opened on the path from get_db_path(), returns rows
    as sqlite3.Row, and has foreign-key enforcement switched on.  It is
    always closed when the ``with`` block ends.

    Yields:
        sqlite3.Connection: the open connection.

    Raises:
        Exception: anything raised inside the ``with`` body is re-raised
            after the transaction has been rolled back.
    """
    conn = sqlite3.connect(get_db_path())
    try:
        # Setup lives inside the try so that a failing PRAGMA (e.g. the file
        # is not a valid database) still closes the handle in ``finally``.
        conn.row_factory = sqlite3.Row
        # SQLite does not enforce foreign keys unless asked per connection.
        conn.execute("PRAGMA foreign_keys = ON")
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
    finally:
        conn.close()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Drivers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def create_driver(name: str, jar_file: str, class_name: str,
                  url_template: str = None) -> dict:
    """Insert a driver row and return the stored record as a dict.

    The row is read back after the insert so that database-filled columns
    (id, created_at) are included in the result.
    """
    insert_sql = (
        "INSERT INTO driver (name, jar_file, class_name, url_template) "
        "VALUES (?, ?, ?, ?)"
    )
    with get_conn() as db:
        new_id = db.execute(
            insert_sql, (name, jar_file, class_name, url_template)
        ).lastrowid
        stored = db.execute(
            "SELECT * FROM driver WHERE id = ?", (new_id,)
        ).fetchone()
        return dict(stored)
|
|
|
|
|
|
def get_driver(driver_id: int) -> dict | None:
    """Fetch one driver by id; return None when no row matches."""
    with get_conn() as db:
        found = db.execute(
            "SELECT * FROM driver WHERE id = ?", (driver_id,)
        ).fetchone()
        if not found:
            return None
        return dict(found)
|
|
|
|
|
|
def list_drivers() -> list[dict]:
    """Return every driver row as a dict, ordered by name."""
    with get_conn() as db:
        cursor = db.execute("SELECT * FROM driver ORDER BY name")
        return [dict(record) for record in cursor]
|
|
|
|
|
|
def delete_driver(driver_id: int):
    """Delete the driver row with the given id, if one exists."""
    target = (driver_id,)
    with get_conn() as db:
        db.execute("DELETE FROM driver WHERE id = ?", target)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Connections
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def create_connection(name: str, jdbc_url: str, driver_id: int = None,
                      username: str = None, password: str = None,
                      default_dest_connection_id: int = None,
                      default_dest_schema: str = None,
                      notes: str = None) -> dict:
    """Insert a connection row and return the stored record as a dict.

    Optional arguments left as None are stored as NULL.  The row is read
    back after the insert so id and the timestamp defaults are included.
    """
    insert_sql = (
        "INSERT INTO connection (name, jdbc_url, driver_id, username, password, "
        "default_dest_connection_id, default_dest_schema, notes) "
        "VALUES (?, ?, ?, ?, ?, ?, ?, ?)"
    )
    values = (
        name, jdbc_url, driver_id, username, password,
        default_dest_connection_id, default_dest_schema, notes,
    )
    with get_conn() as db:
        new_id = db.execute(insert_sql, values).lastrowid
        stored = db.execute(
            "SELECT * FROM connection WHERE id = ?", (new_id,)
        ).fetchone()
        return dict(stored)
|
|
|
|
|
|
def get_connection(conn_id: int) -> dict | None:
    """Fetch one connection record by id; return None when no row matches."""
    with get_conn() as db:
        found = db.execute(
            "SELECT * FROM connection WHERE id = ?", (conn_id,)
        ).fetchone()
        if not found:
            return None
        return dict(found)
|
|
|
|
|
|
def list_connections() -> list[dict]:
    """Return every connection row as a dict, ordered by name."""
    with get_conn() as db:
        cursor = db.execute("SELECT * FROM connection ORDER BY name")
        return [dict(record) for record in cursor]
|
|
|
|
|
|
def update_connection(conn_id: int, **kwargs) -> dict:
    """Update the given columns of a connection and return the fresh row.

    Only whitelisted column names are honoured, and keyword arguments whose
    value is None are ignored (so a column cannot be set to NULL here).
    When something is written, updated_at is refreshed as well.  The return
    value is whatever get_connection(conn_id) yields afterwards.
    """
    editable = {"name", "jdbc_url", "driver_id", "username", "password",
                "default_dest_connection_id", "default_dest_schema", "notes"}
    changes = {}
    for column, value in kwargs.items():
        if value is not None and column in editable:
            changes[column] = value

    if changes:
        # Column names come from the whitelist above; values are bound.
        assignments = ", ".join(f"{column} = ?" for column in changes)
        with get_conn() as db:
            db.execute(
                f"UPDATE connection SET {assignments}, updated_at = datetime('now') WHERE id = ?",
                [*changes.values(), conn_id],
            )
    return get_connection(conn_id)
|
|
|
|
|
|
def delete_connection(conn_id: int):
    """Delete the connection row with the given id, if one exists."""
    target = (conn_id,)
    with get_conn() as db:
        db.execute("DELETE FROM connection WHERE id = ?", target)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Modules
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def create_module(name: str, source_connection_id: int, dest_connection_id: int,
                  dest_table: str, source_query: str, merge_strategy: str = "full",
                  merge_key: str = None) -> dict:
    """Insert a module row and return the stored record as a dict.

    The row is read back after the insert so that columns filled by the
    database (id, enabled, running, timestamps) appear in the result.
    """
    insert_sql = (
        "INSERT INTO module (name, source_connection_id, dest_connection_id, "
        "dest_table, source_query, merge_strategy, merge_key) "
        "VALUES (?, ?, ?, ?, ?, ?, ?)"
    )
    values = (
        name, source_connection_id, dest_connection_id, dest_table,
        source_query, merge_strategy, merge_key,
    )
    with get_conn() as db:
        new_id = db.execute(insert_sql, values).lastrowid
        stored = db.execute(
            "SELECT * FROM module WHERE id = ?", (new_id,)
        ).fetchone()
        return dict(stored)
|
|
|
|
|
|
def get_module(module_id: int) -> dict | None:
    """Fetch one module by id; return None when no row matches."""
    with get_conn() as db:
        found = db.execute(
            "SELECT * FROM module WHERE id = ?", (module_id,)
        ).fetchone()
        if not found:
            return None
        return dict(found)
|
|
|
|
|
|
def list_modules() -> list[dict]:
    """Return every module row as a dict, ordered by name."""
    with get_conn() as db:
        cursor = db.execute("SELECT * FROM module ORDER BY name")
        return [dict(record) for record in cursor]
|
|
|
|
|
|
def update_module(module_id: int, **kwargs) -> dict:
    """Update the given columns of a module and return the fresh row.

    Only whitelisted column names are honoured, and keyword arguments whose
    value is None are ignored (so a column cannot be set to NULL here).
    When something is written, updated_at is refreshed as well.  The return
    value is whatever get_module(module_id) yields afterwards.
    """
    editable = {"name", "source_connection_id", "dest_connection_id", "dest_table",
                "source_query", "merge_strategy", "merge_key", "enabled"}
    changes = {}
    for column, value in kwargs.items():
        if value is not None and column in editable:
            changes[column] = value

    if changes:
        # Column names come from the whitelist above; values are bound.
        assignments = ", ".join(f"{column} = ?" for column in changes)
        with get_conn() as db:
            db.execute(
                f"UPDATE module SET {assignments}, updated_at = datetime('now') WHERE id = ?",
                [*changes.values(), module_id],
            )
    return get_module(module_id)
|
|
|
|
|
|
def acquire_module_lock(module_id: int, pid: str) -> bool:
    """Try to take the run lock for a module; return True if it was taken.

    A single conditional UPDATE flips running from 0 to 1 and records the
    pid and the current time.  If the module is already marked running (or
    does not exist) no row is changed and False is returned.
    """
    claim_sql = (
        "UPDATE module SET running = 1, running_pid = ?, "
        "running_since = datetime('now') "
        "WHERE id = ? AND running = 0"
    )
    with get_conn() as db:
        rows_changed = db.execute(claim_sql, (pid, module_id)).rowcount
        return rows_changed > 0
|
|
|
|
|
|
def release_module_lock(module_id: int):
    """Mark the module as not running and clear its pid and start time."""
    release_sql = (
        "UPDATE module SET running = 0, running_pid = NULL, "
        "running_since = NULL WHERE id = ?"
    )
    with get_conn() as db:
        db.execute(release_sql, (module_id,))
|
|
|
|
|
|
def clear_stale_locks(max_age_hours: int = 24):
    """Release run locks that have been held longer than max_age_hours.

    Only the age of running_since is examined; the recorded running_pid is
    not checked for liveness.
    """
    # SQLite datetime() modifier such as "-24 hours", bound as a parameter.
    age_modifier = f"-{max_age_hours} hours"
    sweep_sql = (
        "UPDATE module SET running = 0, running_pid = NULL, running_since = NULL "
        "WHERE running = 1 AND running_since < datetime('now', ?)"
    )
    with get_conn() as db:
        db.execute(sweep_sql, (age_modifier,))
|
|
|
|
|
|
def delete_module(module_id: int):
    """Delete the module row with the given id, if one exists."""
    target = (module_id,)
    with get_conn() as db:
        db.execute("DELETE FROM module WHERE id = ?", target)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Watermarks
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def create_watermark(module_id: int, name: str, connection_id: int,
                     resolver_sql: str, default_value: str = None) -> dict:
    """Insert a watermark for a module and return the stored row as a dict."""
    insert_sql = (
        "INSERT INTO watermark (module_id, name, connection_id, resolver_sql, "
        "default_value) VALUES (?, ?, ?, ?, ?)"
    )
    values = (module_id, name, connection_id, resolver_sql, default_value)
    with get_conn() as db:
        new_id = db.execute(insert_sql, values).lastrowid
        stored = db.execute(
            "SELECT * FROM watermark WHERE id = ?", (new_id,)
        ).fetchone()
        return dict(stored)
|
|
|
|
|
|
def get_watermark(watermark_id: int) -> dict | None:
    """Fetch one watermark by id; return None when no row matches."""
    with get_conn() as db:
        found = db.execute(
            "SELECT * FROM watermark WHERE id = ?", (watermark_id,)
        ).fetchone()
        if not found:
            return None
        return dict(found)
|
|
|
|
|
|
def list_watermarks(module_id: int) -> list[dict]:
    """Return the watermarks of one module as dicts, ordered by name."""
    with get_conn() as db:
        cursor = db.execute(
            "SELECT * FROM watermark WHERE module_id = ? ORDER BY name",
            (module_id,),
        )
        return [dict(record) for record in cursor]
|
|
|
|
|
|
def update_watermark(watermark_id: int, **kwargs) -> dict:
    """Update the given columns of a watermark and return the fresh row.

    Only whitelisted column names are honoured, and keyword arguments whose
    value is None are ignored (so a column cannot be set to NULL here).
    The return value is whatever get_watermark(watermark_id) yields afterwards.
    """
    editable = {"name", "connection_id", "resolver_sql", "default_value"}
    changes = {}
    for column, value in kwargs.items():
        if value is not None and column in editable:
            changes[column] = value

    if changes:
        # Column names come from the whitelist above; values are bound.
        assignments = ", ".join(f"{column} = ?" for column in changes)
        with get_conn() as db:
            db.execute(
                f"UPDATE watermark SET {assignments} WHERE id = ?",
                [*changes.values(), watermark_id],
            )
    return get_watermark(watermark_id)
|
|
|
|
|
|
def delete_watermark(watermark_id: int):
    """Delete the watermark row with the given id, if one exists."""
    target = (watermark_id,)
    with get_conn() as db:
        db.execute("DELETE FROM watermark WHERE id = ?", target)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Hooks
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def create_hook(module_id: int, sql: str, run_order: int = 0,
                connection_id: int = None, run_on: str = "success") -> dict:
    """Insert a hook for a module and return the stored row as a dict."""
    insert_sql = (
        "INSERT INTO hook (module_id, run_order, connection_id, sql, run_on) "
        "VALUES (?, ?, ?, ?, ?)"
    )
    values = (module_id, run_order, connection_id, sql, run_on)
    with get_conn() as db:
        new_id = db.execute(insert_sql, values).lastrowid
        stored = db.execute(
            "SELECT * FROM hook WHERE id = ?", (new_id,)
        ).fetchone()
        return dict(stored)
|
|
|
|
|
|
def get_hook(hook_id: int) -> dict | None:
    """Fetch one hook by id; return None when no row matches."""
    with get_conn() as db:
        found = db.execute(
            "SELECT * FROM hook WHERE id = ?", (hook_id,)
        ).fetchone()
        if not found:
            return None
        return dict(found)
|
|
|
|
|
|
def list_hooks(module_id: int) -> list[dict]:
    """Return the hooks of one module as dicts, ordered by run_order."""
    with get_conn() as db:
        cursor = db.execute(
            "SELECT * FROM hook WHERE module_id = ? ORDER BY run_order",
            (module_id,),
        )
        return [dict(record) for record in cursor]
|
|
|
|
|
|
def update_hook(hook_id: int, **kwargs) -> dict:
    """Update the given columns of a hook and return the fresh row.

    Only whitelisted column names are honoured, and keyword arguments whose
    value is None are ignored (so a column cannot be set to NULL here).
    The return value is whatever get_hook(hook_id) yields afterwards.
    """
    editable = {"run_order", "connection_id", "sql", "run_on"}
    changes = {}
    for column, value in kwargs.items():
        if value is not None and column in editable:
            changes[column] = value

    if changes:
        # Column names come from the whitelist above; values are bound.
        assignments = ", ".join(f"{column} = ?" for column in changes)
        with get_conn() as db:
            db.execute(
                f"UPDATE hook SET {assignments} WHERE id = ?",
                [*changes.values(), hook_id],
            )
    return get_hook(hook_id)
|
|
|
|
|
|
def delete_hook(hook_id: int):
    """Delete the hook row with the given id, if one exists."""
    target = (hook_id,)
    with get_conn() as db:
        db.execute("DELETE FROM hook WHERE id = ?", target)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Groups
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def create_group(name: str) -> dict:
    """Insert a group with the given name and return the stored row as a dict."""
    with get_conn() as db:
        new_id = db.execute(
            "INSERT INTO grp (name) VALUES (?)", (name,)
        ).lastrowid
        stored = db.execute(
            "SELECT * FROM grp WHERE id = ?", (new_id,)
        ).fetchone()
        return dict(stored)
|
|
|
|
|
|
def get_group(group_id: int) -> dict | None:
    """Fetch a group together with its members and schedules.

    Returns None when the group does not exist.  Otherwise the dict holds
    the grp columns plus:
      * "members"   - group_member rows (with module_name), by run_order
      * "schedules" - schedule rows for the group, by id
    """
    members_sql = (
        "SELECT gm.*, m.name AS module_name FROM group_member gm "
        "JOIN module m ON gm.module_id = m.id "
        "WHERE gm.group_id = ? ORDER BY gm.run_order"
    )
    schedules_sql = "SELECT * FROM schedule WHERE group_id = ? ORDER BY id"
    key = (group_id,)

    with get_conn() as db:
        header = db.execute("SELECT * FROM grp WHERE id = ?", key).fetchone()
        if not header:
            return None
        group = dict(header)
        group["members"] = [dict(rec) for rec in db.execute(members_sql, key)]
        group["schedules"] = [dict(rec) for rec in db.execute(schedules_sql, key)]
        return group
|
|
|
|
|
|
def list_groups() -> list[dict]:
    """Return all groups, ordered by name, each with members and schedules.

    Every dict holds the grp columns plus:
      * "members"   - group_member rows (with module_name), by run_order
      * "schedules" - schedule rows for the group, by id

    All queries run on one connection, rather than opening a new one per
    group, so the whole listing is read through a single database handle.
    """
    members_sql = (
        "SELECT gm.*, m.name AS module_name FROM group_member gm "
        "JOIN module m ON gm.module_id = m.id "
        "WHERE gm.group_id = ? ORDER BY gm.run_order"
    )
    schedules_sql = "SELECT * FROM schedule WHERE group_id = ? ORDER BY id"

    with get_conn() as conn:
        groups = [dict(r) for r in conn.execute(
            "SELECT * FROM grp ORDER BY name"
        ).fetchall()]
        for g in groups:
            key = (g["id"],)
            g["members"] = [
                dict(r) for r in conn.execute(members_sql, key).fetchall()
            ]
            g["schedules"] = [
                dict(r) for r in conn.execute(schedules_sql, key).fetchall()
            ]
        return groups
|
|
|
|
|
|
def delete_group(group_id: int):
    """Delete the group row with the given id, if one exists."""
    target = (group_id,)
    with get_conn() as db:
        db.execute("DELETE FROM grp WHERE id = ?", target)
|
|
|
|
|
|
def add_group_member(group_id: int, module_id: int, run_order: int = 0) -> dict:
    """Add a module to a group and return the new group_member row as a dict."""
    insert_sql = (
        "INSERT INTO group_member (group_id, module_id, run_order) "
        "VALUES (?, ?, ?)"
    )
    with get_conn() as db:
        new_id = db.execute(
            insert_sql, (group_id, module_id, run_order)
        ).lastrowid
        stored = db.execute(
            "SELECT * FROM group_member WHERE id = ?", (new_id,)
        ).fetchone()
        return dict(stored)
|
|
|
|
|
|
def remove_group_member(member_id: int):
    """Delete the group_member row with the given id, if one exists."""
    target = (member_id,)
    with get_conn() as db:
        db.execute("DELETE FROM group_member WHERE id = ?", target)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Schedules
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def create_schedule(group_id: int, cron_expr: str, enabled: bool = True) -> dict:
    """Insert a schedule for a group and return the stored row as a dict.

    The enabled flag is converted with int() before it is stored.
    """
    values = (group_id, cron_expr, int(enabled))
    with get_conn() as db:
        new_id = db.execute(
            "INSERT INTO schedule (group_id, cron_expr, enabled) VALUES (?, ?, ?)",
            values,
        ).lastrowid
        stored = db.execute(
            "SELECT * FROM schedule WHERE id = ?", (new_id,)
        ).fetchone()
        return dict(stored)
|
|
|
|
|
|
def get_schedule(schedule_id: int) -> dict | None:
    """Fetch one schedule by id; return None when no row matches."""
    with get_conn() as db:
        found = db.execute(
            "SELECT * FROM schedule WHERE id = ?", (schedule_id,)
        ).fetchone()
        if not found:
            return None
        return dict(found)
|
|
|
|
|
|
def list_schedules() -> list[dict]:
    """Return every schedule with its group's name, ordered by group name."""
    listing_sql = (
        "SELECT s.*, g.name AS group_name FROM schedule s "
        "JOIN grp g ON s.group_id = g.id ORDER BY g.name"
    )
    with get_conn() as db:
        return [dict(record) for record in db.execute(listing_sql)]
|
|
|
|
|
|
def update_schedule(schedule_id: int, **kwargs) -> dict:
    """Update cron_expr and/or enabled on a schedule and return the fresh row.

    Other keyword names are ignored, as are arguments whose value is None.
    The return value is whatever get_schedule(schedule_id) yields afterwards.
    """
    editable = {"cron_expr", "enabled"}
    changes = {}
    for column, value in kwargs.items():
        if value is not None and column in editable:
            changes[column] = value

    if changes:
        # Column names come from the whitelist above; values are bound.
        assignments = ", ".join(f"{column} = ?" for column in changes)
        with get_conn() as db:
            db.execute(
                f"UPDATE schedule SET {assignments} WHERE id = ?",
                [*changes.values(), schedule_id],
            )
    return get_schedule(schedule_id)
|
|
|
|
|
|
def delete_schedule(schedule_id: int):
    """Delete the schedule row with the given id, if one exists."""
    target = (schedule_id,)
    with get_conn() as db:
        db.execute("DELETE FROM schedule WHERE id = ?", target)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Group Runs
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def create_group_run(group_id: int, triggered_by: str = None) -> dict:
    """Record the start of a group run and return the new row as a dict.

    started_at and status are left to their column defaults.
    """
    with get_conn() as db:
        new_id = db.execute(
            "INSERT INTO group_run (group_id, triggered_by) VALUES (?, ?)",
            (group_id, triggered_by),
        ).lastrowid
        stored = db.execute(
            "SELECT * FROM group_run WHERE id = ?", (new_id,)
        ).fetchone()
        return dict(stored)
|
|
|
|
|
|
def finish_group_run(group_run_id: int, status: str):
    """Stamp a group run with the current time and its final status."""
    finish_sql = (
        "UPDATE group_run SET finished_at = datetime('now'), status = ? "
        "WHERE id = ?"
    )
    with get_conn() as db:
        db.execute(finish_sql, (status, group_run_id))
|
|
|
|
|
|
def get_group_run(group_run_id: int) -> dict | None:
    """Fetch a group run together with the module runs that belong to it.

    Returns None when the group run does not exist.  Otherwise the dict
    holds the group_run columns plus "runs": the run_log rows (with
    module_name) linked to this group run, ordered by run_log id.
    """
    runs_sql = (
        "SELECT rl.*, m.name AS module_name FROM run_log rl "
        "JOIN module m ON rl.module_id = m.id "
        "WHERE rl.group_run_id = ? ORDER BY rl.id"
    )
    key = (group_run_id,)

    with get_conn() as db:
        header = db.execute(
            "SELECT * FROM group_run WHERE id = ?", key
        ).fetchone()
        if not header:
            return None
        group_run = dict(header)
        group_run["runs"] = [dict(rec) for rec in db.execute(runs_sql, key)]
        return group_run
|
|
|
|
|
|
def list_group_runs(group_id: int = None, limit: int = 50) -> list[dict]:
    """Return the most recent group runs (newest first) with group_name.

    When group_id is truthy only that group's runs are returned; otherwise
    runs of all groups are listed.  At most ``limit`` rows come back.
    """
    select_part = (
        "SELECT gr.*, g.name AS group_name FROM group_run gr "
        "JOIN grp g ON gr.group_id = g.id "
    )
    if group_id:
        query = select_part + "WHERE gr.group_id = ? ORDER BY gr.id DESC LIMIT ?"
        args = (group_id, limit)
    else:
        query = select_part + "ORDER BY gr.id DESC LIMIT ?"
        args = (limit,)

    with get_conn() as db:
        return [dict(record) for record in db.execute(query, args)]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Run Log
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def create_run(module_id: int, group_run_id: int = None) -> dict:
    """Record the start of a module run and return the new run_log row.

    started_at and status are left to their column defaults.
    """
    with get_conn() as db:
        new_id = db.execute(
            "INSERT INTO run_log (module_id, group_run_id) VALUES (?, ?)",
            (module_id, group_run_id),
        ).lastrowid
        stored = db.execute(
            "SELECT * FROM run_log WHERE id = ?", (new_id,)
        ).fetchone()
        return dict(stored)
|
|
|
|
|
|
def log_run_sql(run_id: int, resolved_source_sql: str, merge_sql: str = None):
    """Store the resolved source SQL and the merge SQL on a run_log row.

    Both columns are always overwritten; a merge_sql of None stores NULL.
    """
    values = (resolved_source_sql, merge_sql, run_id)
    with get_conn() as db:
        db.execute(
            "UPDATE run_log SET resolved_source_sql = ?, merge_sql = ? WHERE id = ?",
            values,
        )
|
|
|
|
|
|
def log_run_output(run_id: int, jrunner_stdout: str = None,
                   jrunner_stderr: str = None, hook_log: str = None,
                   watermark_values_json: str = None):
    """Store captured output on a run_log row.

    Only arguments that are not None are written; columns for omitted
    arguments keep their current value.  Nothing is executed when every
    argument is None.
    """
    candidates = {
        "jrunner_stdout": jrunner_stdout,
        "jrunner_stderr": jrunner_stderr,
        "hook_log": hook_log,
        "watermark_values_json": watermark_values_json,
    }
    supplied = {
        column: text for column, text in candidates.items() if text is not None
    }
    if not supplied:
        return

    assignments = ", ".join(f"{column} = ?" for column in supplied)
    with get_conn() as db:
        db.execute(
            f"UPDATE run_log SET {assignments} WHERE id = ?",
            [*supplied.values(), run_id],
        )
|
|
|
|
|
|
def finish_run(run_id: int, status: str, row_count: int = None, error: str = None):
    """Stamp a run_log row with the current time and its final outcome.

    status, row_count and error are all overwritten; None stores NULL.
    """
    finish_sql = (
        "UPDATE run_log SET finished_at = datetime('now'), status = ?, "
        "row_count = ?, error = ? WHERE id = ?"
    )
    with get_conn() as db:
        db.execute(finish_sql, (status, row_count, error, run_id))
|
|
|
|
|
|
def get_run(run_id: int) -> dict | None:
    """Fetch one run_log row by id; return None when no row matches."""
    with get_conn() as db:
        found = db.execute(
            "SELECT * FROM run_log WHERE id = ?", (run_id,)
        ).fetchone()
        if not found:
            return None
        return dict(found)
|
|
|
|
|
|
def list_runs(module_id: int = None, status: str = None,
              limit: int = 50) -> list[dict]:
    """Return the most recent run_log rows (newest first) with module_name.

    module_id and status are optional filters, applied only when truthy.
    module_name comes from a LEFT JOIN, so it is None for a run whose
    module row cannot be found.  At most ``limit`` rows come back.
    """
    filters = []
    if module_id:
        filters.append(("r.module_id = ?", module_id))
    if status:
        filters.append(("r.status = ?", status))

    if filters:
        where_sql = "WHERE " + " AND ".join(clause for clause, _ in filters)
    else:
        where_sql = ""
    args = [value for _, value in filters]
    args.append(limit)

    query = (
        f"SELECT r.*, m.name AS module_name FROM run_log r "
        f"LEFT JOIN module m ON r.module_id = m.id "
        f"{where_sql} ORDER BY r.id DESC LIMIT ?"
    )
    with get_conn() as db:
        return [dict(record) for record in db.execute(query, args)]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Settings
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def get_setting(key: str) -> str | None:
    """Return the stored value for a settings key, or None if the key is absent."""
    with get_conn() as db:
        hit = db.execute(
            "SELECT value FROM settings WHERE key = ?", (key,)
        ).fetchone()
        if not hit:
            return None
        return hit["value"]
|
|
|
|
|
|
def set_setting(key: str, value: str):
    """Insert a settings key, or overwrite its value if the key already exists."""
    upsert_sql = (
        "INSERT INTO settings (key, value) VALUES (?, ?) "
        "ON CONFLICT(key) DO UPDATE SET value = excluded.value"
    )
    with get_conn() as db:
        db.execute(upsert_sql, (key, value))
|