pipekit/.archive/pre-rewrite/engine/runner.py
Paul Trowbridge 574ada5258 Initial commit: Pipekit rewrite.
Orchestration layer around the jrunner Java JDBC CLI, replacing the
previous shell-based sync system in .archive/pre-rewrite. Includes
the FastAPI + Jinja web frontend, per-driver adapters (DB2, MSSQL,
PG), wizard-driven module creation with editable dest types and
source-sourced table/column descriptions, watermark/hook CRUD,
and the engine that runs modules end-to-end.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-22 00:38:26 -04:00

492 lines
18 KiB
Python

"""Sync runner — orchestrates jrunner transfers, staging, merge, hooks, logging."""
import json
import logging
import os
import re
import subprocess
import tempfile
import threading

from config import get_config
from engine.db import (
    get_module, get_connection, get_run, create_run, finish_run,
    log_run_sql, log_run_output, list_hooks, list_watermarks,
    get_group, acquire_module_lock, release_module_lock,
    create_group_run, finish_group_run, get_group_run,
)
from engine.introspect import _resolve_password, fetch_columns, map_type_pg
logger = logging.getLogger("pipekit.runner")
class SyncError(Exception):
    """Raised when any stage of a module sync fails."""
def _parse_pg_jdbc_url(jdbc_url: str) -> dict:
"""Extract host, port, dbname from a PostgreSQL JDBC URL."""
m = re.match(r"jdbc:postgresql://([^:/]+)(?::(\d+))?/(\w+)", jdbc_url)
if not m:
return {}
return {"host": m.group(1), "port": m.group(2) or "5432", "dbname": m.group(3)}
def _run_dest_sql(conn_info: dict, sql: str) -> str:
    """Run SQL against a database connection.

    Uses psql for PostgreSQL (supports DDL/DML), jrunner query mode for
    others.  Both tools read the statement from a file, so the SQL is
    staged in a temporary file that is always removed afterwards.

    Returns:
        The tool's stdout.

    Raises:
        SyncError: if psql or jrunner exits non-zero.
    """
    password = _resolve_password(conn_info["password"])
    with tempfile.NamedTemporaryFile(mode="w", suffix=".sql", delete=False) as f:
        f.write(sql)
        sql_path = f.name
    try:
        if "postgresql" in conn_info["jdbc_url"].lower():
            pg = _parse_pg_jdbc_url(conn_info["jdbc_url"])
            env = os.environ.copy()
            # Pass the password via the environment, not the command line.
            env["PGPASSWORD"] = password
            result = subprocess.run(
                ["psql",
                 "-h", pg.get("host", "localhost"),
                 "-p", pg.get("port", "5432"),
                 "-U", conn_info["username"] or "",
                 "-d", pg.get("dbname", ""),
                 "-f", sql_path],
                capture_output=True, text=True, timeout=300, env=env,
            )
            if result.returncode != 0:
                raise SyncError(f"psql error: {result.stderr}")
            return result.stdout
        else:
            cfg = get_config()
            jrunner = cfg["jrunner_path"]
            result = subprocess.run(
                [jrunner,
                 "-scu", conn_info["jdbc_url"],
                 "-scn", conn_info["username"] or "",
                 "-scp", password,
                 "-sq", sql_path,
                 "-f", "csv"],
                capture_output=True, text=True, timeout=300,
            )
            # Fail loudly on error, matching the psql branch above
            # (previously a non-zero exit here was silently ignored).
            if result.returncode != 0:
                raise SyncError(f"jrunner error: {result.stderr or result.stdout}")
            return result.stdout
    finally:
        os.unlink(sql_path)
