Orchestration layer around the jrunner Java JDBC CLI, replacing the previous shell-based sync system in .archive/pre-rewrite. Includes the FastAPI + Jinja web frontend, per-driver adapters (DB2, MSSQL, PG), wizard-driven module creation with editable dest types and source-derived table/column descriptions, watermark/hook CRUD, and the engine that runs modules end-to-end. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
"""Sync runner — orchestrates jrunner transfers, staging, merge, hooks, logging."""

import json
import logging
import os
import re
import subprocess
import tempfile

from config import get_config
from engine.db import (
    get_module, get_connection, get_run, create_run, finish_run,
    log_run_sql, log_run_output, list_hooks, list_watermarks,
    get_group, acquire_module_lock, release_module_lock,
    create_group_run, finish_group_run, get_group_run,
)
from engine.introspect import _resolve_password, fetch_columns, map_type_pg

logger = logging.getLogger("pipekit.runner")


class SyncError(Exception):
    pass


def _parse_pg_jdbc_url(jdbc_url: str) -> dict:
    """Extract host, port, dbname from a PostgreSQL JDBC URL."""
    # Accept dbnames beyond \w (hyphens, dots) and stop before any ?param list.
    m = re.match(r"jdbc:postgresql://([^:/]+)(?::(\d+))?/([^?/\s]+)", jdbc_url)
    if not m:
        return {}
    return {"host": m.group(1), "port": m.group(2) or "5432", "dbname": m.group(3)}
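
# Illustrative behaviour (comment-only sketch; URLs are hypothetical):
#   _parse_pg_jdbc_url("jdbc:postgresql://db.example.com:5433/analytics")
#     -> {"host": "db.example.com", "port": "5433", "dbname": "analytics"}
#   _parse_pg_jdbc_url("jdbc:postgresql://localhost/app")
#     -> {"host": "localhost", "port": "5432", "dbname": "app"}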


def _run_dest_sql(conn_info: dict, sql: str) -> str:
    """Run SQL against a database connection.

    Uses psql for PostgreSQL (supports DDL/DML), jrunner query mode for others.
    """
    password = _resolve_password(conn_info["password"])

    with tempfile.NamedTemporaryFile(mode="w", suffix=".sql", delete=False) as f:
        f.write(sql)
        sql_path = f.name

    try:
        if "postgresql" in conn_info["jdbc_url"].lower():
            pg = _parse_pg_jdbc_url(conn_info["jdbc_url"])
            env = os.environ.copy()
            env["PGPASSWORD"] = password
            result = subprocess.run(
                ["psql",
                 "-h", pg.get("host", "localhost"),
                 "-p", pg.get("port", "5432"),
                 "-U", conn_info["username"] or "",
                 "-d", pg.get("dbname", ""),
                 "-f", sql_path],
                capture_output=True, text=True, timeout=300, env=env,
            )
            if result.returncode != 0:
                raise SyncError(f"psql error: {result.stderr}")
            return result.stdout
        else:
            cfg = get_config()
            jrunner = cfg["jrunner_path"]
            result = subprocess.run(
                [jrunner,
                 "-scu", conn_info["jdbc_url"],
                 "-scn", conn_info["username"] or "",
                 "-scp", password,
                 "-sq", sql_path,
                 "-f", "csv"],
                capture_output=True, text=True, timeout=300,
            )
            # Surface jrunner failures instead of silently returning stdout,
            # matching the psql branch above.
            if result.returncode != 0:
                raise SyncError(f"jrunner error: {result.stderr or result.stdout}")
            return result.stdout
    finally:
        os.unlink(sql_path)
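
# Shape of the conn_info dicts used throughout this module (a sketch based
# on the keys read above; values are hypothetical):
#   conn_info = {
#       "jdbc_url": "jdbc:postgresql://localhost/warehouse",
#       "username": "etl_user",
#       "password": "<opaque value resolved by _resolve_password>",
#   }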


def _run_jrunner_query(conn_info: dict, sql: str) -> str:
    """Run a query via jrunner query mode and return stdout."""
    cfg = get_config()
    jrunner = cfg["jrunner_path"]
    password = _resolve_password(conn_info["password"])

    with tempfile.NamedTemporaryFile(mode="w", suffix=".sql", delete=False) as f:
        f.write(sql)
        sql_path = f.name

    try:
        result = subprocess.run(
            [jrunner,
             "-scu", conn_info["jdbc_url"],
             "-scn", conn_info["username"] or "",
             "-scp", password,
             "-sq", sql_path,
             "-f", "csv"],
            capture_output=True, text=True, timeout=60,
        )
        if result.returncode != 0:
            raise SyncError(f"jrunner query error: {result.stderr or result.stdout}")
        return result.stdout
    finally:
        os.unlink(sql_path)


def _staging_table_exists(dest_conn: dict, staging_table: str) -> bool:
    """Check if a staging table already exists in the destination."""
    parts = staging_table.split(".")
    schema = parts[0] if len(parts) == 2 else "public"
    table = parts[-1]
    sql = (
        f"SELECT 1 FROM information_schema.tables "
        f"WHERE table_schema = '{schema}' AND table_name = '{table}'"
    )
    try:
        output = _run_dest_sql(dest_conn, sql).strip()
        return "1" in output
    except Exception:
        return False


def _create_staging_from_source(source_conn: dict, dest_conn: dict,
                                source_query: str, staging_table: str) -> None:
    """Ensure a staging table exists and is empty."""
    if _staging_table_exists(dest_conn, staging_table):
        _run_dest_sql(dest_conn, f"TRUNCATE TABLE {staging_table};")
        return

    from engine.introspect import _detect_source_type

    source_type = _detect_source_type(source_conn["jdbc_url"])
    base_query = source_query.rstrip().rstrip(";")

    # Wrap the module query in a dialect-appropriate zero-row probe so jrunner
    # reports column metadata without transferring any data.
    if source_type == "sqlserver":
        probe_query = f"SELECT TOP 0 * FROM ({base_query}) AS probe0"
    elif source_type == "postgresql":
        probe_query = f"SELECT * FROM ({base_query}) AS probe0 LIMIT 0"
    elif source_type == "as400":
        probe_query = f"SELECT * FROM ({base_query}) AS probe0 FETCH FIRST 0 ROWS ONLY"
    else:
        probe_query = f"SELECT * FROM ({base_query}) AS probe0 WHERE 1=0"

    cfg = get_config()
    jrunner = cfg["jrunner_path"]
    src_pw = _resolve_password(source_conn["password"])
    dst_pw = _resolve_password(dest_conn["password"])

    with tempfile.NamedTemporaryFile(mode="w", suffix=".sql", delete=False) as f:
        f.write(probe_query)
        sql_path = f.name

    try:
        result = subprocess.run(
            [jrunner,
             "-scu", source_conn["jdbc_url"],
             "-scn", source_conn["username"] or "",
             "-scp", src_pw,
             "-dcu", dest_conn["jdbc_url"],
             "-dcn", dest_conn["username"] or "",
             "-dcp", dst_pw,
             "-dt", staging_table,
             "-sq", sql_path],
            capture_output=True, text=True, timeout=30,
        )
        output = result.stdout + result.stderr
    finally:
        os.unlink(sql_path)

    # Collect column metadata lines of the form "* NAME: TYPE" from jrunner's
    # output and map each JDBC type to a PostgreSQL type.
    columns = []
    for m in re.finditer(r"\*\s+(\S+):\s+(\S+)", output):
        col_name = m.group(1).lower()
        col_type = m.group(2)
        pg_type = map_type_pg(col_type)
        columns.append(f" {col_name:<30} {pg_type}")

    if not columns:
        raise SyncError(f"Could not introspect source columns. jrunner output: {output[:500]}")

    col_defs = ",\n".join(columns)
    ddl = (
        f"DROP TABLE IF EXISTS {staging_table};\n"
        f"CREATE TABLE {staging_table} (\n{col_defs}\n);"
    )
    _run_dest_sql(dest_conn, ddl)
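
# Sketch of the introspection round-trip. The "* NAME: TYPE" metadata line
# format is inferred from the regex above, and the column names/types are
# made up; map_type_pg is assumed to yield PostgreSQL types:
#   probe output:   * ORDER_ID: INTEGER
#                   * ORDER_TS: TIMESTAMP
#   generated DDL:  DROP TABLE IF EXISTS pipekit_staging.orders;
#                   CREATE TABLE pipekit_staging.orders (
#                    order_id                       integer,
#                    order_ts                       timestamp
#                   );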


def _run_jdbc_transfer(source_conn: dict, dest_conn: dict, source_query: str,
                       dest_table: str, on_output: callable = None) -> tuple[int, str, str]:
    """Run jrunner to transfer data from source to destination.

    Returns (row_count, stdout, stderr).
    """
    cfg = get_config()
    jrunner = cfg["jrunner_path"]
    src_pw = _resolve_password(source_conn["password"])
    dst_pw = _resolve_password(dest_conn["password"])

    with tempfile.NamedTemporaryFile(mode="w", suffix=".sql", delete=False) as f:
        f.write(source_query)
        sql_path = f.name

    try:
        # Popen rather than run() so stdout can be streamed line-by-line to
        # the on_output callback while the transfer is in flight.
        proc = subprocess.Popen(
            [jrunner,
             "-scu", source_conn["jdbc_url"],
             "-scn", source_conn["username"] or "",
             "-scp", src_pw,
             "-dcu", dest_conn["jdbc_url"],
             "-dcn", dest_conn["username"] or "",
             "-dcp", dst_pw,
             "-dt", dest_table,
             "-sq", sql_path],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            text=True,
        )
        stdout_lines = []
        for line in proc.stdout:
            line = line.rstrip("\n")
            stdout_lines.append(line)
            if on_output:
                on_output(line)
        proc.wait()
        stdout = "\n".join(stdout_lines)
        stderr = proc.stderr.read() if proc.stderr else ""

        if proc.returncode != 0:
            raise SyncError(f"jrunner transfer failed: {stdout}\n{stderr}")

        row_count = _parse_row_count(stdout)
        return row_count, stdout, stderr
    finally:
        os.unlink(sql_path)


def _parse_row_count(output: str) -> int:
    """Extract row count from jrunner output."""
    for line in output.splitlines():
        if "rows written" in line.lower():
            m = re.search(r"(\d+)\s*rows written", line, re.IGNORECASE)
            if m:
                return int(m.group(1))
    return 0
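
# Illustrative only (the exact jrunner wording may differ; matching is a
# case-insensitive search for "rows written"):
#   _parse_row_count("transfer complete: 48210 rows written in 12s")  -> 48210
#   _parse_row_count("no matching summary line")                      -> 0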


def _resolve_watermarks(module_id: int) -> dict[str, str]:
    """Resolve all watermarks for a module. Returns {name: resolved_value}."""
    watermarks = list_watermarks(module_id)
    resolved = {}
    for wm in watermarks:
        conn = get_connection(wm["connection_id"])
        if not conn:
            raise SyncError(f"Watermark '{wm['name']}' references missing connection {wm['connection_id']}")
        try:
            output = _run_jrunner_query(conn, wm["resolver_sql"])
            lines = [l.strip() for l in output.strip().splitlines() if l.strip()]
            # First data row after the CSV header line
            value = lines[1] if len(lines) > 1 else None
            if value:
                # Strip quotes if CSV-wrapped
                value = value.strip('"').strip("'")
            if not value or value.lower() == "null":
                value = wm["default_value"]
            resolved[wm["name"]] = value or ""
        except Exception as e:
            logger.warning(f"Watermark '{wm['name']}' resolver failed: {e}")
            if wm["default_value"]:
                resolved[wm["name"]] = wm["default_value"]
            else:
                raise SyncError(
                    f"Watermark '{wm['name']}' resolver failed and no default: {e}"
                )
    return resolved
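
# Sketch of the resolver output handling (the CSV shape is an assumption
# based on jrunner's "-f csv" flag; names and values are hypothetical):
#   resolver_sql:  SELECT MAX(updated_at) FROM orders
#   jrunner CSV:   "max"          <- header line, skipped
#                  "2024-05-01"   <- first data row, quotes stripped
#   resolved:      {"last_sync": "2024-05-01"}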


def _materialize_query(source_query: str, watermark_values: dict[str, str]) -> str:
    """Substitute {name} placeholders in source_query with resolved values."""
    result = source_query
    for name, value in watermark_values.items():
        result = result.replace(f"{{{name}}}", value)
    return result
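
# Placeholder substitution is plain text replacement, e.g. (hypothetical
# watermark name and value):
#   _materialize_query(
#       "SELECT * FROM orders WHERE updated_at > '{last_sync}'",
#       {"last_sync": "2024-05-01"},
#   )
#   -> "SELECT * FROM orders WHERE updated_at > '2024-05-01'"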


def preview_module(module_id: int) -> dict:
    """Preview the exact SQL that would be executed for a module."""
    module = get_module(module_id)
    if not module:
        raise SyncError(f"Module {module_id} not found")

    dest_conn = get_connection(module["dest_connection_id"])
    staging_table = f"pipekit_staging.{module['name']}"

    # Resolve watermarks and materialize query
    watermark_values = _resolve_watermarks(module_id)
    source_query = _materialize_query(module["source_query"], watermark_values)

    # Merge SQL
    merge_sql = _build_merge_sql(module, staging_table)

    # Hooks
    hooks = list_hooks(module_id)
    hook_sql = []
    for h in hooks:
        if h["run_on"] in ("success", "always"):
            hook_sql.append(f"-- hook ({h['run_on']}): {h['sql']}")

    return {
        "source_query": source_query,
        "base_query": module["source_query"],
        "staging_table": staging_table,
        "merge_sql": merge_sql,
        "hooks": hook_sql,
        "strategy": module["merge_strategy"],
        "watermark_values": watermark_values,
    }


def run_module(module_id: int, group_run_id: int = None,
               on_output: callable = None) -> dict:
    """Execute a single sync module. Returns the run log entry."""
    module = get_module(module_id)
    if not module:
        raise SyncError(f"Module {module_id} not found")
    if not module["enabled"]:
        raise SyncError(f"Module {module['name']} is disabled")

    # Atomic lock acquisition
    pid = str(os.getpid())
    if not acquire_module_lock(module_id, pid):
        raise SyncError(f"Module {module['name']} is already running")

    source_conn = get_connection(module["source_connection_id"])
    dest_conn = get_connection(module["dest_connection_id"])
    if not source_conn or not dest_conn:
        release_module_lock(module_id)
        raise SyncError("Source or destination connection not found")

    run = create_run(module_id, group_run_id)
    run_id = run["id"]
    staging_table = f"pipekit_staging.{module['name']}"

    logger.info(f"Starting sync: {module['name']} (run {run_id})")

    try:
        # 1. Resolve watermarks
        watermark_values = _resolve_watermarks(module_id)
        if watermark_values:
            log_run_output(run_id, watermark_values_json=json.dumps(watermark_values))

        # 2. Materialize source query
        source_query = _materialize_query(module["source_query"], watermark_values)
        log_run_sql(run_id, source_query)

        # 3. Ensure staging and destination schemas exist (PostgreSQL only)
        if "postgresql" in dest_conn["jdbc_url"].lower():
            dest_schema = module["dest_table"].split(".")[0] if "." in module["dest_table"] else "public"
            setup_sql = (
                f"CREATE SCHEMA IF NOT EXISTS pipekit_staging;\n"
                f"CREATE SCHEMA IF NOT EXISTS {dest_schema};\n"
            )
            _run_dest_sql(dest_conn, setup_sql)

        # 4. Create staging table from source metadata
        logger.info(f"Creating staging table {staging_table}")
        if on_output:
            on_output(f"Creating staging table {staging_table}")
        _create_staging_from_source(source_conn, dest_conn, module["source_query"], staging_table)

        # 5. Transfer data to staging table
        logger.info(f"Transferring data to {staging_table}")
        if on_output:
            on_output("Transferring data...")
        row_count, stdout, stderr = _run_jdbc_transfer(
            source_conn, dest_conn, source_query, staging_table, on_output=on_output
        )
        log_run_output(run_id, jrunner_stdout=stdout, jrunner_stderr=stderr)
        if on_output:
            on_output(f"Transferred {row_count} rows")
        logger.info(f"Transferred {row_count} rows")

        # 6. Execute merge strategy
        merge_sql = _build_merge_sql(module, staging_table)
        log_run_sql(run_id, source_query, merge_sql)
        logger.info(f"Executing merge: {module['merge_strategy']}")
        if on_output:
            on_output(f"Executing merge: {module['merge_strategy']}")
        _run_dest_sql(dest_conn, merge_sql)

        # 7. Run success hooks
        hook_log = _run_hooks(module_id, "success", dest_conn)
        if hook_log:
            log_run_output(run_id, hook_log=hook_log)

        finish_run(run_id, "success", row_count)
        release_module_lock(module_id)
        logger.info(f"Sync complete: {module['name']} — {row_count} rows")
        return get_run(run_id)

    except Exception as e:
        error_msg = str(e)
        logger.error(f"Sync failed: {module['name']} — {error_msg}")

        # Run failure hooks; a failing hook must not mask the original error
        try:
            hook_log = _run_hooks(module_id, "failure", dest_conn)
            if hook_log:
                log_run_output(run_id, hook_log=hook_log)
        except Exception:
            pass

        finish_run(run_id, "error", error=error_msg)
        release_module_lock(module_id)
        return get_run(run_id)
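
# Minimal usage sketch (module id and callback are hypothetical). run_module
# raises SyncError early for unknown, disabled, or already-locked modules;
# otherwise it returns the finished run row, whose status reflects success
# or error:
#   run = run_module(42, on_output=print)
#   if run["status"] != "success":
#       ...  # inspect the run's logged jrunner output and error message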


def _build_merge_sql(module: dict, staging_table: str) -> str:
    """Build the merge SQL based on strategy."""
    dest_table = module["dest_table"]
    strategy = module["merge_strategy"]
    merge_key = module["merge_key"]

    if strategy == "full":
        return (
            f"CREATE TABLE IF NOT EXISTS {dest_table} (LIKE {staging_table} INCLUDING ALL);\n"
            f"BEGIN;\n"
            f"TRUNCATE TABLE {dest_table};\n"
            f"INSERT INTO {dest_table} SELECT * FROM {staging_table};\n"
            f"COMMIT;\n"
        )

    elif strategy == "incremental":
        create_if = f"CREATE TABLE IF NOT EXISTS {dest_table} (LIKE {staging_table} INCLUDING ALL);\n"
        if merge_key:
            # Delete-then-insert upsert: replace any dest rows whose key
            # appears in the staging batch, then append the batch.
            return (
                f"{create_if}"
                f"BEGIN;\n"
                f"DELETE FROM {dest_table} WHERE {merge_key} IN "
                f"(SELECT DISTINCT {merge_key} FROM {staging_table});\n"
                f"INSERT INTO {dest_table} SELECT * FROM {staging_table};\n"
                f"COMMIT;\n"
            )
        else:
            return f"{create_if}INSERT INTO {dest_table} SELECT * FROM {staging_table};\n"

    elif strategy == "append":
        return (
            f"CREATE TABLE IF NOT EXISTS {dest_table} (LIKE {staging_table} INCLUDING ALL);\n"
            f"INSERT INTO {dest_table} SELECT * FROM {staging_table};\n"
        )

    raise SyncError(f"Unknown merge strategy: {strategy}")
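
# Example of the SQL generated for an "incremental" module with merge_key
# "order_id" (table names hypothetical):
#   CREATE TABLE IF NOT EXISTS dw.orders (LIKE pipekit_staging.orders INCLUDING ALL);
#   BEGIN;
#   DELETE FROM dw.orders WHERE order_id IN
#   (SELECT DISTINCT order_id FROM pipekit_staging.orders);
#   INSERT INTO dw.orders SELECT * FROM pipekit_staging.orders;
#   COMMIT;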


def _run_hooks(module_id: int, run_on: str, dest_conn: dict) -> str:
    """Execute hooks for a module. Returns combined hook output log."""
    hooks = list_hooks(module_id)
    log_parts = []
    for hook in hooks:
        if hook["run_on"] == run_on or hook["run_on"] == "always":
            # Use hook's own connection if specified, otherwise dest
            if hook["connection_id"]:
                hook_conn = get_connection(hook["connection_id"])
                if not hook_conn:
                    log_parts.append(f"SKIP hook #{hook['id']}: connection {hook['connection_id']} not found")
                    continue
            else:
                hook_conn = dest_conn
            logger.info(f"Running hook: {hook['sql'][:80]}")
            try:
                output = _run_dest_sql(hook_conn, hook["sql"])
                log_parts.append(f"hook #{hook['id']} OK: {output[:200]}")
            except Exception as e:
                log_parts.append(f"hook #{hook['id']} FAILED: {e}")
    return "\n".join(log_parts)


def run_group(group_id: int, triggered_by: str = "manual") -> dict:
    """Execute all modules in a group in order. Stops on first failure."""
    group = get_group(group_id)
    if not group:
        raise SyncError(f"Group {group_id} not found")

    group_run = create_group_run(group_id, triggered_by=triggered_by)
    group_run_id = group_run["id"]
    final_status = "success"

    for member in group["members"]:
        # run_module raises (rather than returning a run) for disabled or
        # already-locked modules; that exception propagates to the caller.
        run = run_module(member["module_id"], group_run_id=group_run_id)
        if run["status"] == "error":
            logger.error(f"Group {group['name']} stopped: {member['module_name']} failed")
            final_status = "error"
            break

    finish_group_run(group_run_id, final_status)
    return get_group_run(group_run_id)
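

if __name__ == "__main__":
    # Ad-hoc smoke test (not part of the original module): run a single
    # module from the command line and print its streamed jrunner output.
    # The default module id of 1 is hypothetical.
    import sys

    logging.basicConfig(level=logging.INFO)
    mid = int(sys.argv[1]) if len(sys.argv) > 1 else 1
    result = run_module(mid, on_output=print)
    print(f"status={result['status']}")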