web: add columns to an existing module's target table (Phase 0+1)

Modules' column listings were write-once at wizard time — no way to add a
column to an established sync (e.g. an RRN watermark column on a history
table) without hand-editing columns_json and ALTERing the dest by hand.

Phase 0 (groundwork):
- columns_json rows get a stable `id` (c1, c2, …) — the data-movement
  identity for future schema reconciliation (the load is positional).
- repo.update_module_columns to persist the listing.
- Driver.build_add_column_sql + Driver.column_inventory.

Phase 1 (append a column):
- "+ add column" on the module detail page -> column_form.html.
- POST /modules/{id}/columns: validates the name isn't already in the
  listing or on the table, runs ALTER TABLE … ADD COLUMN (appends at the
  tail, rows preserved), applies COMMENT ON COLUMN where supported, and
  appends to columns_json. Re-renders with an error on conflict/DDL failure.

Append-only and non-destructive; reorder/retype/drop (which can require a
table rebuild) are out of scope for this phase. Verified end-to-end against
the live PG dest on a throwaway module.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Paul Trowbridge 2026-06-12 10:09:12 -04:00
parent da0396bff9
commit 7202b86210
5 changed files with 210 additions and 2 deletions

View File

@ -167,6 +167,30 @@ class Driver(abc.ABC):
"""DDL to create new_table with the same columns as source_table."""
return f"CREATE TABLE {new_table} (LIKE {source_table} INCLUDING ALL);"
def build_add_column_sql(self, qualified_table: str, column: dict) -> str:
"""DDL to append one column to an existing table. ALTER can only add
at the end which keeps the positional load aligned as long as the
column is also last in the source projection."""
name = column["dest_name"]
dtype = (column.get("dest_type") or "text").strip()
if not dtype:
raise ValueError(f"column {name!r} has no dest_type")
return (f"ALTER TABLE {qualified_table} "
f"ADD COLUMN {self.quote_identifier(name)} {dtype};")
def column_inventory(self, conn: dict, schema: str,
table: str) -> "list[dict]":
"""Ordered [{name, type}] of an existing dest table, or [] if absent.
Default uses INFORMATION_SCHEMA (PG/MSSQL); override for other catalogs."""
sql = (
"SELECT column_name, data_type FROM information_schema.columns "
f"WHERE table_schema='{schema}' AND table_name='{table}' "
"ORDER BY ordinal_position"
)
r = self.query(conn, sql)
return [{"name": row[0].strip(), "type": row[1].strip()}
for row in r.rows if row and row[0]]
def check_dest_table(self, conn: dict, schema: str,
table: str) -> "set[str] | None":
"""Return lowercase column names of an existing dest table, or None

View File

@ -232,6 +232,14 @@ def update_module(module_id: int, *, name: str | None = None,
return get_module(module_id)
def update_module_columns(module_id: int, columns: list[dict]) -> dict | None:
"""Replace a module's stored column listing (columns_json)."""
with db.connect() as c:
c.execute("UPDATE module SET columns_json=?, updated_at=datetime('now') "
"WHERE id=?", (json.dumps(columns), module_id))
return get_module(module_id)
class ModuleRunning(RuntimeError):
"""Raised by delete_module when the module is currently running."""

View File