def _run_jrunner_query(conn_info: dict, sql: str) -> str:
    """Execute *sql* through jrunner's query mode and return its stdout.

    The query is written to a temporary file (jrunner reads SQL from disk)
    which is removed once the subprocess finishes.

    Raises:
        SyncError: if jrunner exits non-zero.
    """
    cfg = get_config()
    runner_path = cfg["jrunner_path"]
    secret = _resolve_password(conn_info["password"])
    tmp = tempfile.NamedTemporaryFile(mode="w", suffix=".sql", delete=False)
    with tmp:
        tmp.write(sql)
    query_file = tmp.name
    try:
        cmd = [
            runner_path,
            "-scu", conn_info["jdbc_url"],
            "-scn", conn_info["username"] or "",
            "-scp", secret,
            "-sq", query_file,
            "-f", "csv",
        ]
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
        if proc.returncode != 0:
            raise SyncError(f"jrunner query error: {proc.stderr or proc.stdout}")
        return proc.stdout
    finally:
        os.unlink(query_file)
def _staging_table_exists(dest_conn: dict, staging_table: str) -> bool:
    """Check if a staging table already exists in the destination.

    Accepts "schema.table" or a bare table name (schema then defaults to
    "public").  This is a best-effort probe: any query failure is treated
    as "does not exist".
    """
    parts = staging_table.split(".")
    schema = parts[0] if len(parts) == 2 else "public"
    table = parts[-1]
    # Escape embedded single quotes so the identifier cannot break out of
    # the string literal (the SQL is built by interpolation).
    schema_lit = schema.replace("'", "''")
    table_lit = table.replace("'", "''")
    sql = (
        f"SELECT 1 FROM information_schema.tables "
        f"WHERE table_schema = '{schema_lit}' AND table_name = '{table_lit}'"
    )
    try:
        output = _run_dest_sql(dest_conn, sql).strip()
        # psql/jrunner echo the selected value; any "1" in output is a hit.
        return "1" in output
    except Exception:
        # Deliberate best-effort: callers create the table when this is False.
        return False
def _create_staging_from_source(source_conn: dict, dest_conn: dict,
                                source_query: str, staging_table: str) -> None:
    """Ensure a staging table exists and is empty.

    If the staging table already exists it is truncated.  Otherwise a
    zero-row "probe" transfer is run through jrunner so its output reveals
    the source column names/types, which are mapped to PostgreSQL types to
    build the CREATE TABLE DDL.

    Raises:
        SyncError: if no column metadata could be parsed from the probe
            output.
    """
    if _staging_table_exists(dest_conn, staging_table):
        _run_dest_sql(dest_conn, f"TRUNCATE TABLE {staging_table};")
        return
    # NOTE(review): imported here presumably to avoid a circular import at
    # module load time — confirm against engine.introspect's own imports.
    from engine.introspect import _detect_source_type
    source_type = _detect_source_type(source_conn["jdbc_url"])
    base_query = source_query.rstrip().rstrip(";")
    # Wrap the module's query in a dialect-specific zero-row SELECT.
    if source_type == "sqlserver":
        probe_query = f"SELECT TOP 0 * FROM ({base_query}) AS probe0"
    elif source_type == "postgresql":
        probe_query = f"SELECT * FROM ({base_query}) AS probe0 LIMIT 0"
    elif source_type == "as400":
        probe_query = f"SELECT * FROM ({base_query}) AS probe0 FETCH FIRST 0 ROWS ONLY"
    else:
        # Fallback: WHERE 1=0 is accepted by most dialects.
        probe_query = f"SELECT * FROM ({base_query}) AS probe0 WHERE 1=0"
    cfg = get_config()
    jrunner = cfg["jrunner_path"]
    src_pw = _resolve_password(source_conn["password"])
    dst_pw = _resolve_password(dest_conn["password"])
    with tempfile.NamedTemporaryFile(mode="w", suffix=".sql", delete=False) as f:
        f.write(probe_query)
        sql_path = f.name
    try:
        # The probe targets the not-yet-existing staging table; its exit
        # code is deliberately ignored — only the column metadata printed
        # on stdout/stderr is consumed below.
        result = subprocess.run(
            [jrunner,
             "-scu", source_conn["jdbc_url"],
             "-scn", source_conn["username"] or "",
             "-scp", src_pw,
             "-dcu", dest_conn["jdbc_url"],
             "-dcn", dest_conn["username"] or "",
             "-dcp", dst_pw,
             "-dt", staging_table,
             "-sq", sql_path],
            capture_output=True, text=True, timeout=30,
        )
        output = result.stdout + result.stderr
    finally:
        os.unlink(sql_path)
    # jrunner appears to print one "* name: TYPE" line per source column
    # (see the regex below) — assumed output format; confirm against
    # the jrunner CLI before changing.
    columns = []
    for m in re.finditer(r"\*\s+(\S+):\s+(\S+)", output):
        col_name = m.group(1).lower()
        col_type = m.group(2)
        # Map the source/JDBC type name to a PostgreSQL column type.
        pg_type = map_type_pg(col_type)
        columns.append(f" {col_name:<30} {pg_type}")
    if not columns:
        raise SyncError(f"Could not introspect source columns. jrunner output: {output[:500]}")
    col_defs = ",\n".join(columns)
    # NOTE(review): column names are not quoted in the DDL — reserved words
    # or odd characters in source column names would break this; confirm.
    ddl = (
        f"DROP TABLE IF EXISTS {staging_table};\n"
        f"CREATE TABLE {staging_table} (\n{col_defs}\n);"
    )
    _run_dest_sql(dest_conn, ddl)
