diff --git a/pipekit/engine/__init__.py b/pipekit/engine/__init__.py index 7000c55..0882ccc 100644 --- a/pipekit/engine/__init__.py +++ b/pipekit/engine/__init__.py @@ -1,3 +1,3 @@ -from .runner import LockBusy, RunOutcome, run_module +from .runner import GroupRunOutcome, LockBusy, RunOutcome, run_group, run_module -__all__ = ["LockBusy", "RunOutcome", "run_module"] +__all__ = ["GroupRunOutcome", "LockBusy", "RunOutcome", "run_group", "run_module"] diff --git a/pipekit/engine/runner.py b/pipekit/engine/runner.py index 1e49f2d..85c20f2 100644 --- a/pipekit/engine/runner.py +++ b/pipekit/engine/runner.py @@ -33,6 +33,13 @@ class RunOutcome: merge_sql: str | None +@dataclass +class GroupRunOutcome: + group_run_id: int + status: str # success | error | dry_run + module_outcomes: list + + class LockBusy(RuntimeError): """Raised when a module is already running.""" @@ -150,6 +157,49 @@ def run_module(module_id: int, *, group_run_id: int | None = None, repo.release_module_lock(module_id) +def run_group(group_id: int, *, dry_run: bool = False, + group_run_id: int | None = None) -> GroupRunOutcome: + """Run all enabled members of a group sequentially in run_order. + + Continues past individual module failures so partial progress is visible. + Final status: dry_run if all dry_run; error if any errored; success otherwise. + """ + grp = repo.get_group(group_id) + if grp is None: + raise ValueError(f"group id={group_id} not found") + + if group_run_id is None: + group_run_id = repo.create_group_run(group_id, triggered_by="manual") + + members = [m for m in repo.list_group_members(group_id) if m["module_enabled"]] + outcomes: list[RunOutcome] = [] + + for member in members: + try: + outcome = run_module( + member["module_id"], + group_run_id=group_run_id, + dry_run=dry_run, + ) + except LockBusy as e: + run_id = repo.create_run(member["module_id"], group_run_id=group_run_id) + repo.finish_run(run_id, status="error", error=str(e)) + outcome = RunOutcome(run_id, "error", None, str(e), None, None) + outcomes.append(outcome) + + if not outcomes: + final = "dry_run" if dry_run else "success" + elif all(o.status == "dry_run" for o in outcomes): + final = "dry_run" + elif any(o.status == "error" for o in outcomes): + final = "error" + else: + final = "success" + + repo.finish_group_run(group_run_id, status=final) + return GroupRunOutcome(group_run_id, final, outcomes) + + def _run_hooks(module_id: int, *, fail_fast: bool, run_on_set: set[str]) -> str: """Run hooks whose ``run_on`` is in run_on_set. Returns a text log.""" hooks = [h for h in repo.list_hooks(module_id) if h["run_on"] in run_on_set] diff --git a/pipekit/repo.py b/pipekit/repo.py index 4dc26f9..18a8e4f 100644 --- a/pipekit/repo.py +++ b/pipekit/repo.py @@ -482,6 +482,144 @@ def set_setting(key: str, value: str) -> None: ) +# --------------------------------------------------------------------------- +# Groups +# --------------------------------------------------------------------------- + +def create_group(*, name: str) -> dict: + with db.connect() as c: + cur = c.execute("INSERT INTO grp (name) VALUES (?)", (name,)) + return _row(c.execute("SELECT * FROM grp WHERE id=?", (cur.lastrowid,)).fetchone()) + + +def get_group(group_id: int) -> dict | None: + with db.connect() as c: + return _row(c.execute("SELECT * FROM grp WHERE id=?", (group_id,)).fetchone()) + + +def get_group_by_name(name: str) -> dict | None: + with db.connect() as c: + return _row(c.execute("SELECT * FROM grp WHERE name=?", (name,)).fetchone()) + + +def list_groups() -> list[dict]: + with db.connect() as c: + return [dict(r) for r in c.execute("SELECT * FROM grp ORDER BY name")] + + +def update_group(group_id: int, *, name: str) -> dict | None: + with db.connect() as c: + c.execute("UPDATE grp SET name=? WHERE id=?", (name, group_id)) + return get_group(group_id) + + +def delete_group(group_id: int) -> bool: + with db.connect() as c: + cur = c.execute("DELETE FROM grp WHERE id=?", (group_id,)) + return cur.rowcount > 0 + + +# --------------------------------------------------------------------------- +# Group members +# --------------------------------------------------------------------------- + +def list_group_members(group_id: int) -> list[dict]: + with db.connect() as c: + return [dict(r) for r in c.execute( + "SELECT gm.*, m.name AS module_name, m.enabled AS module_enabled " + "FROM group_member gm " + "JOIN module m ON gm.module_id=m.id " + "WHERE gm.group_id=? " + "ORDER BY gm.run_order, m.name", + (group_id,))] + + +def module_group_map() -> dict[int, list[dict]]: + """Return {module_id: [{group_id, group_name}, ...]} for all members.""" + with db.connect() as c: + rows = c.execute( + "SELECT gm.module_id, g.id AS group_id, g.name AS group_name " + "FROM group_member gm " + "JOIN grp g ON gm.group_id=g.id " + "ORDER BY g.name" + ).fetchall() + result: dict[int, list[dict]] = {} + for r in rows: + result.setdefault(r["module_id"], []).append( + {"group_id": r["group_id"], "group_name": r["group_name"]} + ) + return result + + +def set_group_members(group_id: int, members: list[dict]) -> None: + """Replace all members for a group. members: list of {module_id, run_order}.""" + with db.connect() as c: + c.execute("DELETE FROM group_member WHERE group_id=?", (group_id,)) + for m in members: + c.execute( + "INSERT INTO group_member (group_id, module_id, run_order) " + "VALUES (?, ?, ?)", + (group_id, m["module_id"], m.get("run_order", 0)), + ) + + +# --------------------------------------------------------------------------- +# Group runs +# --------------------------------------------------------------------------- + +def create_group_run(group_id: int, *, triggered_by: str | None = None) -> int: + with db.connect() as c: + cur = c.execute( + "INSERT INTO group_run (group_id, triggered_by) VALUES (?, ?)", + (group_id, triggered_by), + ) + return int(cur.lastrowid) + + +def get_group_run(group_run_id: int) -> dict | None: + with db.connect() as c: + return _row(c.execute( + "SELECT gr.*, g.name AS group_name " + "FROM group_run gr " + "JOIN grp g ON gr.group_id=g.id " + "WHERE gr.id=?", + (group_run_id,)).fetchone()) + + +def finish_group_run(group_run_id: int, *, status: str) -> None: + with db.connect() as c: + c.execute( + "UPDATE group_run SET finished_at=datetime('now'), status=? WHERE id=?", + (status, group_run_id), + ) + + +def list_group_runs(group_id: int, limit: int = 20) -> list[dict]: + with db.connect() as c: + return [dict(r) for r in c.execute( + "SELECT gr.*, " + "CASE WHEN gr.started_at IS NOT NULL AND gr.finished_at IS NOT NULL " + "THEN CAST(ROUND((julianday(gr.finished_at) - julianday(gr.started_at)) * 86400) AS INTEGER) " + "ELSE NULL END AS duration_s " + "FROM group_run gr " + "WHERE gr.group_id=? ORDER BY gr.id DESC LIMIT ?", + (group_id, limit))] + + +def list_runs_for_group_run(group_run_id: int) -> list[dict]: + with db.connect() as c: + return [dict(r) for r in c.execute( + "SELECT r.*, m.name AS module_name, " + "CASE WHEN r.started_at IS NOT NULL AND r.finished_at IS NOT NULL " + "THEN CAST(ROUND((julianday(r.finished_at) - julianday(r.started_at)) * 86400) AS INTEGER) " + "ELSE NULL END AS duration_s " + "FROM run_log r " + "LEFT JOIN module m ON r.module_id=m.id " + "WHERE r.group_run_id=? " + "ORDER BY r.id", + (group_run_id,))] + + def list_runs(*, module_id: int | None = None, status: str | None = None, exclude_status: str | None = None, limit: int = 50) -> list[dict]: where, params = [], [] diff --git a/pipekit/web/app.py b/pipekit/web/app.py index ce7d154..b7b26b8 100644 --- a/pipekit/web/app.py +++ b/pipekit/web/app.py @@ -63,6 +63,7 @@ def home(request: Request): modules = repo.list_modules() conns_by_id = {c["id"]: c for c in repo.list_connections()} drivers_by_id = {d["id"]: d for d in repo.list_drivers()} + groups_by_module = repo.module_group_map() # attach last-run summary to each module for m in modules: @@ -76,6 +77,7 @@ def home(request: Request): m["last_run_at"] = None m["last_status"] = None m["last_row_count"] = None + m["groups"] = groups_by_module.get(m["id"], []) # group by source connection grouped: dict[tuple[str, str], list] = {} @@ -405,7 +407,7 @@ def wizard_step3(request: Request, try: for c in drv.get_columns(conn, table, **qvals): d = c.to_dict() - d["default_dest_name"] = c.name.lower() + d["default_dest_name"] = _sanitize_identifier(c.name) d["default_dest_type"] = drv.map_type(c.type_raw) d["default_description"] = c.description or "" columns.append(d) @@ -650,6 +652,17 @@ def _save_inline_watermarks(form, module_id: int) -> None: default_value=default_val) +def _sanitize_identifier(name: str) -> str: + """Lower-case a source column name and replace characters that aren't + valid in an unquoted identifier with underscores.""" + import re as _re + s = name.lower().strip() + s = _re.sub(r"[^a-z0-9_$#.]", "_", s) + if s and s[0].isdigit(): + s = "_" + s + return s or "_" + + def _sql_str(v: str) -> str: """SQL string literal — PG-style single-quote escaping.""" return "'" + v.replace("'", "''") + "'" @@ -978,3 +991,190 @@ def hook_delete(hook_id: int): module_id = hook["module_id"] repo.delete_hook(hook_id) return RedirectResponse(url=f"/modules/{module_id}", status_code=303) + + +# --------------------------------------------------------------------------- +# Groups +# --------------------------------------------------------------------------- + +@_router.get("/groups", response_class=HTMLResponse) +def groups_index(request: Request): + groups = repo.list_groups() + for g in groups: + members = repo.list_group_members(g["id"]) + g["member_count"] = len(members) + recent = repo.list_group_runs(g["id"], limit=1) + if recent: + g["last_run_at"] = recent[0]["started_at"] + g["last_status"] = recent[0]["status"] + else: + g["last_run_at"] = None + g["last_status"] = None + return _templates.TemplateResponse( + request, "groups.html", + _ctx(groups=groups), + ) + + +@_router.get("/groups/new", response_class=HTMLResponse) +def group_new(request: Request): + return _templates.TemplateResponse( + request, "group_form.html", + _ctx(group=None, all_modules=repo.list_modules(), members=[], + form_action="/groups", cancel_url="/groups"), + ) + + +@_router.post("/groups") +async def group_create(request: Request): + form = await request.form() + name = form["name"].strip() + if repo.get_group_by_name(name) is not None: + raise HTTPException(409, f"group name {name!r} already exists") + grp = repo.create_group(name=name) + _save_group_members(form, grp["id"]) + return RedirectResponse(url=f"/groups/{grp['id']}", status_code=303) + + +@_router.get("/groups/{group_id}", response_class=HTMLResponse) +def group_detail(request: Request, group_id: int): + grp = repo.get_group(group_id) + if grp is None: + raise HTTPException(404, f"group id={group_id} not found") + members = repo.list_group_members(group_id) + recent_runs = repo.list_group_runs(group_id, limit=10) + group_running = bool(recent_runs and recent_runs[0]["status"] == "running") + return _templates.TemplateResponse( + request, "group_detail.html", + _ctx(group=grp, members=members, recent_runs=recent_runs, + group_running=group_running), + ) + + +@_router.get("/groups/{group_id}/edit", response_class=HTMLResponse) +def group_edit(request: Request, group_id: int): + grp = repo.get_group(group_id) + if grp is None: + raise HTTPException(404, f"group id={group_id} not found") + members = repo.list_group_members(group_id) + return _templates.TemplateResponse( + request, "group_form.html", + _ctx(group=grp, all_modules=repo.list_modules(), members=members, + form_action=f"/groups/{group_id}", cancel_url=f"/groups/{group_id}", + section="groups"), + ) + + +@_router.post("/groups/{group_id}") +async def group_update(request: Request, group_id: int): + grp = repo.get_group(group_id) + if grp is None: + raise HTTPException(404, f"group id={group_id} not found") + form = await request.form() + name = form["name"].strip() + if name != grp["name"] and repo.get_group_by_name(name) is not None: + raise HTTPException(409, f"group name {name!r} already exists") + repo.update_group(group_id, name=name) + _save_group_members(form, group_id) + return RedirectResponse(url=f"/groups/{group_id}", status_code=303) + + +@_router.post("/groups/{group_id}/delete") +def group_delete(group_id: int): + if repo.get_group(group_id) is None: + raise HTTPException(404, f"group id={group_id} not found") + repo.delete_group(group_id) + return RedirectResponse(url="/groups", status_code=303) + + +@_router.post("/groups/{group_id}/run") +async def group_run_action(group_id: int, request: Request, + background: BackgroundTasks): + grp = repo.get_group(group_id) + if grp is None: + raise HTTPException(404, f"group id={group_id} not found") + form = await request.form() + dry = form.get("dry_run") == "1" + group_run_id = repo.create_group_run(group_id, triggered_by="manual") + background.add_task(_run_group_in_background, group_id, group_run_id, dry) + if request.headers.get("HX-Request"): + members = repo.list_group_members(group_id) + recent_runs = repo.list_group_runs(group_id, limit=10) + return _templates.TemplateResponse( + request, "_group_live.html", + _ctx(group=grp, members=members, recent_runs=recent_runs, + group_running=True, force_poll=True)) + return RedirectResponse(url=f"/group-runs/{group_run_id}", status_code=303) + + +@_router.get("/groups/{group_id}/live-fragment", response_class=HTMLResponse) +def group_live_fragment(request: Request, group_id: int): + grp = repo.get_group(group_id) + if grp is None: + raise HTTPException(404, f"group id={group_id} not found") + members = repo.list_group_members(group_id) + recent_runs = repo.list_group_runs(group_id, limit=10) + group_running = bool(recent_runs and recent_runs[0]["status"] == "running") + return _templates.TemplateResponse( + request, "_group_live.html", + _ctx(group=grp, members=members, recent_runs=recent_runs, + group_running=group_running, force_poll=False)) + + +def _run_group_in_background(group_id: int, group_run_id: int, + dry_run: bool) -> None: + try: + engine.run_group(group_id, dry_run=dry_run, group_run_id=group_run_id) + except Exception as e: # noqa: BLE001 + repo.finish_group_run(group_run_id, status="error") + + +# --------------------------------------------------------------------------- +# Group run detail +# --------------------------------------------------------------------------- + +@_router.get("/group-runs/{group_run_id}", response_class=HTMLResponse) +def group_run_detail(request: Request, group_run_id: int): + group_run = repo.get_group_run(group_run_id) + if group_run is None: + raise HTTPException(404, f"group run id={group_run_id} not found") + module_runs = repo.list_runs_for_group_run(group_run_id) + return _templates.TemplateResponse( + request, "group_run_detail.html", + _ctx(group_run=group_run, module_runs=module_runs), + ) + + +@_router.get("/group-runs/{group_run_id}/live", response_class=HTMLResponse) +def group_run_live_fragment(request: Request, group_run_id: int): + group_run = repo.get_group_run(group_run_id) + if group_run is None: + raise HTTPException(404, f"group run id={group_run_id} not found") + module_runs = repo.list_runs_for_group_run(group_run_id) + return _templates.TemplateResponse( + request, "_group_run_live.html", + _ctx(group_run=group_run, module_runs=module_runs), + ) + + +def _save_group_members(form, group_id: int) -> None: + module_ids = form.getlist("member_module_id") + run_orders = form.getlist("member_run_order") + members = [] + seen = set() + for i, mid_str in enumerate(module_ids): + if not mid_str: + continue + try: + mid = int(mid_str) + except ValueError: + continue + if mid in seen: + continue + seen.add(mid) + try: + order = int(run_orders[i]) if i < len(run_orders) else i + except (ValueError, IndexError): + order = i + members.append({"module_id": mid, "run_order": order}) + repo.set_group_members(group_id, members) diff --git a/pipekit/web/static/style.css b/pipekit/web/static/style.css index 013ef8f..02d2315 100644 --- a/pipekit/web/static/style.css +++ b/pipekit/web/static/style.css @@ -140,6 +140,20 @@ table.grid tr:hover td { background: #1c2128; } .pill.disabled { color: var(--text-muted); } .pill.warning { color: var(--warning); } +/* Group membership tags */ +.tag { + display: inline-block; + padding: 0.05rem 0.45rem; + border-radius: 4px; + font-size: 11px; + font-weight: 500; + background: var(--border); + color: var(--text-muted); + text-decoration: none; + margin-right: 0.2rem; +} +.tag:hover { color: var(--accent); } + /* Labeled key-value rows (used in detail views) */ dl.keyval { display: grid; diff --git a/pipekit/web/templates/_group_live.html b/pipekit/web/templates/_group_live.html new file mode 100644 index 0000000..c95aab5 --- /dev/null +++ b/pipekit/web/templates/_group_live.html @@ -0,0 +1,44 @@ +{# Partial: recent group runs panel for group_detail.html. + Polls every 3s while a group run is in progress. #} + +
+
+
+ Recent runs + last {{ recent_runs|length }} +
+
+ {% if recent_runs %} + + + + + + + + + + + + {% for r in recent_runs %} + + + + + + + + {% endfor %} + +
idstarteddurationtriggered bystatus
#{{ r.id }}{{ r.started_at or "—" }}{{ r.duration_s | duration }}{{ r.triggered_by or "—" }}{{ r.status }}
+ {% else %} +
No runs yet.
+ {% endif %} +
+
+
diff --git a/pipekit/web/templates/_group_run_live.html b/pipekit/web/templates/_group_run_live.html new file mode 100644 index 0000000..42768b9 --- /dev/null +++ b/pipekit/web/templates/_group_run_live.html @@ -0,0 +1,49 @@ +{# Partial: module runs table for group_run_detail.html. Polls while running. #} +
+
+
+ Module runs + + {{ group_run.status }} + {% if group_run.duration_s is not none %} · {{ group_run.duration_s | duration }}{% endif %} + +
+
+ {% if module_runs %} + + + + + + + + + + + + + {% for r in module_runs %} + + + + + + + + + {% endfor %} + +
runmodulestarteddurationstatusrows
#{{ r.id }}{{ r.module_name }}{{ r.started_at or "—" }}{{ r.duration_s | duration }}{{ r.status }}{{ r.row_count if r.row_count is not none else "—" }}
+ {% elif group_run.status == 'running' %} +
Waiting for module runs to start…
+ {% else %} +
No module runs recorded.
+ {% endif %} +
+
+
diff --git a/pipekit/web/templates/base.html b/pipekit/web/templates/base.html index 95f3866..9a3c6ea 100644 --- a/pipekit/web/templates/base.html +++ b/pipekit/web/templates/base.html @@ -13,6 +13,7 @@ v{{ version }} · API docs diff --git a/pipekit/web/templates/group_detail.html b/pipekit/web/templates/group_detail.html new file mode 100644 index 0000000..367d2b9 --- /dev/null +++ b/pipekit/web/templates/group_detail.html @@ -0,0 +1,70 @@ +{% extends "base.html" %} +{% set section = "groups" %} +{% block title %}{{ group.name }} — Pipekit{% endblock %} + +{% block content %} +
+
+ {{ group.name }} + group #{{ group.id }} + + Edit +
+ +
+
+ + +
+
+
+
+ {% if members %} + + + + + + + + + + {% for m in members %} + + + + + + {% endfor %} + +
ordermoduleenabled
{{ m.run_order }}{{ m.module_name }} + {% if m.module_enabled %} + yes + {% else %} + disabled — skipped + {% endif %} +
+ {% else %} +
No members yet. Add some →
+ {% endif %} +
+
+ +{% include "_group_live.html" %} + +
+
Danger zone
+
+
+ +
+
+
+{% endblock %} diff --git a/pipekit/web/templates/group_form.html b/pipekit/web/templates/group_form.html new file mode 100644 index 0000000..6acb45c --- /dev/null +++ b/pipekit/web/templates/group_form.html @@ -0,0 +1,103 @@ +{% extends "base.html" %} +{% set section = "groups" %} +{% block title %}{% if group %}Edit group · {{ group.name }}{% else %}New group{% endif %} — Pipekit{% endblock %} + +{% block content %} +
+
+ {% if group %}Edit group · {{ group.name }}{% else %}New group{% endif %} + ← back +
+
+
+ + +
+
+ Members + run in order, lowest first; disabled modules are skipped + +
+
+ + + + + + + + + + {% for m in members %} + + + + + + {% endfor %} + +
ordermodule
+ + + + + +
+ {% if not members %} +
No members yet.
+ {% endif %} +
+
+ +
+ cancel + +
+
+
+
+ + + + +{% endblock %} diff --git a/pipekit/web/templates/group_run_detail.html b/pipekit/web/templates/group_run_detail.html new file mode 100644 index 0000000..1abe76d --- /dev/null +++ b/pipekit/web/templates/group_run_detail.html @@ -0,0 +1,69 @@ +{% extends "base.html" %} +{% set section = "groups" %} +{% block title %}Group run #{{ group_run.id }} — Pipekit{% endblock %} + +{% block content %} +
+
+ Group run #{{ group_run.id }} + + {{ group_run.group_name }} · + started {{ group_run.started_at }} + + + {{ group_run.status }} + +
+
+
+
started
{{ group_run.started_at }}
+
finished
{{ group_run.finished_at or "—" }}
+
duration
{{ group_run.duration_s | duration }}
+
triggered by
{{ group_run.triggered_by or "—" }}
+
+
+
+ +
+
+
Module runs
+
+ {% if module_runs %} + + + + + + + + + + + + + {% for r in module_runs %} + + + + + + + + + {% endfor %} + +
runmodulestarteddurationstatusrows
#{{ r.id }}{{ r.module_name }}{{ r.started_at or "—" }}{{ r.duration_s | duration }}{{ r.status }}{{ r.row_count if r.row_count is not none else "—" }}
+ {% elif group_run.status == 'running' %} +
Waiting for module runs to start…
+ {% else %} +
No module runs recorded.
+ {% endif %} +
+
+
+{% endblock %} diff --git a/pipekit/web/templates/groups.html b/pipekit/web/templates/groups.html new file mode 100644 index 0000000..995bbdd --- /dev/null +++ b/pipekit/web/templates/groups.html @@ -0,0 +1,60 @@ +{% extends "base.html" %} +{% set section = "groups" %} +{% block title %}Groups — Pipekit{% endblock %} + +{% block content %} +
+
+ Groups + {{ groups|length }} total + + New group… + +
+
+ {% if groups %} + + + + + + + + + + + + {% for g in groups %} + + + + + + + + {% endfor %} + +
namememberslast runstatus
{{ g.name }}{{ g.member_count }}{{ g.last_run_at or "—" }} + {% if g.last_status %} + {{ g.last_status }} + {% else %} + never ran + {% endif %} + +
+ +
+
+ + +
+
+ {% else %} +
+ No groups yet.
+ Create one +
+ {% endif %} +
+
+{% endblock %} diff --git a/pipekit/web/templates/modules_index.html b/pipekit/web/templates/modules_index.html index 39b4a8b..9d03972 100644 --- a/pipekit/web/templates/modules_index.html +++ b/pipekit/web/templates/modules_index.html @@ -21,6 +21,7 @@ name strategy dest + groups last run status rows @@ -33,6 +34,11 @@ {{ m.name }} {{ m.merge_strategy }} {{ m.dest_table }} + + {% for g in m.groups %} + {{ g.group_name }} + {% endfor %} + {{ m.last_run_at or "—" }} {% with module=m %}{% include "_module_status_pill.html" %}{% endwith %} {{ m.last_row_count if m.last_row_count is not none else "—" }}