"""Sync runner — orchestrates jrunner transfers, staging, merge, hooks, logging."""

import contextlib
import csv
import json
import logging
import os
import re
import subprocess
import tempfile
import threading
from typing import Callable, Iterator, Optional

from config import get_config
from engine.db import (
    get_module,
    get_connection,
    get_run,
    create_run,
    finish_run,
    log_run_sql,
    log_run_output,
    list_hooks,
    list_watermarks,
    get_group,
    acquire_module_lock,
    release_module_lock,
    create_group_run,
    finish_group_run,
)
from engine.introspect import _resolve_password, fetch_columns, map_type_pg

logger = logging.getLogger("pipekit.runner")

# host, optional port, database name. The database part stops at "/", "?",
# "#" or whitespace so names such as "my-db" are captured whole and trailing
# "?ssl=true" style properties are excluded.
_PG_JDBC_RE = re.compile(r"jdbc:postgresql://([^:/]+)(?::(\d+))?/([^/?#\s]+)")

# Timeout (seconds) for SQL executed through _run_dest_sql.
_DEST_SQL_TIMEOUT = 300


class SyncError(Exception):
    """Raised when any step of a sync run fails."""


@contextlib.contextmanager
def _temp_sql_file(sql: str) -> Iterator[str]:
    """Write *sql* to a temporary ``.sql`` file and yield its path.

    The file is closed before the path is yielded (so external tools can read
    it) and is removed when the context exits, even on error.
    """
    handle = tempfile.NamedTemporaryFile(mode="w", suffix=".sql", delete=False)
    try:
        with handle:
            handle.write(sql)
        yield handle.name
    finally:
        os.unlink(handle.name)


def _jrunner_query_cmd(conn_info: dict, sql_path: str) -> list[str]:
    """Build the jrunner command line for running *sql_path* with CSV output."""
    return [
        get_config()["jrunner_path"],
        "-scu", conn_info["jdbc_url"],
        "-scn", conn_info["username"] or "",
        "-scp", _resolve_password(conn_info["password"]),
        "-sq", sql_path,
        "-f", "csv",
    ]


def _jrunner_transfer_cmd(source_conn: dict, dest_conn: dict,
                          dest_table: str, sql_path: str) -> list[str]:
    """Build the jrunner command line for a source-to-destination transfer."""
    return [
        get_config()["jrunner_path"],
        "-scu", source_conn["jdbc_url"],
        "-scn", source_conn["username"] or "",
        "-scp", _resolve_password(source_conn["password"]),
        "-dcu", dest_conn["jdbc_url"],
        "-dcn", dest_conn["username"] or "",
        "-dcp", _resolve_password(dest_conn["password"]),
        "-dt", dest_table,
        "-sq", sql_path,
    ]


def _parse_pg_jdbc_url(jdbc_url: str) -> dict:
    """Extract host, port, dbname from a PostgreSQL JDBC URL.

    Returns an empty dict when the URL does not match the
    ``jdbc:postgresql://host[:port]/dbname`` shape. The port defaults to
    ``"5432"`` when absent.
    """
    m = _PG_JDBC_RE.match(jdbc_url)
    if not m:
        return {}
    return {"host": m.group(1), "port": m.group(2) or "5432", "dbname": m.group(3)}