def _run_jdbc_transfer(source_conn: dict, dest_conn: dict, source_query: str,
                       dest_table: str, on_output: callable = None) -> tuple[int, str, str]:
    """Run jrunner to transfer data from source to destination.

    Streams jrunner stdout line-by-line to *on_output* (if given) while a
    background thread drains stderr.  Previously stderr was only read after
    stdout reached EOF, so a chatty child could fill the stderr pipe buffer
    and deadlock the whole transfer.

    Returns:
        (row_count, stdout, stderr)

    Raises:
        SyncError: if jrunner exits non-zero.
    """
    cfg = get_config()
    jrunner = cfg["jrunner_path"]
    src_pw = _resolve_password(source_conn["password"])
    dst_pw = _resolve_password(dest_conn["password"])
    with tempfile.NamedTemporaryFile(mode="w", suffix=".sql", delete=False) as f:
        f.write(source_query)
        sql_path = f.name
    try:
        proc = subprocess.Popen(
            [jrunner,
             "-scu", source_conn["jdbc_url"],
             "-scn", source_conn["username"] or "",
             "-scp", src_pw,
             "-dcu", dest_conn["jdbc_url"],
             "-dcn", dest_conn["username"] or "",
             "-dcp", dst_pw,
             "-dt", dest_table,
             "-sq", sql_path],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            text=True,
        )
        # Drain stderr concurrently so the child can never block on a full
        # stderr pipe while we are still reading stdout.
        stderr_chunks: list[str] = []

        def _drain_stderr() -> None:
            stderr_chunks.append(proc.stderr.read() if proc.stderr else "")

        drainer = threading.Thread(target=_drain_stderr, daemon=True)
        drainer.start()
        stdout_lines = []
        for line in proc.stdout:
            line = line.rstrip("\n")
            stdout_lines.append(line)
            if on_output:
                on_output(line)
        proc.wait()
        drainer.join()
        stdout = "\n".join(stdout_lines)
        stderr = stderr_chunks[0] if stderr_chunks else ""
        if proc.returncode != 0:
            raise SyncError(f"jrunner transfer failed: {stdout}\n{stderr}")
        row_count = _parse_row_count(stdout)
        return row_count, stdout, stderr
    finally:
        os.unlink(sql_path)
