From da0396bff936bd659515b3259c3f78c2f43491b0 Mon Sep 17 00:00:00 2001 From: Paul Trowbridge Date: Thu, 11 Jun 2026 09:05:19 -0400 Subject: [PATCH] web: add SQL-entry path to the new-module wizard The wizard previously required picking a single source table; modules whose entry point is arbitrary SQL (CTEs, joins, computed columns) didn't fit. Add a "write SQL" path alongside "browse a table": - Driver.introspect_query_columns + _zero_row_wrap discover a query's result columns by running it with ~no rows. Generic wrap is a derived table with WHERE 1=0; DB2 appends FETCH FIRST 1 ROW ONLY (DB2 for i forbids WITH inside a nested table expression). - /wizard/sql + POST /wizard/sql/columns seed the column-mapping grid; dest types default to text (no result-set type metadata over jrunner CSV). - wizard_step3.html grows a sql_mode branch (array-named inputs, query shown verbatim, no column unchecking); wizard_create branches on entry_mode. Verified end-to-end against a live DB2 for i connection, including a top-level CTE query. Co-Authored-By: Claude Opus 4.8 --- CLAUDE.md | 4 + pipekit/drivers/base.py | 18 ++ pipekit/drivers/db2.py | 7 + pipekit/web/app.py | 205 ++++++++++++++++++----- pipekit/web/templates/_wizard_steps.html | 2 +- pipekit/web/templates/wizard_error.html | 2 +- pipekit/web/templates/wizard_sql.html | 43 +++++ pipekit/web/templates/wizard_step1.html | 9 +- pipekit/web/templates/wizard_step3.html | 72 +++++++- 9 files changed, 310 insertions(+), 52 deletions(-) create mode 100644 pipekit/web/templates/wizard_sql.html diff --git a/CLAUDE.md b/CLAUDE.md index 4010516..9361e52 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -100,6 +100,10 @@ Passwords in connections are stored as `$VAR_NAME` references. At run time they Each driver in `pipekit/drivers/` inherits from `base.py::Driver` and implements: `browse_fields`, `list_tables`, `list_schemas`, `get_columns`, `map_type`, `default_expression`, `quote_identifier`. The wizard UI calls `/api/introspect/*` which dispatches to the appropriate driver. +## Wizard Entry Modes + +Step 1 offers two paths. **Browse a table** (`/wizard/tables` → `/wizard/columns`) is the original flow: pick one source table, introspect its columns. **Write SQL** (`/wizard/sql` → `POST /wizard/sql/columns`) starts from an arbitrary query — the source query is used verbatim and its result columns are discovered by `Driver.introspect_query_columns`, which runs the query wrapped to fetch ~no rows (`_zero_row_wrap`: derived-table + `WHERE 1=0` by default; DB2 appends `FETCH FIRST 1 ROW ONLY` since DB2 for i forbids `WITH` inside a nested table expression). Both paths land on `wizard_step3.html` (the SQL path passes `sql_mode=True`) and POST to `/wizard/create`, which branches on `entry_mode`. SQL-mode dest types default to `text` — there's no result-set type metadata over jrunner's CSV output, so the user adjusts types by hand. `columns_json` is never read at run time (the engine does `CREATE staging LIKE dest` + `SELECT *`); it only drives the dest `CREATE TABLE`. + ## Module Columns Modules store their column mapping as `columns_json` — a JSON list of dicts with keys `source_name`, `source_type`, `dest_name`, `dest_type`. The engine uses this to build the staging CREATE TABLE and the merge INSERT column lists. diff --git a/pipekit/drivers/base.py b/pipekit/drivers/base.py index 8e1ac27..3249419 100644 --- a/pipekit/drivers/base.py +++ b/pipekit/drivers/base.py @@ -181,6 +181,24 @@ class Driver(abc.ABC): return None return {row[0].strip().lower() for row in r.rows if row and row[0]} + # ---- Arbitrary-query introspection (SQL-entry wizard path) ---- + def introspect_query_columns(self, conn: dict, sql: str) -> list[str]: + """Return the result-set column names of an arbitrary user query, + fetching (near) zero rows. Used by the wizard's SQL-entry path to + seed the column-mapping grid when the source isn't a single table.""" + wrapped = self._zero_row_wrap(sql) + r = self.query(conn, wrapped) + return [(c or "").strip() for c in r.columns] + + def _zero_row_wrap(self, sql: str) -> str: + """Wrap an arbitrary query so it yields its columns but ~no rows. + Generic default wraps it as a derived table with a false predicate; + this works on dialects where a sub-SELECT may carry its own WITH + (e.g. PostgreSQL). Dialects that forbid WITH inside a nested table + expression (DB2 for i, MSSQL) override this.""" + body = sql.strip().rstrip(";").rstrip() + return f"SELECT * FROM (\n{body}\n) AS _pk_introspect WHERE 1=0" + # ---- Shared helper ---- def query(self, conn: dict, sql: str) -> jrunner.QueryResult: """Run `sql` in jrunner query mode against `conn`.""" diff --git a/pipekit/drivers/db2.py b/pipekit/drivers/db2.py index 866203f..412ee04 100644 --- a/pipekit/drivers/db2.py +++ b/pipekit/drivers/db2.py @@ -115,6 +115,13 @@ class DB2Driver(Driver): v = result.rows[0][0].strip() return v or None + def _zero_row_wrap(self, sql: str) -> str: + # DB2 for i forbids a WITH clause inside a nested table expression, + # so we can't wrap the query as a derived table. Append a fetch-limit + # to the outer statement instead. (0 is rejected — must be positive.) + body = sql.strip().rstrip(";").rstrip() + return f"{body}\nFETCH FIRST 1 ROW ONLY" + def qualified_table_name(self, table: str, *, schema: str) -> str: return f"{self.quote_identifier(schema)}.{self.quote_identifier(table)}" diff --git a/pipekit/web/app.py b/pipekit/web/app.py index fd95558..51d874a 100644 --- a/pipekit/web/app.py +++ b/pipekit/web/app.py @@ -541,15 +541,106 @@ def wizard_step3(request: Request, ) +@_router.get("/wizard/sql", response_class=HTMLResponse) +def wizard_sql_entry(request: Request, + source_connection_id: int = Query(...)): + """Alternate step 2 — write an arbitrary source query instead of + browsing a single table.""" + conn = repo.get_connection(source_connection_id) + if conn is None: + raise HTTPException(404, f"connection id={source_connection_id} not found") + drv = _driver_for_conn(conn) + if drv is None: + raise HTTPException(500, "driver row missing for connection") + return _templates.TemplateResponse( + request, + "wizard_sql.html", + _ctx(step=2, mode="sql", connection=conn, driver_kind=drv.kind, + source_query=""), + ) + + +@_router.post("/wizard/sql/columns", response_class=HTMLResponse) +async def wizard_sql_columns(request: Request): + """Introspect an arbitrary query's result columns and render the + column-mapping grid (SQL-entry variant of step 3).""" + form = await request.form() + source_connection_id = int(form["source_connection_id"]) + source_query = (form.get("source_query") or "").strip() + + conn = repo.get_connection(source_connection_id) + if conn is None: + raise HTTPException(404, f"connection id={source_connection_id} not found") + drv = _driver_for_conn(conn) + if drv is None: + raise HTTPException(500, "driver row missing for connection") + + if not source_query: + return _templates.TemplateResponse( + request, "wizard_sql.html", + _ctx(step=2, mode="sql", connection=conn, driver_kind=drv.kind, + source_query=source_query, + fetch_error="Enter a source query first."), + ) + + fetch_error: str | None = None + columns: list[dict] = [] + try: + names = drv.introspect_query_columns(conn, source_query) + for i, name in enumerate(names): + columns.append({ + "position": i + 1, + "name": name, + "default_dest_name": _sanitize_identifier(name), + "default_dest_type": "text", + "default_description": "", + }) + except (jrunner.JrunnerError, ValueError) as e: + fetch_error = str(e) + except Exception as e: # noqa: BLE001 + fetch_error = f"{type(e).__name__}: {e}" + + if fetch_error or not columns: + return _templates.TemplateResponse( + request, "wizard_sql.html", + _ctx(step=2, mode="sql", connection=conn, driver_kind=drv.kind, + source_query=source_query, + fetch_error=fetch_error or "Query returned no columns."), + ) + + drivers_by_id = {d["id"]: d for d in repo.list_drivers()} + dest_conns = [ + c for c in repo.list_connections() + if drivers_by_id.get(c["driver_id"]) is not None + ] + default_dest_conn_id = conn.get("default_dest_connection_id") + default_dest_schema = conn.get("default_dest_schema") or "" + + return _templates.TemplateResponse( + request, + "wizard_step3.html", + _ctx(step=3, mode="sql", sql_mode=True, connection=conn, + all_connections=dest_conns, driver_kind=drv.kind, + columns=columns, source_query=source_query, + qualified_table="(custom query)", table="", qvals={}, + table_description="", fetch_error=None, + default_module_name="", + default_dest_conn_id=default_dest_conn_id, + default_dest_schema=default_dest_schema, + dest_warn=None), + ) + + @_router.post("/wizard/create") async def wizard_create(request: Request): """Step 4 — build source_query from picks, create the module, and provision the destination schema + table.""" form = await request.form() + entry_mode = form.get("entry_mode", "table") source_connection_id = int(form["source_connection_id"]) dest_connection_id = int(form["dest_connection_id"]) - table = form["table"] + table = form.get("table", "") module_name = form["module_name"].strip() dest_table = form["dest_table"].strip() merge_strategy = form.get("merge_strategy", "full") @@ -576,42 +667,70 @@ async def wizard_create(request: Request): if dest_drv is None: raise HTTPException(500, "driver row missing for dest connection") - qvals: dict = {} - for f in src_drv.browse_fields(): - v = form.get(f.name) - if v: - qvals[f.name] = v + if entry_mode == "sql": + # SQL-entry path: the source query is used verbatim and the column + # mapping comes from parallel arrays seeded by query introspection. + source_query = (form.get("source_query") or "").strip() + if not source_query: + raise HTTPException(400, "source query is required") + col_names = form.getlist("sql_col_name") + dest_names = form.getlist("sql_dest_name") + dest_types = form.getlist("sql_dest_type") + dest_descs = form.getlist("sql_dest_desc") + chosen = [] + for i, src_name in enumerate(col_names): + dest_name = (dest_names[i] if i < len(dest_names) else "").strip() + dest_type = (dest_types[i] if i < len(dest_types) else "").strip() + desc = (dest_descs[i] if i < len(dest_descs) else "").strip() or None + if not dest_name or not dest_type: + raise HTTPException( + 400, f"column {src_name!r} missing dest_name or dest_type") + chosen.append({ + "source_name": src_name, + "source_type": "", + "dest_name": dest_name, + "dest_type": dest_type, + "description": desc, + }) + if not chosen: + raise HTTPException(400, "no columns to create") + else: + qvals: dict = {} + for f in src_drv.browse_fields(): + v = form.get(f.name) + if v: + qvals[f.name] = v - all_cols = src_drv.get_columns(src_conn, table, **qvals) - by_name = {c.name: c for c in all_cols} - chosen = [] - for name in picked: - if name not in by_name: - continue - src_col = by_name[name] - dest_name = (form.get(f"dest_name__{name}") or "").strip() - dest_type = (form.get(f"dest_type__{name}") or "").strip() - desc = (form.get(f"dest_desc__{name}") or "").strip() or None - if not dest_name or not dest_type: - raise HTTPException(400, f"column {name!r} missing dest_name or dest_type") - chosen.append({ - "source_name": src_col.name, - "source_type": src_col.type_raw, - "dest_name": dest_name, - "dest_type": dest_type, - "description": desc, - }) - if not chosen: - raise HTTPException(400, "no columns selected") + all_cols = src_drv.get_columns(src_conn, table, **qvals) + by_name = {c.name: c for c in all_cols} + chosen = [] + for name in picked: + if name not in by_name: + continue + src_col = by_name[name] + dest_name = (form.get(f"dest_name__{name}") or "").strip() + dest_type = (form.get(f"dest_type__{name}") or "").strip() + desc = (form.get(f"dest_desc__{name}") or "").strip() or None + if not dest_name or not dest_type: + raise HTTPException(400, f"column {name!r} missing dest_name or dest_type") + chosen.append({ + "source_name": src_col.name, + "source_type": src_col.type_raw, + "dest_name": dest_name, + "dest_type": dest_type, + "description": desc, + }) + if not chosen: + raise HTTPException(400, "no columns selected") - qualified_source = src_drv.qualified_table_name(table, **qvals) - select_list = ",\n ".join( - f"{src_drv.default_expression(c['source_type'], c['source_name'])} AS " - f"{dest_drv.quote_identifier(c['dest_name'])}" - for c in chosen - ) - source_query_override = (form.get("source_query") or "").strip() - source_query = source_query_override or f"SELECT\n {select_list}\nFROM {qualified_source}" + qualified_source = src_drv.qualified_table_name(table, **qvals) + select_list = ",\n ".join( + f"{src_drv.default_expression(c['source_type'], c['source_name'])} AS " + f"{dest_drv.quote_identifier(c['dest_name'])}" + for c in chosen + ) + source_query_override = (form.get("source_query") or "").strip() + source_query = source_query_override or f"SELECT\n {select_list}\nFROM {qualified_source}" dest_schema, _, dest_table_bare = dest_table.partition(".") if not dest_table_bare: @@ -636,11 +755,15 @@ async def wizard_create(request: Request): missing = [c["dest_name"] for c in chosen if c["dest_name"].lower() not in existing_cols] if missing: - back_qs = urlencode( - [("source_connection_id", source_connection_id), - ("table", table), - ("table_schema", qvals.get("schema") or qvals.get("library") or ""), - *qvals.items()]) + if entry_mode == "sql": + back_url = f"/wizard/sql?source_connection_id={source_connection_id}" + else: + back_qs = urlencode( + [("source_connection_id", source_connection_id), + ("table", table), + ("table_schema", qvals.get("schema") or qvals.get("library") or ""), + *qvals.items()]) + back_url = f"/wizard/columns?{back_qs}" return _templates.TemplateResponse( request, "wizard_error.html", @@ -649,7 +772,7 @@ async def wizard_create(request: Request): qualified_dest=qualified_dest, missing=missing, existing=sorted(existing_cols), - back_qs=back_qs, + back_url=back_url, ), status_code=409, ) diff --git a/pipekit/web/templates/_wizard_steps.html b/pipekit/web/templates/_wizard_steps.html index db5fef9..8577f46 100644 --- a/pipekit/web/templates/_wizard_steps.html +++ b/pipekit/web/templates/_wizard_steps.html @@ -4,7 +4,7 @@ 1 source connection
- 2 browse tables + 2 {% if mode == "sql" %}write SQL{% else %}browse tables{% endif %}
3 columns & config diff --git a/pipekit/web/templates/wizard_error.html b/pipekit/web/templates/wizard_error.html index 5ddfbca..048d7b0 100644 --- a/pipekit/web/templates/wizard_error.html +++ b/pipekit/web/templates/wizard_error.html @@ -8,7 +8,7 @@ Wizard error {{ title }} - ← back to step 3 + ← back to step 3
diff --git a/pipekit/web/templates/wizard_sql.html b/pipekit/web/templates/wizard_sql.html new file mode 100644 index 0000000..e4f64a0 --- /dev/null +++ b/pipekit/web/templates/wizard_sql.html @@ -0,0 +1,43 @@ +{% extends "base.html" %} +{% set section = "modules" %} +{% block title %}New module — write SQL{% endblock %} + +{% block content %} +{% include "_wizard_steps.html" %} + +
+
+ Step 2 — write the source query + {{ connection.name }} ({{ driver_kind }}) + ← different connection +
+
+ {% if fetch_error %} +
+ Could not introspect the query's columns: +
{{ fetch_error }}
+
+ {% endif %} + +
+ + +
+ cancel + +
+
+
+
+{% endblock %} diff --git a/pipekit/web/templates/wizard_step1.html b/pipekit/web/templates/wizard_step1.html index 5018f83..0460194 100644 --- a/pipekit/web/templates/wizard_step1.html +++ b/pipekit/web/templates/wizard_step1.html @@ -36,10 +36,15 @@ {% endfor %} -
+
cancel - + +
+