def _run_dest_sql(conn_info: dict, sql: str) -> str:
    """Run SQL against a database connection and return the tool's stdout.

    Uses psql for PostgreSQL (supports DDL/DML), jrunner query mode for others.

    Raises:
        SyncError: if psql or jrunner exits with a non-zero status.
    """
    if "postgresql" not in conn_info["jdbc_url"].lower():
        return _run_jrunner_query(conn_info, sql, timeout=_DEST_SQL_TIMEOUT)

    pg = _parse_pg_jdbc_url(conn_info["jdbc_url"])
    env = os.environ.copy()
    env["PGPASSWORD"] = _resolve_password(conn_info["password"])
    with _temp_sql_file(sql) as sql_path:
        result = subprocess.run(
            [
                "psql",
                # Without ON_ERROR_STOP psql keeps going after a failed
                # statement and still exits 0, which would make a failed
                # merge or hook look successful.
                "-v", "ON_ERROR_STOP=1",
                "-h", pg.get("host", "localhost"),
                "-p", pg.get("port", "5432"),
                "-U", conn_info["username"] or "",
                "-d", pg.get("dbname", ""),
                "-f", sql_path,
            ],
            capture_output=True, text=True, timeout=_DEST_SQL_TIMEOUT, env=env,
        )
    if result.returncode != 0:
        raise SyncError(f"psql error: {result.stderr}")
    return result.stdout


def _run_jrunner_query(conn_info: dict, sql: str, timeout: int = 60) -> str:
    """Run a query via jrunner query mode and return stdout (CSV).

    Args:
        conn_info: connection record with ``jdbc_url``, ``username``, ``password``.
        sql: SQL text; it is written to a temporary file passed to jrunner.
        timeout: seconds to wait for jrunner before ``subprocess`` times out.

    Raises:
        SyncError: if jrunner exits with a non-zero status.
    """
    with _temp_sql_file(sql) as sql_path:
        result = subprocess.run(
            _jrunner_query_cmd(conn_info, sql_path),
            capture_output=True, text=True, timeout=timeout,
        )
    if result.returncode != 0:
        raise SyncError(f"jrunner query error: {result.stderr or result.stdout}")
    return result.stdout


def _staging_table_exists(dest_conn: dict, staging_table: str) -> bool:
    """Check if a staging table already exists in the destination.

    *staging_table* is ``schema.table``; a name without exactly one dot is
    looked up in schema ``public``. Any failure while querying is logged and
    treated as "does not exist".
    """
    parts = staging_table.split(".")
    schema = parts[0] if len(parts) == 2 else "public"
    table = parts[-1]
    # The names go into SQL string literals, so double any single quotes.
    schema_lit = schema.replace("'", "''")
    table_lit = table.replace("'", "''")
    # The alias keeps the digit "1" out of the column header, so the
    # substring check below can only match an actual result row.
    sql = (
        f"SELECT 1 AS table_exists FROM information_schema.tables "
        f"WHERE table_schema = '{schema_lit}' AND table_name = '{table_lit}'"
    )
    try:
        output = _run_dest_sql(dest_conn, sql).strip()
    except Exception as exc:
        logger.warning("Could not check for staging table %s: %s", staging_table, exc)
        return False
    return "1" in output


def _create_staging_from_source(source_conn: dict, dest_conn: dict,
                                source_query: str, staging_table: str) -> None:
    """Ensure a staging table exists and is empty.

    An existing staging table is truncated and reused. Otherwise a zero-row
    probe of *source_query* is run through jrunner, the ``* name: type`` lines
    in its output are mapped to PostgreSQL types, and the table is created
    from them.

    Raises:
        SyncError: if no column lines can be found in the jrunner output.
    """
    if _staging_table_exists(dest_conn, staging_table):
        _run_dest_sql(dest_conn, f"TRUNCATE TABLE {staging_table};")
        return

    # Imported here as in the original module layout.
    from engine.introspect import _detect_source_type

    source_type = _detect_source_type(source_conn["jdbc_url"])
    base_query = source_query.rstrip().rstrip(";")
    # Each dialect has its own way of asking for zero rows.
    if source_type == "sqlserver":
        probe_query = f"SELECT TOP 0 * FROM ({base_query}) AS probe0"
    elif source_type == "postgresql":
        probe_query = f"SELECT * FROM ({base_query}) AS probe0 LIMIT 0"
    elif source_type == "as400":
        probe_query = f"SELECT * FROM ({base_query}) AS probe0 FETCH FIRST 0 ROWS ONLY"
    else:
        probe_query = f"SELECT * FROM ({base_query}) AS probe0 WHERE 1=0"

    with _temp_sql_file(probe_query) as sql_path:
        result = subprocess.run(
            _jrunner_transfer_cmd(source_conn, dest_conn, staging_table, sql_path),
            capture_output=True, text=True, timeout=30,
        )
    # The exit status is deliberately not checked: only the column listing is
    # needed, and its absence is reported below.
    output = result.stdout + result.stderr

    columns = []
    for m in re.finditer(r"\*\s+(\S+):\s+(\S+)", output):
        col_name = m.group(1).lower()
        pg_type = map_type_pg(m.group(2))
        columns.append(f"    {col_name:<30} {pg_type}")

    if not columns:
        raise SyncError(f"Could not introspect source columns. jrunner output: {output[:500]}")

    col_defs = ",\n".join(columns)
    ddl = (
        f"DROP TABLE IF EXISTS {staging_table};\n"
        f"CREATE TABLE {staging_table} (\n{col_defs}\n);"
    )
    _run_dest_sql(dest_conn, ddl)


