pipekit/.archive/pre-rewrite/api/main.py
Paul Trowbridge 574ada5258 Initial commit: Pipekit rewrite.
Orchestration layer around the jrunner Java JDBC CLI, replacing the
previous shell-based sync system in .archive/pre-rewrite. Includes
the FastAPI + Jinja web frontend, per-driver adapters (DB2, MSSQL,
PG), wizard-driven module creation with editable dest types and
source-sourced table/column descriptions, watermark/hook CRUD,
and the engine that runs modules end-to-end.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-22 00:38:26 -04:00

583 lines
20 KiB
Python

"""Pipekit API — FastAPI application."""
import os
import sys
import secrets
import queue
from typing import Optional
from fastapi import FastAPI, HTTPException, Depends, Query
from fastapi.responses import StreamingResponse
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from pydantic import BaseModel
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from engine.db import (
init_db, clear_stale_locks,
# drivers
create_driver, get_driver, list_drivers, delete_driver,
# connections
create_connection, get_connection, list_connections, update_connection, delete_connection,
# modules
create_module, get_module, list_modules, update_module, delete_module,
# watermarks
create_watermark, get_watermark, list_watermarks, update_watermark, delete_watermark,
# hooks
create_hook, get_hook, list_hooks, update_hook, delete_hook,
# groups
create_group, get_group, list_groups, delete_group,
add_group_member, remove_group_member,
# schedules
create_schedule, get_schedule, list_schedules, update_schedule, delete_schedule,
# group runs
list_group_runs, get_group_run,
# runs
list_runs, get_run,
# settings
get_setting, set_setting,
)
from engine.runner import run_module, run_group, preview_module
from engine.introspect import fetch_tables, fetch_columns, propose_module
# The ASGI application object; every route in this module is registered on it.
app = FastAPI(
    description="JDBC-based ETL orchestration",
    title="Pipekit",
    version="0.2.0",
)
# HTTP Basic auth scheme; its parsed credentials are checked by authenticate().
security = HTTPBasic()
@app.on_event("startup")
def startup():
    """Prepare the database and seed API credentials on first boot.

    Initializes the schema, clears locks left behind by a previous process,
    and, when no ``api_username`` setting exists yet, stores the built-in
    credentials ``admin`` / ``pipekit``.
    """
    init_db()
    clear_stale_locks()
    if get_setting("api_username"):
        # Credentials already configured; leave them alone.
        return
    # NOTE(review): these are well-known defaults; consider forcing a change
    # on first login.
    set_setting("api_username", "admin")
    set_setting("api_password", "pipekit")
def authenticate(credentials: HTTPBasicCredentials = Depends(security)):
    """Validate HTTP Basic credentials against the stored API login.

    The expected values come from the ``api_username`` / ``api_password``
    settings. When a setting is missing or empty, the built-in default
    (``admin`` / ``pipekit``) is used instead.

    Args:
        credentials: Username/password parsed from the ``Authorization`` header.

    Returns:
        The authenticated username.

    Raises:
        HTTPException: 401 with ``WWW-Authenticate: Basic`` when the
            credentials do not match, so browsers re-prompt for a login.
    """
    expected_user = get_setting("api_username") or "admin"
    expected_pass = get_setting("api_password") or "pipekit"
    # Compare UTF-8 bytes: secrets.compare_digest raises TypeError for str
    # arguments containing non-ASCII characters, which would surface as a
    # 500 instead of a 401.
    user_ok = secrets.compare_digest(
        credentials.username.encode("utf-8"), expected_user.encode("utf-8")
    )
    # Always run the second comparison too, so response timing does not
    # reveal whether the username alone was correct.
    pass_ok = secrets.compare_digest(
        credentials.password.encode("utf-8"), expected_pass.encode("utf-8")
    )
    if not (user_ok and pass_ok):
        raise HTTPException(
            status_code=401,
            detail="Invalid credentials",
            headers={"WWW-Authenticate": "Basic"},
        )
    return credentials.username