@ -900,6 +900,32 @@ def _build_comment_sql(dest_drv, qualified_dest: str,
return "\n".join(stmts)
def _ensure_column_ids(cols: list[dict]) -> list[dict]:
"""Assign a stable `id` (c1, c2, …) to any column row missing one.
Ids are the data-movement identity for future schema reconciliation."""
import re as _re
n = 0
for c in cols:
m = _re.fullmatch(r"c(\d+)", str(c.get("id") or ""))
if m:
n = max(n, int(m.group(1)))
for c in cols:
if not c.get("id"):
n += 1
c["id"] = f"c{n}"
return cols
def _next_column_id(cols: list[dict]) -> str:
import re as _re
n = 0
for c in cols:
m = _re.fullmatch(r"c(\d+)", str(c.get("id") or ""))
if m:
n = max(n, int(m.group(1)))
return f"c{n + 1}"
# ---------------------------------------------------------------------------
# Connections
# ---------------------------------------------------------------------------
@ -1074,6 +1100,85 @@ async def watermark_create(request: Request, module_id: int):
return RedirectResponse(url=f"/modules/{module_id}", status_code=303)
@_router.get("/modules/{module_id}/columns/new", response_class=HTMLResponse)
def column_new(request: Request, module_id: int):
module = repo.get_module(module_id)
if module is None:
raise HTTPException(404, f"module id={module_id} not found")
return _templates.TemplateResponse(
request, "column_form.html",
_ctx(module=module, form_action=f"/modules/{module_id}/columns",
cancel_url=f"/modules/{module_id}", error=None, values={}),
)
@_router.post("/modules/{module_id}/columns")
async def column_create(request: Request, module_id: int):
import json as _json
module = repo.get_module(module_id)
if module is None:
raise HTTPException(404, f"module id={module_id} not found")
form = await request.form()
values = {
"dest_name": (form.get("dest_name") or "").strip(),
"dest_type": (form.get("dest_type") or "").strip() or "text",
"dest_description": (form.get("dest_description") or "").strip(),
"source_name": (form.get("source_name") or "").strip(),
"source_type": (form.get("source_type") or "").strip(),
}
def _err(msg: str):
return _templates.TemplateResponse(
request, "column_form.html",
_ctx(module=module, form_action=f"/modules/{module_id}/columns",
cancel_url=f"/modules/{module_id}", error=msg, values=values),
status_code=400,
)
if not values["dest_name"]:
return _err("dest name is required")
cols = _ensure_column_ids(_json.loads(module["columns_json"]) if module["columns_json"] else [])
if any(c["dest_name"].lower() == values["dest_name"].lower() for c in cols):
return _err(f"column {values['dest_name']!r} is already in the listing")
dest_conn = repo.get_connection(module["dest_connection_id"])
dest_drv = _driver_for_conn(dest_conn) if dest_conn else None
if dest_drv is None:
return _err("destination connection has no driver")
dest_schema, _, dest_table_bare = module["dest_table"].partition(".")
if not dest_table_bare:
dest_schema, dest_table_bare = "public", dest_schema
try:
inv = dest_drv.column_inventory(dest_conn, dest_schema, dest_table_bare)
except jrunner.JrunnerError as e:
return _err(f"could not read {module['dest_table']}: {e}")
if any(ci["name"].lower() == values["dest_name"].lower() for ci in inv):
return _err(f"column {values['dest_name']!r} already exists on {module['dest_table']}")
new_col = {
"id": _next_column_id(cols),
"source_name": values["source_name"],
"source_type": values["source_type"],
"dest_name": values["dest_name"],
"dest_type": values["dest_type"],
"description": values["dest_description"] or None,
}
qualified = dest_drv.qualified_table_name(dest_table_bare, schema=dest_schema)
try:
jrunner.run_dest_sql(dest_conn, dest_drv.build_add_column_sql(qualified, new_col))
if new_col["description"] and dest_drv.supports_table_comments:
jrunner.run_dest_sql(
dest_conn, _build_comment_sql(dest_drv, qualified, None, [new_col]))
except jrunner.JrunnerError as e:
return _err(f"ALTER TABLE failed: {e}")
cols.append(new_col)
repo.update_module_columns(module_id, cols)
return RedirectResponse(url=f"/modules/{module_id}", status_code=303)
@_router.get("/watermarks/{watermark_id}/edit", response_class=HTMLResponse)
def watermark_edit(request: Request, watermark_id: int):
wm = repo.get_watermark(watermark_id)

View File

@ -0,0 +1,68 @@
{% extends "base.html" %}
{% set section = "modules" %}
{% block title %}Add column &middot; {{ module.name }} — Pipekit{% endblock %}
{% block content %}
<div class="panel">
<header>
Add column &middot; {{ module.name }}
<span class="subtitle">appends to {{ module.dest_table }}</span>
<span style="margin-left:auto"><a href="{{ cancel_url }}">&larr; back to module</a></span>
</header>
<div class="body">
{% if error %}
<div class="flash err" style="margin-bottom:0.8rem">{{ error }}</div>
{% endif %}
<div class="flash warn" style="margin-bottom:0.8rem">
This runs <code>ALTER TABLE … ADD COLUMN</code>, which appends to the
<strong>end</strong> of the table (existing rows preserved, new column NULL).
The load is positional — add the matching expression as the
<strong>last</strong> column of the source query, or the next run will misalign.
</div>
<form method="post" action="{{ form_action }}">
<div class="two-col" style="gap:1rem">
<label class="field">
<span>dest column name</span>
<input type="text" name="dest_name" required class="mono"
value="{{ values.dest_name or '' }}">
<span class="help">name in the destination table</span>
</label>
<label class="field">
<span>dest type</span>
<input type="text" name="dest_type" class="mono"
value="{{ values.dest_type or 'text' }}">
<span class="help">e.g. <code>bigint</code>, <code>text</code>, <code>numeric(15,2)</code></span>
</label>
</div>
<label class="field">
<span>description</span>
<textarea name="dest_description" rows="2">{{ values.dest_description or '' }}</textarea>
<span class="help">applied as <code>COMMENT ON COLUMN</code> (if the dest supports it)</span>
</label>
<div class="two-col" style="gap:1rem">
<label class="field">
<span>source name <span style="color:var(--text-muted)">(optional)</span></span>
<input type="text" name="source_name" class="mono"
value="{{ values.source_name or '' }}">
<span class="help">informational — e.g. <code>RRN(T)</code></span>
</label>
<label class="field">
<span>source type <span style="color:var(--text-muted)">(optional)</span></span>
<input type="text" name="source_type" class="mono"
value="{{ values.source_type or '' }}">
<span class="help">informational only</span>
</label>
</div>
<div class="actions" style="justify-content:flex-end;margin-top:0.8rem">
<a class="btn ghost" href="{{ cancel_url }}">cancel</a>
<button type="submit" class="primary">add column</button>
</div>
</form>
</div>
</div>
{% endblock %}

View File

@ -56,10 +56,12 @@
<div class="body"><pre class="sql">{{ module.source_query }}</pre></div>
</div>
{% if schema_cols or module.dest_description %}
<div class="panel">
<header>Schema
<span class="subtitle">{{ schema_cols|length }} column{{ 's' if schema_cols|length != 1 else '' }}</span>
<span style="margin-left:auto">
<a class="btn" href="/modules/{{ module.id }}/columns/new">+ add column</a>
</span>
</header>
<div class="body tight">
{% if module.dest_description %}
@ -86,10 +88,11 @@
{% endfor %}
</tbody>
</table>
{% else %}
<div class="empty">No columns recorded — add one to define the destination schema.</div>
{% endif %}
</div>
</div>
{% endif %}
{% if preview %}
<div class="panel">