def _run_jdbc_transfer(source_conn: dict, dest_conn: dict, source_query: str,
                       dest_table: str,
                       on_output: Optional[Callable[[str], None]] = None,
                       ) -> tuple[int, str, str]:
    """Run jrunner to transfer data from source to destination.

    Each stdout line is passed to *on_output* (when given) as it arrives.

    Returns:
        ``(row_count, stdout, stderr)``.

    Raises:
        SyncError: if jrunner exits with a non-zero status.
    """
    with _temp_sql_file(source_query) as sql_path:
        proc = subprocess.Popen(
            _jrunner_transfer_cmd(source_conn, dest_conn, dest_table, sql_path),
            stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True,
        )

        # stderr is drained on its own thread: reading stdout to EOF first
        # would deadlock as soon as the child filled the stderr pipe.
        stderr_chunks: list[str] = []

        def _drain_stderr() -> None:
            stderr_chunks.append(proc.stderr.read())

        drain = threading.Thread(target=_drain_stderr, daemon=True)
        drain.start()

        stdout_lines = []
        try:
            for line in proc.stdout:
                line = line.rstrip("\n")
                stdout_lines.append(line)
                if on_output:
                    on_output(line)
            proc.wait()
        finally:
            # Reached with the child still alive only when something above
            # raised (e.g. on_output); do not leave the transfer running.
            if proc.poll() is None:
                proc.kill()
                proc.wait()
            drain.join()
            proc.stdout.close()
            proc.stderr.close()

    stdout = "\n".join(stdout_lines)
    stderr = "".join(stderr_chunks)
    if proc.returncode != 0:
        raise SyncError(f"jrunner transfer failed: {stdout}\n{stderr}")
    return _parse_row_count(stdout), stdout, stderr


def _parse_row_count(output: str) -> int:
    """Extract row count from jrunner output.

    Returns the number in the first line matching ``<N> rows written``
    (case-insensitive), or 0 when no line matches.
    """
    for line in output.splitlines():
        m = re.search(r"(\d+)\s*rows written", line, re.IGNORECASE)
        if m:
            return int(m.group(1))
    return 0


def _first_csv_value(output: str) -> Optional[str]:
    """Return the first column of the first data row of CSV *output*.

    The first non-blank line is treated as the header. Returns ``None`` when
    there is no data row.
    """
    lines = [ln.strip() for ln in output.strip().splitlines() if ln.strip()]
    if len(lines) < 2:
        return None
    row = next(csv.reader([lines[1]]), [])
    if not row:
        return None
    # csv already removes double-quote wrapping; also drop stray quotes.
    return row[0].strip().strip('"').strip("'")