# ---------------------------------------------------------------------------
# Pydantic models
# ---------------------------------------------------------------------------
class DriverCreate(BaseModel):
    """Request body for ``POST /drivers``; dumped and passed to ``create_driver``."""
    name: str
    jar_file: str  # presumably the JDBC driver JAR to load — confirm in engine.db
    class_name: str  # presumably the fully-qualified JDBC driver class — confirm
    url_template: Optional[str] = None  # optional; meaning defined by engine.db (not visible here)
class ConnectionCreate(BaseModel):
    """Request body for ``POST /connections``; dumped and passed to ``create_connection``."""
    name: str
    jdbc_url: str
    driver_id: Optional[int] = None  # presumably an id from /drivers — confirm
    username: Optional[str] = None
    password: Optional[str] = None
    default_dest_connection_id: Optional[int] = None  # presumably another connection's id
    default_dest_schema: Optional[str] = None
    notes: Optional[str] = None
class ConnectionUpdate(BaseModel):
    """Request body for ``POST /connections/{conn_id}`` (partial update).

    Every field is optional. The route dumps this model with
    ``exclude_none=True``, so only non-null fields reach
    ``update_connection``. As a consequence, a field cannot be cleared to
    null through this body.
    """
    name: Optional[str] = None
    jdbc_url: Optional[str] = None
    driver_id: Optional[int] = None
    username: Optional[str] = None
    password: Optional[str] = None
    default_dest_connection_id: Optional[int] = None
    default_dest_schema: Optional[str] = None
    notes: Optional[str] = None
class ModuleCreate(BaseModel):
    """Request body for ``POST /modules``; dumped and passed to ``create_module``."""
    name: str
    source_connection_id: int
    dest_connection_id: int
    dest_table: str
    source_query: str
    merge_strategy: str = "full"  # any string is accepted here; allowed values are enforced elsewhere, if at all
    merge_key: Optional[str] = None  # presumably required by key-based strategies — confirm in engine.runner
class ModuleUpdate(BaseModel):
    """Request body for ``POST /modules/{module_id}`` (partial update).

    The route dumps this model with ``exclude_none=True``, so null fields
    are left unchanged and cannot be cleared through this body.
    """
    name: Optional[str] = None
    source_connection_id: Optional[int] = None
    dest_connection_id: Optional[int] = None
    dest_table: Optional[str] = None
    source_query: Optional[str] = None
    merge_strategy: Optional[str] = None
    merge_key: Optional[str] = None
    enabled: Optional[bool] = None  # False is forwarded (only None is excluded)
class WatermarkCreate(BaseModel):
    """Request body for ``POST /watermarks``; dumped and passed to ``create_watermark``."""
    module_id: int
    name: str
    connection_id: int  # presumably the connection resolver_sql runs against — confirm
    resolver_sql: str
    default_value: Optional[str] = None
class WatermarkUpdate(BaseModel):
    """Request body for ``POST /watermarks/{watermark_id}`` (partial update).

    Dumped with ``exclude_none=True`` by the route: null fields are left
    unchanged.
    """
    name: Optional[str] = None
    connection_id: Optional[int] = None
    resolver_sql: Optional[str] = None
    default_value: Optional[str] = None
class HookCreate(BaseModel):
    """Request body for ``POST /hooks``; dumped and passed to ``create_hook``."""
    module_id: int
    sql: str
    run_order: int = 0
    connection_id: Optional[int] = None  # optional; fallback when None is decided by engine code (not visible here)
    run_on: str = "success"  # any string is accepted here; valid values are not checked by this model
class HookUpdate(BaseModel):
    """Request body for ``POST /hooks/{hook_id}`` (partial update).

    Dumped with ``exclude_none=True`` by the route: null fields are left
    unchanged.
    """
    sql: Optional[str] = None
    run_order: Optional[int] = None
    connection_id: Optional[int] = None
    run_on: Optional[str] = None
class GroupCreate(BaseModel):
    """Request body for ``POST /groups``; dumped and passed to ``create_group``."""
    name: str
class GroupMemberAdd(BaseModel):
    """Request body for ``POST /groups/{group_id}/members``; passed to ``add_group_member``."""
    module_id: int
    run_order: int = 0