def _parse_row_count(output: str) -> int:
"""Extract row count from jrunner output."""
for line in output.splitlines():
if "rows written" in line.lower():
m = re.search(r"(\d+)\s*rows written", line, re.IGNORECASE)
if m:
return int(m.group(1))
return 0
def _resolve_watermarks(module_id: int) -> dict[str, str]:
    """Resolve all watermarks for a module. Returns {name: resolved_value}.

    Each watermark's resolver SQL runs via jrunner against the watermark's
    own connection.  A value that is missing, empty, or the string "null"
    falls back to the watermark's default; a resolver *failure* also falls
    back to the default when one exists.

    Raises:
        SyncError: if a watermark references a missing connection, or its
            resolver fails and no default value is configured.
    """
    watermarks = list_watermarks(module_id)
    resolved = {}
    for wm in watermarks:
        conn = get_connection(wm["connection_id"])
        if not conn:
            raise SyncError(f"Watermark '{wm['name']}' references missing connection {wm['connection_id']}")
        try:
            output = _run_jrunner_query(conn, wm["resolver_sql"])
            # Take first row, first column
            lines = [l.strip() for l in output.strip().splitlines() if l.strip()]
            # Skip CSV header
            # NOTE(review): this actually takes the entire second line, not
            # just the first column — a multi-column resolver would embed
            # commas in the value. Assumes single-column resolvers; confirm.
            value = lines[1] if len(lines) > 1 else None
            if value:
                # Strip quotes if CSV-wrapped
                value = value.strip('"').strip("'")
            if not value or value.lower() == "null":
                value = wm["default_value"]
            # Coerce None defaults to "" so placeholders still substitute.
            resolved[wm["name"]] = value or ""
        except Exception as e:
            logger.warning(f"Watermark '{wm['name']}' resolver failed: {e}")
            if wm["default_value"]:
                resolved[wm["name"]] = wm["default_value"]
            else:
                raise SyncError(
                    f"Watermark '{wm['name']}' resolver failed and no default: {e}"
                )
    return resolved
def _materialize_query(source_query: str, watermark_values: dict[str, str]) -> str:
"""Substitute {name} placeholders in source_query with resolved values."""
result = source_query
for name, value in watermark_values.items():
result = result.replace(f"{{{name}}}", value)
return result
def preview_module(module_id: int) -> dict:
    """Preview the exact SQL that would be executed for a module.

    Watermark resolvers really run (so the preview shows live values), but
    no transfer, DDL, or merge is executed.  The unused destination
    connection lookup from the original was removed.

    Returns:
        Dict with the materialized source query, base query, staging table
        name, merge SQL, hook summaries, strategy, and watermark values.

    Raises:
        SyncError: if the module does not exist, or a watermark resolver
            fails with no default.
    """
    module = get_module(module_id)
    if not module:
        raise SyncError(f"Module {module_id} not found")
    staging_table = f"pipekit_staging.{module['name']}"
    # Resolve watermarks and materialize query
    watermark_values = _resolve_watermarks(module_id)
    source_query = _materialize_query(module["source_query"], watermark_values)
    # Merge SQL
    merge_sql = _build_merge_sql(module, staging_table)
    # Hooks that would fire after a successful run.
    hooks = list_hooks(module_id)
    hook_sql = [
        f"-- hook ({h['run_on']}): {h['sql']}"
        for h in hooks
        if h["run_on"] in ("success", "always")
    ]
    return {
        "source_query": source_query,
        "base_query": module["source_query"],
        "staging_table": staging_table,
        "merge_sql": merge_sql,
        "hooks": hook_sql,
        "strategy": module["merge_strategy"],
        "watermark_values": watermark_values,
    }