def _resolve_watermarks(module_id: int) -> dict[str, str]:
    """Resolve all watermarks for a module.

    Each watermark's ``resolver_sql`` is run on its connection; the first
    value of the first data row is used. An empty or ``null`` result, or a
    failed resolver, falls back to the watermark's ``default_value``.

    Returns:
        ``{name: resolved_value}``.

    Raises:
        SyncError: if a watermark references a missing connection, or its
            resolver fails and it has no default value.
    """
    resolved = {}
    for wm in list_watermarks(module_id):
        conn = get_connection(wm["connection_id"])
        if not conn:
            raise SyncError(
                f"Watermark '{wm['name']}' references missing connection {wm['connection_id']}"
            )
        try:
            value = _first_csv_value(_run_jrunner_query(conn, wm["resolver_sql"]))
            if not value or value.lower() == "null":
                value = wm["default_value"]
            resolved[wm["name"]] = value or ""
        except Exception as e:
            logger.warning("Watermark '%s' resolver failed: %s", wm["name"], e)
            if not wm["default_value"]:
                raise SyncError(
                    f"Watermark '{wm['name']}' resolver failed and no default: {e}"
                ) from e
            resolved[wm["name"]] = wm["default_value"]
    return resolved


def _materialize_query(source_query: str, watermark_values: dict[str, str]) -> str:
    """Substitute {name} placeholders in source_query with resolved values."""
    result = source_query
    for name, value in watermark_values.items():
        # str() guards against a non-string default value reaching replace().
        result = result.replace(f"{{{name}}}", str(value))
    return result


def preview_module(module_id: int) -> dict:
    """Preview the exact SQL that would be executed for a module.

    Resolves watermarks (which runs the resolver queries) but does not
    transfer or merge any data.

    Raises:
        SyncError: if the module does not exist, a watermark cannot be
            resolved, or the merge strategy is unknown.
    """
    module = get_module(module_id)
    if not module:
        raise SyncError(f"Module {module_id} not found")

    staging_table = f"pipekit_staging.{module['name']}"

    # Resolve watermarks and materialize query
    watermark_values = _resolve_watermarks(module_id)
    source_query = _materialize_query(module["source_query"], watermark_values)

    merge_sql = _build_merge_sql(module, staging_table)

    # Only hooks that would run on a successful sync are listed.
    hook_sql = [
        f"-- hook ({h['run_on']}): {h['sql']}"
        for h in list_hooks(module_id)
        if h["run_on"] in ("success", "always")
    ]

    return {
        "source_query": source_query,
        "base_query": module["source_query"],
        "staging_table": staging_table,
        "merge_sql": merge_sql,
        "hooks": hook_sql,
        "strategy": module["merge_strategy"],
        "watermark_values": watermark_values,
    }


def _execute_sync(module: dict, module_id: int, run_id: int,
                  source_conn: dict, dest_conn: dict, staging_table: str,
                  on_output: Optional[Callable[[str], None]]) -> int:
    """Run the sync steps for one module and return the transferred row count."""

    def emit(message: str) -> None:
        if on_output:
            on_output(message)

    # 1. Resolve watermarks
    watermark_values = _resolve_watermarks(module_id)
    if watermark_values:
        log_run_output(run_id, watermark_values_json=json.dumps(watermark_values))

    # 2. Materialize source query
    source_query = _materialize_query(module["source_query"], watermark_values)
    log_run_sql(run_id, source_query)

    # Built before any data moves so an unknown merge strategy fails fast
    # instead of after the transfer.
    merge_sql = _build_merge_sql(module, staging_table)

    # 3. Ensure schemas exist
    if "postgresql" in dest_conn["jdbc_url"].lower():
        dest_table = module["dest_table"]
        dest_schema = dest_table.split(".")[0] if "." in dest_table else "public"
        setup_sql = (
            f"CREATE SCHEMA IF NOT EXISTS pipekit_staging;\n"
            f"CREATE SCHEMA IF NOT EXISTS {dest_schema};\n"
        )
        _run_dest_sql(dest_conn, setup_sql)

    # 4. Create staging table from source metadata. The materialized query is
    # used: the raw one still contains {watermark} placeholders, which the
    # source database cannot parse.
    logger.info("Creating staging table %s", staging_table)
    emit(f"Creating staging table {staging_table}")
    _create_staging_from_source(source_conn, dest_conn, source_query, staging_table)

    # 5. Transfer data to staging table
    logger.info("Transferring data to %s", staging_table)
    emit("Transferring data...")
    row_count, stdout, stderr = _run_jdbc_transfer(
        source_conn, dest_conn, source_query, staging_table, on_output=on_output
    )
    log_run_output(run_id, jrunner_stdout=stdout, jrunner_stderr=stderr)
    emit(f"Transferred {row_count} rows")
    logger.info("Transferred %s rows", row_count)

    # 6. Execute merge strategy
    log_run_sql(run_id, source_query, merge_sql)
    logger.info("Executing merge: %s", module["merge_strategy"])
    emit(f"Executing merge: {module['merge_strategy']}")
    _run_dest_sql(dest_conn, merge_sql)

    # 7. Run success hooks
    hook_log = _run_hooks(module_id, "success", dest_conn)
    if hook_log:
        log_run_output(run_id, hook_log=hook_log)

    return row_count