class ScheduleCreate(BaseModel):
    """Request body for ``POST /schedules``; dumped and passed to ``create_schedule``."""
    group_id: int
    cron_expr: str  # not syntax-checked by this model
    enabled: bool = True
class ScheduleUpdate(BaseModel):
    """Request body for ``POST /schedules/{schedule_id}`` (partial update).

    Dumped with ``exclude_none=True`` by the route: null fields are left
    unchanged, while ``enabled=False`` is forwarded.
    """
    cron_expr: Optional[str] = None
    enabled: Optional[bool] = None
class SettingUpdate(BaseModel):
    """Request body for ``POST /settings/{key}``: the new value for that key."""
    value: str
# ---------------------------------------------------------------------------
# Health
# ---------------------------------------------------------------------------
@app.get("/health")
def health():
    """Unauthenticated liveness probe; always reports ``{"status": "ok"}``."""
    return dict(status="ok")
# ---------------------------------------------------------------------------
# Drivers
# ---------------------------------------------------------------------------
@app.get("/drivers")
def api_list_drivers(user: str = Depends(authenticate)):
    """Return every registered driver."""
    drivers = list_drivers()
    return drivers
@app.get("/drivers/{driver_id}")
def api_get_driver(driver_id: int, user: str = Depends(authenticate)):
    """Return one driver by id, or 404 when the lookup yields nothing."""
    driver = get_driver(driver_id)
    if driver:
        return driver
    raise HTTPException(status_code=404, detail="Driver not found")
@app.post("/drivers", status_code=201)
def api_create_driver(body: DriverCreate, user: str = Depends(authenticate)):
    """Create a driver from the request body; returns whatever ``create_driver`` returns."""
    fields = body.model_dump()
    return create_driver(**fields)
@app.post("/drivers/{driver_id}/delete")
def api_delete_driver(driver_id: int, user: str = Depends(authenticate)):
    """Delete a driver and acknowledge with ``{"ok": True}`` (no existence check here)."""
    delete_driver(driver_id)
    return dict(ok=True)
# ---------------------------------------------------------------------------
# Connections
# ---------------------------------------------------------------------------
@app.get("/connections")
def api_list_connections(user: str = Depends(authenticate)):
    """Return every configured connection."""
    connections = list_connections()
    return connections
@app.get("/connections/{conn_id}")
def api_get_connection(conn_id: int, user: str = Depends(authenticate)):
    """Return one connection by id, or 404 when the lookup yields nothing."""
    connection = get_connection(conn_id)
    if connection:
        return connection
    raise HTTPException(status_code=404, detail="Connection not found")
@app.post("/connections", status_code=201)
def api_create_connection(body: ConnectionCreate, user: str = Depends(authenticate)):
    """Create a connection from the request body; returns ``create_connection``'s result."""
    fields = body.model_dump()
    return create_connection(**fields)
@app.post("/connections/{conn_id}")
def api_update_connection(conn_id: int, body: ConnectionUpdate,
                          user: str = Depends(authenticate)):
    """Partially update a connection.

    Only fields sent with a non-null value are forwarded.
    NOTE(review): because None is dropped, a client cannot clear a field
    (e.g. ``notes``) back to null through this endpoint.
    """
    changes = body.model_dump(exclude_none=True)
    return update_connection(conn_id, **changes)
@app.post("/connections/{conn_id}/delete")
def api_delete_connection(conn_id: int, user: str = Depends(authenticate)):
    """Delete a connection and acknowledge with ``{"ok": True}`` (no existence check here)."""
    delete_connection(conn_id)
    return dict(ok=True)