def run_module(module_id: int, group_run_id: int = None,
               on_output: callable = None) -> dict:
    """Execute a single sync module end-to-end. Returns the run log entry.

    Pipeline: resolve watermarks -> materialize source query -> ensure
    schemas/staging -> jrunner transfer -> merge -> hooks.  Pipeline
    failures are recorded on the run row (status "error") and returned;
    only precondition failures raise.

    Args:
        module_id: module to run.
        group_run_id: optional parent group run to attach this run to.
        on_output: optional callback receiving human-readable progress lines.

    Raises:
        SyncError: if the module is missing or disabled, already running,
            or a connection is missing.
    """
    module = get_module(module_id)
    if not module:
        raise SyncError(f"Module {module_id} not found")
    if not module["enabled"]:
        raise SyncError(f"Module {module['name']} is disabled")
    # Atomic lock acquisition — prevents concurrent runs of the same module.
    pid = str(os.getpid())
    if not acquire_module_lock(module_id, pid):
        raise SyncError(f"Module {module['name']} is already running")
    source_conn = get_connection(module["source_connection_id"])
    dest_conn = get_connection(module["dest_connection_id"])
    if not source_conn or not dest_conn:
        release_module_lock(module_id)
        raise SyncError("Source or destination connection not found")
    # If creating the run row itself fails, release the lock before
    # propagating (previously the lock leaked here, wedging the module).
    try:
        run = create_run(module_id, group_run_id)
    except Exception:
        release_module_lock(module_id)
        raise
    run_id = run["id"]
    staging_table = f"pipekit_staging.{module['name']}"
    logger.info(f"Starting sync: {module['name']} (run {run_id})")
    try:
        # 1. Resolve watermarks
        watermark_values = _resolve_watermarks(module_id)
        if watermark_values:
            log_run_output(run_id, watermark_values_json=json.dumps(watermark_values))
        # 2. Materialize source query
        source_query = _materialize_query(module["source_query"], watermark_values)
        log_run_sql(run_id, source_query)
        # 3. Ensure schemas exist (PostgreSQL destinations only)
        if "postgresql" in dest_conn["jdbc_url"].lower():
            dest_schema = module["dest_table"].split(".")[0] if "." in module["dest_table"] else "public"
            setup_sql = (
                f"CREATE SCHEMA IF NOT EXISTS pipekit_staging;\n"
                f"CREATE SCHEMA IF NOT EXISTS {dest_schema};\n"
            )
            _run_dest_sql(dest_conn, setup_sql)
        # 4. Create staging table from source metadata
        logger.info(f"Creating staging table {staging_table}")
        if on_output:
            on_output(f"Creating staging table {staging_table}")
        # Note: staging is created from the BASE query, not the
        # watermark-materialized one (the shape is the same either way).
        _create_staging_from_source(source_conn, dest_conn, module["source_query"], staging_table)
        # 5. Transfer data to staging table
        logger.info(f"Transferring data to {staging_table}")
        if on_output:
            on_output("Transferring data...")
        row_count, stdout, stderr = _run_jdbc_transfer(
            source_conn, dest_conn, source_query, staging_table, on_output=on_output
        )
        log_run_output(run_id, jrunner_stdout=stdout, jrunner_stderr=stderr)
        if on_output:
            on_output(f"Transferred {row_count} rows")
        logger.info(f"Transferred {row_count} rows")
        # 6. Execute merge strategy
        merge_sql = _build_merge_sql(module, staging_table)
        log_run_sql(run_id, source_query, merge_sql)
        logger.info(f"Executing merge: {module['merge_strategy']}")
        if on_output:
            on_output(f"Executing merge: {module['merge_strategy']}")
        _run_dest_sql(dest_conn, merge_sql)
        # 7. Run success hooks
        hook_log = _run_hooks(module_id, "success", dest_conn)
        if hook_log:
            log_run_output(run_id, hook_log=hook_log)
        finish_run(run_id, "success", row_count)
        release_module_lock(module_id)
        # Fixed log message: was missing the separator between name/count.
        logger.info(f"Sync complete: {module['name']}: {row_count} rows")
        return get_run(run_id)
    except Exception as e:
        error_msg = str(e)
        # Fixed log message: was missing the separator between name/error.
        logger.error(f"Sync failed: {module['name']}: {error_msg}")
        # Run failure hooks best-effort; a failing hook must not mask the
        # original error.
        try:
            hook_log = _run_hooks(module_id, "failure", dest_conn)
            if hook_log:
                log_run_output(run_id, hook_log=hook_log)
        except Exception:
            pass
        finish_run(run_id, "error", error=error_msg)
        release_module_lock(module_id)
        return get_run(run_id)