def run_module(module_id: int, group_run_id: Optional[int] = None,
               on_output: Optional[Callable[[str], None]] = None) -> dict:
    """Execute a single sync module. Returns the run log entry.

    Args:
        module_id: id of the module to run.
        group_run_id: id of the enclosing group run, if any.
        on_output: optional callback receiving progress lines.

    Returns:
        The run record from ``get_run``. A failure during the sync itself is
        recorded on the run (status ``"error"``) and returned, not raised.

    Raises:
        SyncError: if the module is missing, disabled, already running, or
            its source/destination connection cannot be found.
    """
    module = get_module(module_id)
    if not module:
        raise SyncError(f"Module {module_id} not found")
    if not module["enabled"]:
        raise SyncError(f"Module {module['name']} is disabled")

    # Atomic lock acquisition
    pid = str(os.getpid())
    if not acquire_module_lock(module_id, pid):
        raise SyncError(f"Module {module['name']} is already running")

    # Everything after acquisition sits under try/finally so the lock is
    # released on every exit path, including errors from the run bookkeeping.
    try:
        source_conn = get_connection(module["source_connection_id"])
        dest_conn = get_connection(module["dest_connection_id"])
        if not source_conn or not dest_conn:
            raise SyncError("Source or destination connection not found")

        run = create_run(module_id, group_run_id)
        run_id = run["id"]
        staging_table = f"pipekit_staging.{module['name']}"

        logger.info("Starting sync: %s (run %s)", module["name"], run_id)

        try:
            row_count = _execute_sync(
                module, module_id, run_id, source_conn, dest_conn,
                staging_table, on_output,
            )
            finish_run(run_id, "success", row_count)
            logger.info("Sync complete: %s — %s rows", module["name"], row_count)
        except Exception as e:
            error_msg = str(e)
            logger.error("Sync failed: %s — %s", module["name"], error_msg)

            # Failure hooks are best-effort: a problem here must not hide the
            # original error, but it is logged rather than silently dropped.
            try:
                hook_log = _run_hooks(module_id, "failure", dest_conn)
                if hook_log:
                    log_run_output(run_id, hook_log=hook_log)
            except Exception as hook_exc:
                logger.warning(
                    "Failure hooks for %s raised: %s", module["name"], hook_exc
                )

            finish_run(run_id, "error", error=error_msg)

        return get_run(run_id)
    finally:
        release_module_lock(module_id)