@app.post("/connections/{conn_id}/test")
def api_test_connection(conn_id: int, user: str = Depends(authenticate)):
    """Probe a connection by running ``SELECT 1`` through jrunner.

    Query failures are reported in the response body rather than raised,
    so the caller can display them.

    Returns:
        ``{"status": "ok", "elapsed_seconds": float}`` on success, or
        ``{"status": "error", "detail": str, "elapsed_seconds": float}``
        when the query raised. Elapsed time is rounded to 2 decimals.
    """
    from engine.introspect import run_jrunner_query
    import time

    # monotonic() is unaffected by wall-clock adjustments (NTP, manual
    # changes), which could otherwise produce negative or inflated durations.
    start = time.monotonic()
    try:
        # NOTE(review): a bare "SELECT 1" is not accepted by every dialect
        # (DB2 typically needs "FROM SYSIBM.SYSDUMMY1") — confirm against
        # the DB2 adapter.
        run_jrunner_query(conn_id, "SELECT 1")
    except Exception as e:  # deliberate best-effort probe: report, don't raise
        return {"status": "error", "detail": str(e),
                "elapsed_seconds": round(time.monotonic() - start, 2)}
    return {"status": "ok", "elapsed_seconds": round(time.monotonic() - start, 2)}
# ---------------------------------------------------------------------------
# Introspection
# ---------------------------------------------------------------------------
@app.post("/introspect/tables")
def api_introspect_tables(body: dict, user: str = Depends(authenticate)):
    """List tables visible through a connection.

    Body keys:
        connection_id: required.
        qualifiers: optional object; its ``schema`` entry, when present,
            is used as the schema filter.

    Raises:
        HTTPException: 422 when ``connection_id`` is missing or
            ``qualifiers`` is not an object. Previously these cases crashed
            with KeyError/AttributeError and surfaced as a 500.
    """
    conn_id = body.get("connection_id")
    if conn_id is None:
        raise HTTPException(422, "connection_id is required")
    # "qualifiers" may be omitted or sent as an explicit null.
    qualifiers = body.get("qualifiers") or {}
    if not isinstance(qualifiers, dict):
        raise HTTPException(422, "qualifiers must be an object")
    schema = qualifiers.get("schema")
    tables = fetch_tables(conn_id, schema_filter=schema)
    return [t.to_dict() for t in tables]
@app.post("/introspect/columns")
def api_introspect_columns(body: dict, user: str = Depends(authenticate)):
    """List the columns of one table.

    Body keys:
        connection_id, table_name: required.
        qualifiers: optional object with ``schema`` (defaults to ``""``),
            ``linked_server`` and ``linked_db``.

    Raises:
        HTTPException: 422 when a required key is missing or ``qualifiers``
            is not an object. Previously these cases crashed with
            KeyError/AttributeError and surfaced as a 500.
    """
    conn_id = body.get("connection_id")
    table_name = body.get("table_name")
    if conn_id is None or table_name is None:
        raise HTTPException(422, "connection_id and table_name are required")
    # "qualifiers" may be omitted or sent as an explicit null.
    qualifiers = body.get("qualifiers") or {}
    if not isinstance(qualifiers, dict):
        raise HTTPException(422, "qualifiers must be an object")
    schema = qualifiers.get("schema", "")
    columns = fetch_columns(conn_id, schema, table_name,
                            linked_server=qualifiers.get("linked_server"),
                            linked_db=qualifiers.get("linked_db"))
    return [c.to_dict() for c in columns]
@app.post("/introspect/propose")
def api_introspect_propose(body: dict, user: str = Depends(authenticate)):
    """Return ``propose_module``'s suggested module definition for one table.

    Body keys:
        connection_id, table_name: required.
        qualifiers: optional object with ``schema`` (defaults to ``""``),
            ``dest_schema``, ``linked_server`` and ``linked_db``.

    Raises:
        HTTPException: 422 when a required key is missing or ``qualifiers``
            is not an object. Previously these cases crashed with
            KeyError/AttributeError and surfaced as a 500.
    """
    conn_id = body.get("connection_id")
    table_name = body.get("table_name")
    if conn_id is None or table_name is None:
        raise HTTPException(422, "connection_id and table_name are required")
    # "qualifiers" may be omitted or sent as an explicit null.
    qualifiers = body.get("qualifiers") or {}
    if not isinstance(qualifiers, dict):
        raise HTTPException(422, "qualifiers must be an object")
    schema = qualifiers.get("schema", "")
    return propose_module(conn_id, schema, table_name,
                          dest_schema=qualifiers.get("dest_schema"),
                          linked_server=qualifiers.get("linked_server"),
                          linked_db=qualifiers.get("linked_db"))
