diff --git a/pipekit/drivers/base.py b/pipekit/drivers/base.py
index 3249419..0441b23 100644
--- a/pipekit/drivers/base.py
+++ b/pipekit/drivers/base.py
@@ -167,6 +167,49 @@ class Driver(abc.ABC):
         """DDL to create new_table with the same columns as source_table."""
         return f"CREATE TABLE {new_table} (LIKE {source_table} INCLUDING ALL);"
 
+    def build_add_column_sql(self, qualified_table: str, column: dict) -> str:
+        """DDL to append one column to an existing table.
+
+        ALTER can only add at the end — which keeps the positional load
+        aligned as long as the column is also last in the source projection.
+
+        dest_type is spliced into the statement verbatim, so it is limited
+        to plain type syntax (letters, digits, "_", spaces, ".", ",", "()",
+        "[]"); a missing or blank dest_type falls back to "text".
+
+        Raises:
+            ValueError: dest_type contains anything outside that syntax.
+        """
+        import re
+
+        name = column["dest_name"]
+        dtype = (column.get("dest_type") or "").strip() or "text"
+        if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_ .,()\[\]]*", dtype):
+            raise ValueError(
+                f"column {name!r} has an invalid dest_type {dtype!r}")
+        return (f"ALTER TABLE {qualified_table} "
+                f"ADD COLUMN {self.quote_identifier(name)} {dtype};")
+
+    def column_inventory(self, conn: dict, schema: str,
+                         table: str) -> "list[dict]":
+        """Ordered [{name, type}] of an existing dest table, or [] if absent.
+
+        Default uses INFORMATION_SCHEMA (PG/MSSQL); override for other catalogs.
+        schema and table are embedded as SQL string literals, so any single
+        quote in them is doubled first.
+        """
+        schema_lit = schema.replace("'", "''")
+        table_lit = table.replace("'", "''")
+        sql = (
+            "SELECT column_name, data_type FROM information_schema.columns "
+            f"WHERE table_schema='{schema_lit}' AND table_name='{table_lit}' "
+            "ORDER BY ordinal_position"
+        )
+        r = self.query(conn, sql)
+        # Tolerate a NULL data_type instead of crashing on .strip().
+        return [{"name": row[0].strip(), "type": (row[1] or "").strip()}
+                for row in r.rows if row and row[0]]
+
     def check_dest_table(self, conn: dict, schema: str,
                          table: str) -> "set[str] | None":
         """Return lowercase column names of an existing dest table, or None
diff --git a/pipekit/repo.py b/pipekit/repo.py
index da54df7..5b32d66 100644
--- a/pipekit/repo.py
+++ b/pipekit/repo.py
@@ -232,6 +232,14 @@ def update_module(module_id: int, *, name: str | None = None,
     return get_module(module_id)
 
 
+def update_module_columns(module_id: int, columns: list[dict]) -> dict | None:
+    """Replace a module's stored column listing (columns_json)."""
+    with db.connect() as c:
+        c.execute("UPDATE module SET columns_json=?, updated_at=datetime('now') "
+                  "WHERE id=?", (json.dumps(columns), module_id))
+    return get_module(module_id)
+
+
 class ModuleRunning(RuntimeError):
     """Raised by delete_module when the module is currently running."""
 
