pipekit/pipekit/engine/runner.py
Paul Trowbridge 574ada5258 Initial commit: Pipekit rewrite.
Orchestration layer around the jrunner Java JDBC CLI, replacing the
previous shell-based sync system in .archive/pre-rewrite. Includes
the FastAPI + Jinja web frontend, per-driver adapters (DB2, MSSQL,
PG), wizard-driven module creation with editable dest types and
source-sourced table/column descriptions, watermark/hook CRUD,
and the engine that runs modules end-to-end.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-22 00:38:26 -04:00

169 lines
7.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Orchestrate one module run, per SPEC.md §"Engine flow".
Steps:
1. acquire lock atomically (repo.acquire_module_lock)
2. resolve watermarks (watermark.resolve_watermarks)
3. materialise source query, persist preview (watermark.materialise + repo)
4. ensure staging table exists on dest (CREATE TABLE IF NOT EXISTS ... LIKE dest)
5. jrunner migrate source → staging (jrunner.migrate — clears staging internally)
6. build merge SQL (merge.build_merge_sql)
7. run merge SQL on dest (jrunner.run_dest_sql)
8. run hooks in order, honouring run_on (jrunner.run_dest_sql)
9. write run_log row (repo.finish_run)
10. release lock (always) (repo.release_module_lock)
"""
from __future__ import annotations
import os
import traceback
from dataclasses import dataclass
from .. import jrunner, repo
from . import merge, watermark
@dataclass
class RunOutcome:
    """Result of one module run, as returned by :func:`run_module`.

    The same values are also persisted on the run_log row via
    ``repo.finish_run`` / ``repo.log_run_sql``.
    """

    run_id: int  # run_log primary key for this run
    status: str  # success | error | cancelled
    row_count: int | None  # rows reported by jrunner.migrate; None on dry-run or early failure
    error: str | None  # "<ExcType>: <msg>\n<traceback>" on failure, else None
    resolved_source_sql: str | None  # source query after watermark substitution
    merge_sql: str | None  # merge statement built for the dest database
class LockBusy(RuntimeError):
    """Raised when a module is already running, i.e. ``repo.acquire_module_lock``
    reported the per-module lock as held by another run."""
def run_module(module_id: int, *, group_run_id: int | None = None,
               dry_run: bool = False, run_id: int | None = None) -> RunOutcome:
    """Run one module end-to-end (SPEC.md §"Engine flow", steps 1-10).

    In dry-run mode, SQL is generated and stored on the run_log but no
    jrunner calls are made.

    If ``run_id`` is provided, that run_log row is reused — this lets
    async callers (the API) reserve a run_id before the run starts so
    they can return it to the client immediately.

    Args:
        module_id: primary key of the module to run.
        group_run_id: optional parent group run, forwarded to ``repo.create_run``.
        dry_run: when True, stop after generating and persisting the SQL.
        run_id: pre-reserved run_log id to reuse instead of creating one.

    Returns:
        RunOutcome describing the run; the error path returns (rather than
        re-raises) with ``status == "error"``.

    Raises:
        ValueError: the module or one of its connections is missing.
        LockBusy: another run already holds this module's lock.
    """
    module = repo.get_module(module_id)
    if module is None:
        raise ValueError(f"module id={module_id} not found")
    if run_id is None:
        run_id = repo.create_run(module_id, group_run_id=group_run_id)
    # 1. acquire the per-module lock atomically. The owner string (pid:run_id)
    # helps diagnose stuck locks. On contention, mark the reserved run_log row
    # as failed before raising — the lock is NOT held, so the finally below
    # never runs for this path.
    lock_owner = f"{os.getpid()}:{run_id}"
    if not repo.acquire_module_lock(module_id, lock_owner):
        repo.finish_run(run_id, status="error", error="already running")
        raise LockBusy(f"module {module['name']!r} is already running")
    resolved_sql: str | None = None
    merge_sql: str | None = None
    row_count: int | None = None
    status = "error"  # pessimistic default; flipped to "success" on the happy path
    error: str | None = None
    try:
        source_conn = repo.get_connection(module["source_connection_id"])
        dest_conn = repo.get_connection(module["dest_connection_id"])
        if source_conn is None or dest_conn is None:
            raise ValueError("source or dest connection missing")
        # 2-3. watermarks + materialised source query, persisted for preview
        # (dry runs substitute watermark defaults only).
        wm_values = watermark.resolve_watermarks(module, use_defaults_only=dry_run)
        resolved_sql = watermark.materialise(module["source_query"], wm_values)
        repo.set_next_resolved_query(module_id, resolved_sql)
        repo.log_run_sql(run_id, resolved_source_sql=resolved_sql,
                         watermark_values=wm_values)
        # 6. merge SQL (built now so it's visible on run_log even if migrate fails)
        merge_sql = merge.build_merge_sql(
            strategy=module["merge_strategy"],
            dest_table=module["dest_table"],
            staging_table=module["staging_table"],
            merge_key=module["merge_key"],
        )
        repo.log_run_sql(run_id, merge_sql=merge_sql)
        if dry_run:
            status = "success"
            return RunOutcome(run_id, status, None, None, resolved_sql, merge_sql)
        # 4. ensure staging table exists on dest. Mirror the real dest schema
        # so jrunner's auto-DELETE and the subsequent merge INSERT both find
        # a table to work on. Idempotent — no-op after first run.
        # NOTE(review): schema/table names are interpolated into DDL unescaped;
        # acceptable only because they come from operator-managed module config,
        # not end-user input.
        staging_schema, _, _ = module["staging_table"].partition(".")
        if staging_schema and staging_schema != module["staging_table"]:
            jrunner.run_dest_sql(
                dest_conn, f"CREATE SCHEMA IF NOT EXISTS {staging_schema};")
        jrunner.run_dest_sql(
            dest_conn,
            f"CREATE TABLE IF NOT EXISTS {module['staging_table']} "
            f"(LIKE {module['dest_table']} INCLUDING ALL);",
        )
        # 5. migrate source → staging. jrunner does its own `DELETE FROM staging`
        # before loading, so we don't need a separate TRUNCATE.
        # NOTE(review): clear=False while the comment above (and the module
        # docstring) say jrunner clears staging internally — confirm that
        # jrunner.migrate still clears when clear=False, else stale rows remain.
        migrate_result = jrunner.migrate(
            source_conn=source_conn, dest_conn=dest_conn,
            sql=resolved_sql, dest_table=module["staging_table"],
            clear=False,
        )
        row_count = migrate_result.row_count
        repo.log_run_output(run_id, jrunner_stdout=migrate_result.stdout,
                            jrunner_stderr=migrate_result.stderr)
        # 7. merge staging into dest
        jrunner.run_dest_sql(dest_conn, merge_sql)
        # 8. hooks (success path so far)
        hook_log = _run_hooks(module_id, fail_fast=True, run_on_set={"success", "always"})
        if hook_log:
            repo.log_run_output(run_id, hook_log=hook_log)
        status = "success"
        return RunOutcome(run_id, status, row_count, None, resolved_sql, merge_sql)
    except Exception as e:  # noqa: BLE001
        error = f"{type(e).__name__}: {e}\n{traceback.format_exc()}"
        # Failure-path hooks, if any. Never let these mask the real error.
        # NOTE(review): if a success-path hook raised above, any "always" hooks
        # that already ran will run a second time here — confirm that is intended.
        try:
            hook_log = _run_hooks(module_id, fail_fast=False,
                                  run_on_set={"failure", "always"})
            if hook_log:
                repo.log_run_output(run_id, hook_log=hook_log)
        except Exception:  # noqa: BLE001, S110
            pass
        return RunOutcome(run_id, "error", row_count, error, resolved_sql, merge_sql)
    finally:
        # 9-10. always record the outcome and release the lock, even when a
        # return or raise short-circuits the body above.
        repo.finish_run(run_id, status=status, row_count=row_count, error=error)
        repo.release_module_lock(module_id)
def _run_hooks(module_id: int, *, fail_fast: bool, run_on_set: set[str]) -> str:
    """Execute the module's hooks whose ``run_on`` is in ``run_on_set``.

    Hooks run in the order ``repo.list_hooks`` yields them. Each hook's SQL is
    sent to its own connection via ``jrunner.run_dest_sql``. With
    ``fail_fast=True`` the first missing connection or SQL error propagates;
    otherwise problems are recorded in the returned log and execution continues.

    Returns:
        A newline-joined text log ("" when no hook matched).
    """
    selected = [hook for hook in repo.list_hooks(module_id)
                if hook["run_on"] in run_on_set]
    if not selected:
        return ""
    log: list[str] = []
    for hook in selected:
        conn_id = hook["connection_id"]
        conn = repo.get_connection(conn_id) if conn_id else None
        # Label the hook by connection name when resolvable, else by raw id.
        target = conn["name"] if conn else f"connection id={conn_id}"
        log.append(f"-- hook run_order={hook['run_order']} on={hook['run_on']} target={target}")
        if conn is None:
            log.append(" SKIP: connection not found")
            if fail_fast:
                raise RuntimeError(f"hook connection {conn_id} not found")
            continue
        try:
            jrunner.run_dest_sql(conn, hook["sql"])
        except Exception as exc:  # noqa: BLE001
            log.append(f" ERROR: {exc}")
            if fail_fast:
                raise
        else:
            log.append(" OK")
    return "\n".join(log)