# Keep old GET endpoints for backward compat with TUI
@app.get("/connections/{conn_id}/tables")
def api_list_tables(conn_id: int, schema: Optional[str] = None,
                    user: str = Depends(authenticate)):
    """Legacy GET form of table listing, with ``schema`` as an optional query filter."""
    found = fetch_tables(conn_id, schema_filter=schema)
    return [table.to_dict() for table in found]
@app.get("/connections/{conn_id}/tables/{schema}.{table}/columns")
def api_list_columns(conn_id: int, schema: str, table: str,
                     user: str = Depends(authenticate)):
    """Legacy GET form of column listing for ``schema.table`` (no linked-server support)."""
    found = fetch_columns(conn_id, schema, table)
    return [column.to_dict() for column in found]
@app.get("/connections/{conn_id}/tables/{schema}.{table}/propose")
def api_propose_module(conn_id: int, schema: str, table: str,
                       dest_schema: Optional[str] = None,
                       linked_server: Optional[str] = None,
                       linked_db: Optional[str] = None,
                       user: str = Depends(authenticate)):
    """Legacy GET form of module proposal; optional qualifiers come from the query string."""
    linked = {"linked_server": linked_server, "linked_db": linked_db}
    return propose_module(conn_id, schema, table, dest_schema, **linked)
# ---------------------------------------------------------------------------
# Modules
# ---------------------------------------------------------------------------
@app.get("/modules")
def api_list_modules(user: str = Depends(authenticate)):
    """Return every module."""
    modules = list_modules()
    return modules
@app.get("/modules/{module_id}")
def api_get_module(module_id: int, user: str = Depends(authenticate)):
    """Return one module by id, or 404 when the lookup yields nothing."""
    module = get_module(module_id)
    if module:
        return module
    raise HTTPException(status_code=404, detail="Module not found")
@app.post("/modules", status_code=201)
def api_create_module(body: ModuleCreate, user: str = Depends(authenticate)):
    """Create a module from the request body; returns ``create_module``'s result."""
    fields = body.model_dump()
    return create_module(**fields)
@app.post("/modules/{module_id}")
def api_update_module(module_id: int, body: ModuleUpdate,
                      user: str = Depends(authenticate)):
    """Partially update a module; only non-null body fields are forwarded."""
    changes = body.model_dump(exclude_none=True)
    return update_module(module_id, **changes)
@app.post("/modules/{module_id}/delete")
def api_delete_module(module_id: int, user: str = Depends(authenticate)):
    """Delete a module and acknowledge with ``{"ok": True}`` (no existence check here)."""
    delete_module(module_id)
    return dict(ok=True)
@app.get("/modules/{module_id}/preview")
def api_preview_module(module_id: int, user: str = Depends(authenticate)):
    """Return ``preview_module``'s output for the given module unchanged."""
    preview = preview_module(module_id)
    return preview
@app.get("/modules/{module_id}/columns")
def api_module_columns(module_id: int, user: str = Depends(authenticate)):
    """Parse source query and return column list.

    Scans the module's ``source_query`` (case-insensitively) for
    ``EXPR AS ALIAS`` pairs. EXPR is either ``RTRIM(inner)`` or a
    possibly bracketed, possibly dotted identifier. Each match yields:

    * ``source`` — the RTRIM inner text or the identifier, stripped;
    * ``alias`` — the word after ``AS``;
    * ``trimmed`` — True when the expression was wrapped in ``RTRIM``.

    Raises:
        HTTPException: 404 when the module lookup yields nothing.
    """
    import re

    module = get_module(module_id)
    if not module:
        raise HTTPException(status_code=404, detail="Module not found")
    alias_pattern = re.compile(
        r'(?:RTRIM\(([^)]+)\)|(\[?["\w#@$]+\]?(?:\.["\w#@$]+)*))\s+AS\s+(\w+)',
        re.IGNORECASE,
    )
    return [
        {
            "source": (hit.group(1) or hit.group(2)).strip(),
            "alias": hit.group(3),
            "trimmed": bool(hit.group(1)),
        }
        for hit in alias_pattern.finditer(module["source_query"])
    ]