def _build_merge_sql(module: dict, staging_table: str) -> str:
"""Build the merge SQL based on strategy."""
dest_table = module["dest_table"]
strategy = module["merge_strategy"]
merge_key = module["merge_key"]
if strategy == "full":
return (
f"CREATE TABLE IF NOT EXISTS {dest_table} (LIKE {staging_table} INCLUDING ALL);\n"
f"BEGIN;\n"
f"TRUNCATE TABLE {dest_table};\n"
f"INSERT INTO {dest_table} SELECT * FROM {staging_table};\n"
f"COMMIT;\n"
)
elif strategy == "incremental":
create_if = f"CREATE TABLE IF NOT EXISTS {dest_table} (LIKE {staging_table} INCLUDING ALL);\n"
if merge_key:
return (
f"{create_if}"
f"BEGIN;\n"
f"DELETE FROM {dest_table} WHERE {merge_key} IN "
f"(SELECT DISTINCT {merge_key} FROM {staging_table});\n"
f"INSERT INTO {dest_table} SELECT * FROM {staging_table};\n"
f"COMMIT;\n"
)
else:
return f"{create_if}INSERT INTO {dest_table} SELECT * FROM {staging_table};\n"
elif strategy == "append":
return (
f"CREATE TABLE IF NOT EXISTS {dest_table} (LIKE {staging_table} INCLUDING ALL);\n"
f"INSERT INTO {dest_table} SELECT * FROM {staging_table};\n"
)
raise SyncError(f"Unknown merge strategy: {strategy}")
def _run_hooks(module_id: int, run_on: str, dest_conn: dict) -> str:
    """Execute a module's hooks matching *run_on* (or "always").

    Each hook runs on its own connection when one is configured, otherwise
    on the destination connection.  Hook failures are logged, never raised.

    Returns:
        Combined newline-joined hook output log.
    """
    entries = []
    for hook in list_hooks(module_id):
        if hook["run_on"] not in (run_on, "always"):
            continue
        # Prefer the hook's own connection; fall back to the destination.
        conn_id = hook["connection_id"]
        if conn_id:
            target = get_connection(conn_id)
            if not target:
                entries.append(f"SKIP hook #{hook['id']}: connection {conn_id} not found")
                continue
        else:
            target = dest_conn
        logger.info(f"Running hook: {hook['sql'][:80]}")
        try:
            result = _run_dest_sql(target, hook["sql"])
            entries.append(f"hook #{hook['id']} OK: {result[:200]}")
        except Exception as exc:
            entries.append(f"hook #{hook['id']} FAILED: {exc}")
    return "\n".join(entries)
def run_group(group_id: int, triggered_by: str = "manual") -> dict:
    """Execute all modules in a group in order. Stops on first failure.

    Returns:
        The finished group-run record (via ``get_group_run``, which was
        previously referenced without being imported — fixed in the
        module's import block).

    Raises:
        SyncError: if the group does not exist, or propagated from
            run_module (e.g. a member is disabled or already locked); the
            group run is marked "error" before re-raising so it is never
            left open.
    """
    group = get_group(group_id)
    if not group:
        raise SyncError(f"Group {group_id} not found")
    group_run = create_group_run(group_id, triggered_by=triggered_by)
    group_run_id = group_run["id"]
    final_status = "success"
    try:
        for member in group["members"]:
            run = run_module(member["module_id"], group_run_id=group_run_id)
            if run["status"] == "error":
                logger.error(f"Group {group['name']} stopped: {member['module_name']} failed")
                final_status = "error"
                break
    except Exception:
        # run_module raises (rather than returning an error run) for
        # preconditions like a disabled or locked module; close out the
        # group run instead of leaving it dangling, then propagate.
        finish_group_run(group_run_id, "error")
        raise
    finish_group_run(group_run_id, final_status)
    return get_group_run(group_run_id)