+ browse a table picks one table and its columns · + write SQL starts from an arbitrary query +

{% else %}
diff --git a/pipekit/web/templates/wizard_step3.html b/pipekit/web/templates/wizard_step3.html index d74b603..ea2e6cd 100644 --- a/pipekit/web/templates/wizard_step3.html +++ b/pipekit/web/templates/wizard_step3.html @@ -7,9 +7,15 @@
- Step 3 — choose columns & configure merge + Step 3 — {% if sql_mode %}map columns & configure merge{% else %}choose columns & configure merge{% endif %} {{ qualified_table }} - ← different table + + {% if sql_mode %} + ← edit SQL + {% else %} + ← different table + {% endif %} +
{% if fetch_error %} @@ -31,36 +37,75 @@ {% if not fetch_error %}
+ {% if sql_mode %} + + {% else %} {% for k, v in qvals.items() %} {% endfor %} + {% endif %}
Columns + {% if sql_mode %} + {{ columns|length }} from the query — all created; types default to text + {% else %} {{ columns|length }} total — uncheck to exclude + {% endif %}
- + {% if not sql_mode %}{% endif %} + {% if not sql_mode %} + {% endif %} + {% if sql_mode %} + {% for c in columns %} + + + + + + + + {% endfor %} + {% else %} {% for c in columns %} {% endfor %} + {% endif %}
# source namesource type null?dest name dest type description
{{ c.position }} + {{ c.name }} + + + + + + + +
@@ -92,6 +137,7 @@
@@ -192,16 +238,24 @@
-