@app.post("/modules/{module_id}/run")
def api_run_module(module_id: int, user: str = Depends(authenticate)):
    """Run one module within this request and return ``run_module``'s result."""
    outcome = run_module(module_id)
    return outcome
@app.get("/runs/{run_id}/stream")
def api_stream_run(run_id: int, user: str = Depends(authenticate)):
    """SSE stream for watching a run. Placeholder — full impl in async phase.

    Always answers 501 Not Implemented for now.
    """
    raise HTTPException(status_code=501, detail="SSE streaming not yet implemented")
@app.post("/modules/{module_id}/run/stream")
def api_run_module_stream(module_id: int, user: str = Depends(authenticate)):
    """Trigger a sync run and stream jrunner output as text/event-stream.

    The module runs on a daemon thread. Each output line it reports via
    ``on_output`` is forwarded as one SSE event. The stream ends after one
    terminal event, whose data is one of:

    * ``__DONE__`` followed by ``json.dumps`` of ``run_module``'s result;
    * ``__ERROR__`` followed by the exception message, when ``run_module``
      or serializing its result raised;
    * ``__TIMEOUT__`` when nothing arrives on the queue for 600 seconds.
    """
    import json
    import re
    import threading

    # Items are (kind, payload) tuples. Marking terminal messages out of band
    # means an ordinary output line that happens to start with "__DONE__" or
    # "__ERROR__" can no longer end the stream early.
    q = queue.Queue()

    def on_output(line: str):
        q.put(("line", line))

    def run_in_thread():
        try:
            result = run_module(module_id, on_output=on_output)
            q.put(("end", f"__DONE__{json.dumps(result)}"))
        except Exception as e:
            q.put(("end", f"__ERROR__{e}"))

    threading.Thread(target=run_in_thread, daemon=True).start()

    def sse_event(payload: str) -> str:
        # SSE treats CR, LF and CRLF as field terminators. A raw newline inside
        # one "data:" field (common in multi-line exception messages) would
        # cut the event short. Emit one "data:" field per line instead; the
        # client rejoins them with "\n". Trailing terminators are dropped,
        # which matches what clients saw before.
        parts = re.split(r"\r\n|\r|\n", payload.rstrip("\r\n"))
        return "".join(f"data: {part}\n" for part in parts) + "\n"

    def event_stream():
        while True:
            try:
                kind, payload = q.get(timeout=600)
            except queue.Empty:
                yield "data: __TIMEOUT__\n\n"
                return
            yield sse_event(payload)
            if kind == "end":
                return

    # no-cache keeps intermediaries from caching the event stream.
    return StreamingResponse(event_stream(), media_type="text/event-stream",
                             headers={"Cache-Control": "no-cache"})
@app.get("/modules/{module_id}/runs")
def api_module_runs(module_id: int, limit: int = 50,
                    user: str = Depends(authenticate)):
    """Return up to ``limit`` runs recorded for one module."""
    runs = list_runs(limit=limit, module_id=module_id)
    return runs
# Keep old path for TUI compat
@app.get("/modules/{module_id}/history")
def api_module_history(module_id: int, limit: int = 50,
                       user: str = Depends(authenticate)):
    """Legacy alias of ``/modules/{module_id}/runs``, kept for the TUI."""
    history = list_runs(limit=limit, module_id=module_id)
    return history
# ---------------------------------------------------------------------------
# Watermarks
# ---------------------------------------------------------------------------
@app.get("/modules/{module_id}/watermarks")
def api_list_watermarks(module_id: int, user: str = Depends(authenticate)):
    """Return the watermarks attached to one module."""
    watermarks = list_watermarks(module_id)
    return watermarks
@app.get("/watermarks/{watermark_id}")
def api_get_watermark(watermark_id: int, user: str = Depends(authenticate)):
    """Return one watermark by id, or 404 when the lookup yields nothing."""
    watermark = get_watermark(watermark_id)
    if watermark:
        return watermark
    raise HTTPException(status_code=404, detail="Watermark not found")