diff --git a/pipekit/web/app.py b/pipekit/web/app.py
index 51d874a..3174ab0 100644
--- a/pipekit/web/app.py
+++ b/pipekit/web/app.py
@@ -900,6 +900,33 @@ def _build_comment_sql(dest_drv, qualified_dest: str,
     return "\n".join(stmts)
 
 
+def _max_column_ordinal(cols: list[dict]) -> int:
+    """Largest N among ids shaped ``cN`` in cols; 0 when none match."""
+    import re as _re
+    matches = (_re.fullmatch(r"c(\d+)", str(c.get("id") or "")) for c in cols)
+    return max((int(m.group(1)) for m in matches if m), default=0)
+
+
+def _ensure_column_ids(cols: list[dict]) -> list[dict]:
+    """Assign a stable `id` (c1, c2, …) to any column row missing one.
+
+    Ids are the data-movement identity for future schema reconciliation.
+    Rows are updated in place and the same list is returned; new ids
+    continue after the highest existing ``cN``.
+    """
+    n = _max_column_ordinal(cols)
+    for c in cols:
+        if not c.get("id"):
+            n += 1
+            c["id"] = f"c{n}"
+    return cols
+
+
+def _next_column_id(cols: list[dict]) -> str:
+    """Return ``cN`` with N one past the highest ``cN`` id in cols."""
+    return f"c{_max_column_ordinal(cols) + 1}"
+
+
 # ---------------------------------------------------------------------------
 # Connections
 # ---------------------------------------------------------------------------
@@ -1074,6 +1101,111 @@ async def watermark_create(request: Request, module_id: int):
     return RedirectResponse(url=f"/modules/{module_id}", status_code=303)
 
 
+@_router.get("/modules/{module_id}/columns/new", response_class=HTMLResponse)
+def column_new(request: Request, module_id: int):
+    """Render the empty add-column form for a module (404 if unknown)."""
+    module = repo.get_module(module_id)
+    if module is None:
+        raise HTTPException(404, f"module id={module_id} not found")
+    return _templates.TemplateResponse(
+        request, "column_form.html",
+        _ctx(module=module, form_action=f"/modules/{module_id}/columns",
+             cancel_url=f"/modules/{module_id}", error=None, values={}),
+    )
+
+
+@_router.post("/modules/{module_id}/columns")
+async def column_create(request: Request, module_id: int):
+    """Append one column to the module's dest table and column listing.
+
+    Rejects names already in the listing or on the table, runs
+    ALTER TABLE … ADD COLUMN, then stores the new row in columns_json.
+    Any failure re-renders the form with status 400 and a message.
+    """
+    import json as _json
+    module = repo.get_module(module_id)
+    if module is None:
+        raise HTTPException(404, f"module id={module_id} not found")
+    form = await request.form()
+    values = {
+        "dest_name": (form.get("dest_name") or "").strip(),
+        "dest_type": (form.get("dest_type") or "").strip() or "text",
+        "dest_description": (form.get("dest_description") or "").strip(),
+        "source_name": (form.get("source_name") or "").strip(),
+        "source_type": (form.get("source_type") or "").strip(),
+    }
+
+    def _err(msg: str):
+        return _templates.TemplateResponse(
+            request, "column_form.html",
+            _ctx(module=module, form_action=f"/modules/{module_id}/columns",
+                 cancel_url=f"/modules/{module_id}", error=msg, values=values),
+            status_code=400,
+        )
+
+    if not values["dest_name"]:
+        return _err("dest name is required")
+    wanted = values["dest_name"].lower()
+
+    try:
+        stored = _json.loads(module["columns_json"] or "[]") or []
+    except ValueError as e:
+        return _err(f"stored column listing is not valid JSON: {e}")
+    cols = _ensure_column_ids(stored)
+    # A stored row without dest_name must not crash the duplicate check.
+    if any((c.get("dest_name") or "").lower() == wanted for c in cols):
+        return _err(f"column {values['dest_name']!r} is already in the listing")
+
+    dest_conn = repo.get_connection(module["dest_connection_id"])
+    dest_drv = _driver_for_conn(dest_conn) if dest_conn else None
+    if dest_drv is None:
+        return _err("destination connection has no driver")
+
+    dest_schema, _, dest_table_bare = module["dest_table"].partition(".")
+    if not dest_table_bare:
+        # NOTE(review): "public" is the PostgreSQL default schema — confirm
+        # this fallback is right for the other drivers.
+        dest_schema, dest_table_bare = "public", dest_schema
+    try:
+        inv = dest_drv.column_inventory(dest_conn, dest_schema, dest_table_bare)
+    except jrunner.JrunnerError as e:
+        return _err(f"could not read {module['dest_table']}: {e}")
+    if any(ci["name"].lower() == wanted for ci in inv):
+        return _err(f"column {values['dest_name']!r} already exists on {module['dest_table']}")
+
+    new_col = {
+        "id": _next_column_id(cols),
+        "source_name": values["source_name"],
+        "source_type": values["source_type"],
+        "dest_name": values["dest_name"],
+        "dest_type": values["dest_type"],
+        "description": values["dest_description"] or None,
+    }
+    qualified = dest_drv.qualified_table_name(dest_table_bare, schema=dest_schema)
+    try:
+        add_sql = dest_drv.build_add_column_sql(qualified, new_col)
+    except ValueError as e:
+        # dest_type comes straight from the form; a driver that rejects it
+        # raises ValueError, which belongs on the form rather than in a 500.
+        return _err(str(e))
+    try:
+        jrunner.run_dest_sql(dest_conn, add_sql)
+    except jrunner.JrunnerError as e:
+        return _err(f"ALTER TABLE failed: {e}")
+
+    # The column now exists on the table: record it before anything else can
+    # fail, otherwise a retry is refused with "already exists".
+    cols.append(new_col)
+    repo.update_module_columns(module_id, cols)
+
+    if new_col["description"] and dest_drv.supports_table_comments:
+        try:
+            jrunner.run_dest_sql(
+                dest_conn, _build_comment_sql(dest_drv, qualified, None, [new_col]))
+        except jrunner.JrunnerError as e:
+            return _err(f"column added, but setting its description failed: {e}")
+    return RedirectResponse(url=f"/modules/{module_id}", status_code=303)
+
+
@_router.get("/watermarks/{watermark_id}/edit", response_class=HTMLResponse) def watermark_edit(request: Request, watermark_id: int): wm = repo.get_watermark(watermark_id) diff --git a/pipekit/web/templates/column_form.html b/pipekit/web/templates/column_form.html new file mode 100644 index 0000000..e71395c --- /dev/null +++ b/pipekit/web/templates/column_form.html @@ -0,0 +1,68 @@ +{% extends "base.html" %} +{% set section = "modules" %} +{% block title %}Add column · {{ module.name }} — Pipekit{% endblock %} + +{% block content %} +
+
+ Add column · {{ module.name }} + appends to {{ module.dest_table }} + ← back to module +
+
+ {% if error %} +
{{ error }}
+ {% endif %} + +
+ This runs ALTER TABLE … ADD COLUMN, which appends to the + end of the table (existing rows preserved, new column NULL). + The load is positional — add the matching expression as the + last column of the source query, or the next run will misalign. +
+ +
+
+ + +
+ + + +
+ + +
+ +
+ cancel + +
+
+
+
+{% endblock %} diff --git a/pipekit/web/templates/module_detail.html b/pipekit/web/templates/module_detail.html index 9d762aa..7ae8500 100644 --- a/pipekit/web/templates/module_detail.html +++ b/pipekit/web/templates/module_detail.html @@ -56,10 +56,12 @@
{{ module.source_query }}
- {% if schema_cols or module.dest_description %}
Schema {{ schema_cols|length }} column{{ 's' if schema_cols|length != 1 else '' }} + + + add column +
{% if module.dest_description %} @@ -86,10 +88,11 @@ {% endfor %} + {% else %} +
No columns recorded — add one to define the destination schema.
{% endif %}
- {% endif %} {% if preview %}