def _build_merge_sql(module: dict, staging_table: str) -> str:
    """Build the merge SQL based on strategy.

    Strategies:
        ``full``: truncate the destination and reload it from staging.
        ``incremental``: delete destination rows whose merge key appears in
            staging, then insert staging; plain insert when no key is set.
        ``append``: insert staging rows.

    Raises:
        SyncError: if the strategy is not one of the above.
    """
    dest_table = module["dest_table"]
    strategy = module["merge_strategy"]
    merge_key = module["merge_key"]

    create_if = (
        f"CREATE TABLE IF NOT EXISTS {dest_table} (LIKE {staging_table} INCLUDING ALL);\n"
    )
    insert_all = f"INSERT INTO {dest_table} SELECT * FROM {staging_table};\n"

    if strategy == "full":
        return (
            f"{create_if}"
            f"BEGIN;\n"
            f"TRUNCATE TABLE {dest_table};\n"
            f"{insert_all}"
            f"COMMIT;\n"
        )

    if strategy == "incremental":
        if not merge_key:
            return f"{create_if}{insert_all}"
        # "a, b IN (...)" is not valid SQL; a key containing a comma needs
        # parentheses on the left-hand side. Keys without a comma produce
        # the same SQL as before.
        lhs = f"({merge_key})" if "," in merge_key else merge_key
        return (
            f"{create_if}"
            f"BEGIN;\n"
            f"DELETE FROM {dest_table} WHERE {lhs} IN "
            f"(SELECT DISTINCT {merge_key} FROM {staging_table});\n"
            f"{insert_all}"
            f"COMMIT;\n"
        )

    if strategy == "append":
        return f"{create_if}{insert_all}"

    raise SyncError(f"Unknown merge strategy: {strategy}")


def _run_hooks(module_id: int, run_on: str, dest_conn: dict) -> str:
    """Execute hooks for a module. Returns combined hook output log.

    Hooks whose ``run_on`` equals *run_on* or ``"always"`` are executed. A
    hook runs on its own connection when it has one, otherwise on
    *dest_conn*. A failing hook is recorded in the log and does not stop the
    remaining hooks.
    """
    log_parts = []
    for hook in list_hooks(module_id):
        if hook["run_on"] not in (run_on, "always"):
            continue

        # Use hook's own connection if specified, otherwise dest
        if hook["connection_id"]:
            hook_conn = get_connection(hook["connection_id"])
            if not hook_conn:
                log_parts.append(
                    f"SKIP hook #{hook['id']}: connection {hook['connection_id']} not found"
                )
                continue
        else:
            hook_conn = dest_conn

        logger.info("Running hook: %s", hook["sql"][:80])
        try:
            output = _run_dest_sql(hook_conn, hook["sql"])
        except Exception as e:
            log_parts.append(f"hook #{hook['id']} FAILED: {e}")
        else:
            log_parts.append(f"hook #{hook['id']} OK: {output[:200]}")

    return "\n".join(log_parts)


def run_group(group_id: int, triggered_by: str = "manual") -> dict:
    """Execute all modules in a group in order. Stops on first failure.

    The group run is always finished: with ``"success"`` when every member
    succeeded, otherwise with ``"error"`` (including when ``run_module``
    raises, in which case the exception still propagates).

    Raises:
        SyncError: if the group does not exist.
    """
    # get_group_run was called here without ever being imported.
    # NOTE(review): assumes engine.db exposes get_group_run — confirm.
    # Imported up front so a missing helper fails before any module runs.
    from engine.db import get_group_run

    group = get_group(group_id)
    if not group:
        raise SyncError(f"Group {group_id} not found")

    group_run = create_group_run(group_id, triggered_by=triggered_by)
    group_run_id = group_run["id"]

    final_status = "error"
    try:
        for member in group["members"]:
            run = run_module(member["module_id"], group_run_id=group_run_id)
            if run["status"] == "error":
                logger.error(
                    "Group %s stopped: %s failed", group["name"], member["module_name"]
                )
                break
        else:
            # Loop finished without a break: every member succeeded.
            final_status = "success"
    finally:
        finish_group_run(group_run_id, final_status)

    return get_group_run(group_run_id)