@app.post("/watermarks", status_code=201)
def api_create_watermark(body: WatermarkCreate, user: str = Depends(authenticate)):
    """Create a watermark from the request body; returns ``create_watermark``'s result."""
    fields = body.model_dump()
    return create_watermark(**fields)
@app.post("/watermarks/{watermark_id}")
def api_update_watermark(watermark_id: int, body: WatermarkUpdate,
                         user: str = Depends(authenticate)):
    """Partially update a watermark; only non-null body fields are forwarded."""
    changes = body.model_dump(exclude_none=True)
    return update_watermark(watermark_id, **changes)
@app.post("/watermarks/{watermark_id}/delete")
def api_delete_watermark(watermark_id: int, user: str = Depends(authenticate)):
    """Delete a watermark and acknowledge with ``{"ok": True}`` (no existence check here)."""
    delete_watermark(watermark_id)
    return dict(ok=True)
# ---------------------------------------------------------------------------
# Hooks
# ---------------------------------------------------------------------------
@app.get("/modules/{module_id}/hooks")
def api_list_hooks(module_id: int, user: str = Depends(authenticate)):
    """Return the hooks attached to one module."""
    hooks = list_hooks(module_id)
    return hooks
@app.get("/hooks/{hook_id}")
def api_get_hook(hook_id: int, user: str = Depends(authenticate)):
    """Return one hook by id, or 404 when the lookup yields nothing."""
    hook = get_hook(hook_id)
    if hook:
        return hook
    raise HTTPException(status_code=404, detail="Hook not found")
@app.post("/hooks", status_code=201)
def api_create_hook(body: HookCreate, user: str = Depends(authenticate)):
    """Create a hook from the request body; returns ``create_hook``'s result."""
    fields = body.model_dump()
    return create_hook(**fields)
@app.post("/hooks/{hook_id}")
def api_update_hook(hook_id: int, body: HookUpdate,
                    user: str = Depends(authenticate)):
    """Partially update a hook; only non-null body fields are forwarded."""
    changes = body.model_dump(exclude_none=True)
    return update_hook(hook_id, **changes)
@app.post("/hooks/{hook_id}/delete")
def api_delete_hook(hook_id: int, user: str = Depends(authenticate)):
    """Delete a hook and acknowledge with ``{"ok": True}`` (no existence check here)."""
    delete_hook(hook_id)
    return dict(ok=True)
# ---------------------------------------------------------------------------
# Groups
# ---------------------------------------------------------------------------
@app.get("/groups")
def api_list_groups(user: str = Depends(authenticate)):
    """Return every group."""
    groups = list_groups()
    return groups
@app.get("/groups/{group_id}")
def api_get_group(group_id: int, user: str = Depends(authenticate)):
    """Return one group by id, or 404 when the lookup yields nothing."""
    group = get_group(group_id)
    if group:
        return group
    raise HTTPException(status_code=404, detail="Group not found")
@app.post("/groups", status_code=201)
def api_create_group(body: GroupCreate, user: str = Depends(authenticate)):
    """Create a group from the request body; returns ``create_group``'s result."""
    fields = body.model_dump()
    return create_group(**fields)
@app.post("/groups/{group_id}/delete")
def api_delete_group(group_id: int, user: str = Depends(authenticate)):
    """Delete a group and acknowledge with ``{"ok": True}`` (no existence check here)."""
    delete_group(group_id)
    return dict(ok=True)
@app.post("/groups/{group_id}/members", status_code=201)
def api_add_member(group_id: int, body: GroupMemberAdd,
                   user: str = Depends(authenticate)):
    """Add a module to a group; returns ``add_group_member``'s result."""
    member = body.model_dump()
    return add_group_member(group_id, **member)
@app.post("/groups/members/{member_id}/delete")
def api_remove_member(member_id: int, user: str = Depends(authenticate)):
    """Remove one group membership and acknowledge with ``{"ok": True}``."""
    remove_group_member(member_id)
    return dict(ok=True)
@app.post("/groups/{group_id}/run")
def api_run_group(group_id: int, user: str = Depends(authenticate)):
    """Run a group within this request and return ``run_group``'s result."""
    outcome = run_group(group_id)
    return outcome
# ---------------------------------------------------------------------------
# Group Runs
# ---------------------------------------------------------------------------
@app.get("/group-runs")
def api_list_group_runs(group_id: Optional[int] = None, limit: int = 50,
                        user: str = Depends(authenticate)):
    """Return up to ``limit`` group runs, optionally filtered by ``group_id``."""
    group_runs = list_group_runs(limit=limit, group_id=group_id)
    return group_runs
@app.get("/group-runs/{group_run_id}")
def api_get_group_run(group_run_id: int, user: str = Depends(authenticate)):
    """Return one group run by id, or 404 when the lookup yields nothing."""
    group_run = get_group_run(group_run_id)
    if group_run:
        return group_run
    raise HTTPException(status_code=404, detail="Group run not found")
# ---------------------------------------------------------------------------
# Runs
# ---------------------------------------------------------------------------
@app.get("/runs")
def api_list_runs(module_id: Optional[int] = None, status: Optional[str] = None,
                  limit: int = 50, user: str = Depends(authenticate)):
    """Return up to ``limit`` runs, optionally filtered by module and/or status."""
    filters = {"module_id": module_id, "status": status}
    return list_runs(limit=limit, **filters)
@app.get("/runs/{run_id}")
def api_get_run(run_id: int, user: str = Depends(authenticate)):
    """Return one run by id, or 404 when the lookup yields nothing."""
    run = get_run(run_id)
    if run:
        return run
    raise HTTPException(status_code=404, detail="Run not found")
# ---------------------------------------------------------------------------
# Schedules
# ---------------------------------------------------------------------------
@app.get("/schedules")
def api_list_schedules(user: str = Depends(authenticate)):
    """Return every schedule."""
    schedules = list_schedules()
    return schedules
@app.get("/schedules/{schedule_id}")
def api_get_schedule(schedule_id: int, user: str = Depends(authenticate)):
    """Return one schedule by id, or 404 when the lookup yields nothing."""
    schedule = get_schedule(schedule_id)
    if schedule:
        return schedule
    raise HTTPException(status_code=404, detail="Schedule not found")
@app.post("/schedules", status_code=201)
def api_create_schedule(body: ScheduleCreate, user: str = Depends(authenticate)):
    """Create a schedule from the request body; returns ``create_schedule``'s result."""
    fields = body.model_dump()
    return create_schedule(**fields)
@app.post("/schedules/{schedule_id}")
def api_update_schedule(schedule_id: int, body: ScheduleUpdate,
                        user: str = Depends(authenticate)):
    """Partially update a schedule; only non-null body fields are forwarded."""
    changes = body.model_dump(exclude_none=True)
    return update_schedule(schedule_id, **changes)
@app.post("/schedules/{schedule_id}/delete")
def api_delete_schedule(schedule_id: int, user: str = Depends(authenticate)):
    """Delete a schedule and acknowledge with ``{"ok": True}`` (no existence check here)."""
    delete_schedule(schedule_id)
    return dict(ok=True)
# ---------------------------------------------------------------------------
# Settings
# ---------------------------------------------------------------------------
@app.get("/settings")
def api_get_settings(user: str = Depends(authenticate)):
    """Return every row of the ``settings`` table as a ``{key: value}`` dict, ordered by key.

    NOTE(review): startup() stores ``api_password`` via set_setting, so this
    presumably returns the API password in clear text — confirm whether it
    should be masked.
    """
    from engine.db import get_conn

    settings = {}
    with get_conn() as conn:
        cursor = conn.execute("SELECT key, value FROM settings ORDER BY key")
        for row in cursor.fetchall():
            settings[row["key"]] = row["value"]
    return settings
@app.post("/settings/{key}")
def api_set_setting(key: str, body: SettingUpdate,
                    user: str = Depends(authenticate)):
    """Store ``body.value`` under ``key`` via ``set_setting`` and acknowledge.

    Any key is accepted, including ``api_username`` / ``api_password``.
    NOTE(review): setting either of those to an empty string makes
    authenticate() fall back to its built-in default.
    """
    set_setting(key, body.value)
    return dict(ok=True)