web: add SQL-entry path to the new-module wizard

The wizard previously required picking a single source table; modules whose
entry point is arbitrary SQL (CTEs, joins, computed columns) didn't fit. Add a
"write SQL" path alongside "browse a table":

- Driver.introspect_query_columns + _zero_row_wrap discover a query's result
  columns by running it so that it fetches (almost) no rows. The generic wrap
  is a derived table with WHERE 1=0; DB2 instead leaves the query unwrapped and
  appends FETCH FIRST 1 ROW ONLY, because DB2 for i forbids WITH inside a
  nested table expression.
- /wizard/sql + POST /wizard/sql/columns seed the column-mapping grid; dest
  types default to text (no result-set type metadata over jrunner CSV).
- wizard_step3.html grows a sql_mode branch (array-named inputs, query shown
  verbatim, no column unchecking); wizard_create branches on entry_mode.

Verified end-to-end against a live DB2 for i connection, including a top-level
CTE query.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Paul Trowbridge 2026-06-11 09:05:19 -04:00
parent e7576926ae
commit da0396bff9
9 changed files with 310 additions and 52 deletions

View File

@ -100,6 +100,10 @@ Passwords in connections are stored as `$VAR_NAME` references. At run time they
Each driver in `pipekit/drivers/` inherits from `base.py::Driver` and implements: `browse_fields`, `list_tables`, `list_schemas`, `get_columns`, `map_type`, `default_expression`, `quote_identifier`. The wizard UI calls `/api/introspect/*` which dispatches to the appropriate driver.
## Wizard Entry Modes
Step 1 offers two paths. **Browse a table** (`/wizard/tables` → `/wizard/columns`) is the original flow: pick one source table, introspect its columns. **Write SQL** (`/wizard/sql` → `POST /wizard/sql/columns`) starts from an arbitrary query — the source query is used verbatim and its result columns are discovered by `Driver.introspect_query_columns`, which runs the query wrapped to fetch ~no rows (`_zero_row_wrap`: derived-table + `WHERE 1=0` by default; DB2 appends `FETCH FIRST 1 ROW ONLY` since DB2 for i forbids `WITH` inside a nested table expression). Both paths land on `wizard_step3.html` (the SQL path passes `sql_mode=True`) and POST to `/wizard/create`, which branches on `entry_mode`. SQL-mode dest types default to `text` — there's no result-set type metadata over jrunner's CSV output, so the user adjusts types by hand. `columns_json` is never read at run time (the engine does `CREATE staging LIKE dest` + `SELECT *`); it only drives the dest `CREATE TABLE`.
## Module Columns
Modules store their column mapping as `columns_json` — a JSON list of dicts with keys `source_name`, `source_type`, `dest_name`, `dest_type`. It is not read at run time (the engine does `CREATE staging LIKE dest` + `SELECT *`); it only drives the destination `CREATE TABLE`.

View File

@ -181,6 +181,24 @@ class Driver(abc.ABC):
return None
return {row[0].strip().lower() for row in r.rows if row and row[0]}
# ---- Arbitrary-query introspection (SQL-entry wizard path) ----
def introspect_query_columns(self, conn: dict, sql: str) -> list[str]:
    """Discover the result-set column names of an arbitrary user query.

    The query is rewritten by ``_zero_row_wrap`` so that it fetches
    (near) zero rows, executed through ``query``, and the column names
    reported on the result are returned with surrounding whitespace
    removed. A missing (falsy) name comes back as an empty string.

    Used by the wizard's SQL-entry path to seed the column-mapping grid
    when the source isn't a single table.
    """
    result = self.query(conn, self._zero_row_wrap(sql))
    names: list[str] = []
    for raw in result.columns:
        names.append(raw.strip() if raw else "")
    return names
def _zero_row_wrap(self, sql: str) -> str:
"""Wrap an arbitrary query so it yields its columns but ~no rows.
Generic default wraps it as a derived table with a false predicate;
this works on dialects where a sub-SELECT may carry its own WITH
(e.g. PostgreSQL). Dialects that forbid WITH inside a nested table
expression (DB2 for i, MSSQL) override this."""
body = sql.strip().rstrip(";").rstrip()
return f"SELECT * FROM (\n{body}\n) AS _pk_introspect WHERE 1=0"
# ---- Shared helper ----
def query(self, conn: dict, sql: str) -> jrunner.QueryResult:
"""Run `sql` in jrunner query mode against `conn`."""

View File

@ -115,6 +115,13 @@ class DB2Driver(Driver):
v = result.rows[0][0].strip()
return v or None
def _zero_row_wrap(self, sql: str) -> str:
# DB2 for i forbids a WITH clause inside a nested table expression,
# so we can't wrap the query as a derived table. Append a fetch-limit
# to the outer statement instead. (0 is rejected — must be positive.)
body = sql.strip().rstrip(";").rstrip()
return f"{body}\nFETCH FIRST 1 ROW ONLY"
def qualified_table_name(self, table: str, *, schema: str) -> str:
return f"{self.quote_identifier(schema)}.{self.quote_identifier(table)}"

View File

@ -541,15 +541,106 @@ def wizard_step3(request: Request,
)
@_router.get("/wizard/sql", response_class=HTMLResponse)
def wizard_sql_entry(request: Request,
                     source_connection_id: int = Query(...)):
    """Alternate step 2 — show the SQL editor for a source connection.

    Renders ``wizard_sql.html`` with an empty query so the user can write
    an arbitrary source query instead of browsing a single table.
    Responds 404 when the connection id is unknown and 500 when the
    connection has no driver.
    """
    connection = repo.get_connection(source_connection_id)
    if connection is None:
        raise HTTPException(404, f"connection id={source_connection_id} not found")

    driver = _driver_for_conn(connection)
    if driver is None:
        raise HTTPException(500, "driver row missing for connection")

    context = _ctx(
        step=2,
        mode="sql",
        connection=connection,
        driver_kind=driver.kind,
        source_query="",
    )
    return _templates.TemplateResponse(request, "wizard_sql.html", context)
@_router.post("/wizard/sql/columns", response_class=HTMLResponse)
async def wizard_sql_columns(request: Request):
    """Introspect an arbitrary query's result columns and render the
    column-mapping grid (SQL-entry variant of step 3).

    Form fields read: ``source_connection_id`` (required, integer) and
    ``source_query``.

    Raises:
        HTTPException: 400 when ``source_connection_id`` is missing or
            not an integer; 404 when no such connection exists; 500 when
            the connection has no driver.

    A blank query, an introspection failure, or a query that reports no
    columns does not raise: the SQL editor (``wizard_sql.html``) is
    rendered again with the user's query and a ``fetch_error`` message.
    On success ``wizard_step3.html`` is rendered with ``sql_mode=True``
    and every destination type defaulted to ``text``.
    """
    form = await request.form()
    # A missing or malformed id is a client error; report it as 400
    # instead of letting KeyError/ValueError surface as a 500.
    try:
        source_connection_id = int(form.get("source_connection_id"))
    except (TypeError, ValueError):
        raise HTTPException(
            400, "source_connection_id must be an integer") from None
    source_query = (form.get("source_query") or "").strip()

    conn = repo.get_connection(source_connection_id)
    if conn is None:
        raise HTTPException(404, f"connection id={source_connection_id} not found")
    drv = _driver_for_conn(conn)
    if drv is None:
        raise HTTPException(500, "driver row missing for connection")

    def reshow_editor(message: str):
        """Render the SQL editor again with the query and an error."""
        return _templates.TemplateResponse(
            request, "wizard_sql.html",
            _ctx(step=2, mode="sql", connection=conn, driver_kind=drv.kind,
                 source_query=source_query,
                 fetch_error=message),
        )

    if not source_query:
        return reshow_editor("Enter a source query first.")

    fetch_error: str | None = None
    columns: list[dict] = []
    try:
        names = drv.introspect_query_columns(conn, source_query)
        columns = [
            {
                "position": position,
                "name": name,
                "default_dest_name": _sanitize_identifier(name),
                "default_dest_type": "text",
                "default_description": "",
            }
            for position, name in enumerate(names, start=1)
        ]
    except (jrunner.JrunnerError, ValueError) as e:
        fetch_error = str(e)
    except Exception as e:  # noqa: BLE001
        # Request boundary: show any unexpected failure to the user in
        # the editor rather than returning a bare 500.
        fetch_error = f"{type(e).__name__}: {e}"

    if fetch_error or not columns:
        return reshow_editor(fetch_error or "Query returned no columns.")

    # Offer only connections whose driver row still exists.
    drivers_by_id = {d["id"]: d for d in repo.list_drivers()}
    dest_conns = [
        c for c in repo.list_connections()
        if drivers_by_id.get(c["driver_id"]) is not None
    ]
    default_dest_conn_id = conn.get("default_dest_connection_id")
    default_dest_schema = conn.get("default_dest_schema") or ""

    return _templates.TemplateResponse(
        request,
        "wizard_step3.html",
        _ctx(step=3, mode="sql", sql_mode=True, connection=conn,
             all_connections=dest_conns, driver_kind=drv.kind,
             columns=columns, source_query=source_query,
             qualified_table="(custom query)", table="", qvals={},
             table_description="", fetch_error=None,
             default_module_name="",
             default_dest_conn_id=default_dest_conn_id,
             default_dest_schema=default_dest_schema,
             dest_warn=None),
    )
@_router.post("/wizard/create")
async def wizard_create(request: Request):
"""Step 4 — build source_query from picks, create the module,
and provision the destination schema + table."""
form = await request.form()
entry_mode = form.get("entry_mode", "table")
source_connection_id = int(form["source_connection_id"])
dest_connection_id = int(form["dest_connection_id"])
table = form["table"]
table = form.get("table", "")
module_name = form["module_name"].strip()
dest_table = form["dest_table"].strip()
merge_strategy = form.get("merge_strategy", "full")
@ -576,42 +667,70 @@ async def wizard_create(request: Request):
if dest_drv is None:
raise HTTPException(500, "driver row missing for dest connection")
qvals: dict = {}
for f in src_drv.browse_fields():
v = form.get(f.name)
if v:
qvals[f.name] = v
if entry_mode == "sql":
# SQL-entry path: the source query is used verbatim and the column
# mapping comes from parallel arrays seeded by query introspection.
source_query = (form.get("source_query") or "").strip()
if not source_query:
raise HTTPException(400, "source query is required")
col_names = form.getlist("sql_col_name")
dest_names = form.getlist("sql_dest_name")
dest_types = form.getlist("sql_dest_type")
dest_descs = form.getlist("sql_dest_desc")
chosen = []
for i, src_name in enumerate(col_names):
dest_name = (dest_names[i] if i < len(dest_names) else "").strip()
dest_type = (dest_types[i] if i < len(dest_types) else "").strip()
desc = (dest_descs[i] if i < len(dest_descs) else "").strip() or None
if not dest_name or not dest_type:
raise HTTPException(
400, f"column {src_name!r} missing dest_name or dest_type")
chosen.append({
"source_name": src_name,
"source_type": "",
"dest_name": dest_name,
"dest_type": dest_type,
"description": desc,
})
if not chosen:
raise HTTPException(400, "no columns to create")
else:
qvals: dict = {}
for f in src_drv.browse_fields():
v = form.get(f.name)
if v:
qvals[f.name] = v
all_cols = src_drv.get_columns(src_conn, table, **qvals)
by_name = {c.name: c for c in all_cols}
chosen = []
for name in picked:
if name not in by_name:
continue
src_col = by_name[name]
dest_name = (form.get(f"dest_name__{name}") or "").strip()
dest_type = (form.get(f"dest_type__{name}") or "").strip()
desc = (form.get(f"dest_desc__{name}") or "").strip() or None
if not dest_name or not dest_type:
raise HTTPException(400, f"column {name!r} missing dest_name or dest_type")
chosen.append({
"source_name": src_col.name,
"source_type": src_col.type_raw,
"dest_name": dest_name,
"dest_type": dest_type,
"description": desc,
})
if not chosen:
raise HTTPException(400, "no columns selected")
all_cols = src_drv.get_columns(src_conn, table, **qvals)
by_name = {c.name: c for c in all_cols}
chosen = []
for name in picked:
if name not in by_name:
continue
src_col = by_name[name]
dest_name = (form.get(f"dest_name__{name}") or "").strip()
dest_type = (form.get(f"dest_type__{name}") or "").strip()
desc = (form.get(f"dest_desc__{name}") or "").strip() or None
if not dest_name or not dest_type:
raise HTTPException(400, f"column {name!r} missing dest_name or dest_type")
chosen.append({
"source_name": src_col.name,
"source_type": src_col.type_raw,
"dest_name": dest_name,
"dest_type": dest_type,
"description": desc,
})
if not chosen:
raise HTTPException(400, "no columns selected")
qualified_source = src_drv.qualified_table_name(table, **qvals)
select_list = ",\n ".join(
f"{src_drv.default_expression(c['source_type'], c['source_name'])} AS "
f"{dest_drv.quote_identifier(c['dest_name'])}"
for c in chosen
)
source_query_override = (form.get("source_query") or "").strip()
source_query = source_query_override or f"SELECT\n {select_list}\nFROM {qualified_source}"
qualified_source = src_drv.qualified_table_name(table, **qvals)
select_list = ",\n ".join(
f"{src_drv.default_expression(c['source_type'], c['source_name'])} AS "
f"{dest_drv.quote_identifier(c['dest_name'])}"
for c in chosen
)
source_query_override = (form.get("source_query") or "").strip()
source_query = source_query_override or f"SELECT\n {select_list}\nFROM {qualified_source}"
dest_schema, _, dest_table_bare = dest_table.partition(".")
if not dest_table_bare:
@ -636,11 +755,15 @@ async def wizard_create(request: Request):
missing = [c["dest_name"] for c in chosen
if c["dest_name"].lower() not in existing_cols]
if missing:
back_qs = urlencode(
[("source_connection_id", source_connection_id),
("table", table),
("table_schema", qvals.get("schema") or qvals.get("library") or ""),
*qvals.items()])
if entry_mode == "sql":
back_url = f"/wizard/sql?source_connection_id={source_connection_id}"
else:
back_qs = urlencode(
[("source_connection_id", source_connection_id),
("table", table),
("table_schema", qvals.get("schema") or qvals.get("library") or ""),
*qvals.items()])
back_url = f"/wizard/columns?{back_qs}"
return _templates.TemplateResponse(
request,
"wizard_error.html",
@ -649,7 +772,7 @@ async def wizard_create(request: Request):
qualified_dest=qualified_dest,
missing=missing,
existing=sorted(existing_cols),
back_qs=back_qs,
back_url=back_url,
),
status_code=409,
)

View File

@ -4,7 +4,7 @@
<span class="num">1</span> source connection
</div>
<div class="step {% if step == 2 %}active{% elif step > 2 %}done{% endif %}">
<span class="num">2</span> browse tables
<span class="num">2</span> {% if mode == "sql" %}write SQL{% else %}browse tables{% endif %}
</div>
<div class="step {% if step == 3 %}active{% elif step > 3 %}done{% endif %}">
<span class="num">3</span> columns &amp; config

View File

@ -8,7 +8,7 @@
Wizard error
<span class="subtitle">{{ title }}</span>
<span style="margin-left:auto">
<a href="/wizard/columns?{{ back_qs }}">&larr; back to step 3</a>
<a href="{{ back_url }}">&larr; back to step 3</a>
</span>
</header>
<div class="body">

View File

@ -0,0 +1,43 @@
{% extends "base.html" %}
{# Wizard step 2, SQL-entry variant: a single textarea for the source query.
   Context used below: connection (name, id), driver_kind, source_query,
   and an optional fetch_error. #}
{% set section = "modules" %}
{% block title %}New module — write SQL{% endblock %}
{% block content %}
{% include "_wizard_steps.html" %}
<div class="panel">
<header>
Step 2 — write the source query
<span class="subtitle">{{ connection.name }} ({{ driver_kind }})</span>
<span style="margin-left:auto"><a href="/wizard">&larr; different connection</a></span>
</header>
<div class="body">
{# Shown when this page is rendered with a fetch_error message. #}
{% if fetch_error %}
<div class="flash err" style="margin-bottom:0.8rem">
Could not introspect the query's columns:
<pre class="sql" style="margin-top:0.4rem">{{ fetch_error }}</pre>
</div>
{% endif %}
{# Posts the connection id and the query text to /wizard/sql/columns. #}
<form method="post" action="/wizard/sql/columns">
<input type="hidden" name="source_connection_id" value="{{ connection.id }}">
<label class="field">
<span>source query</span>
<textarea name="source_query" id="source-query" rows="18" required
class="mono" style="width:100%"
placeholder="SELECT ... FROM ...">{{ source_query }}</textarea>
<span class="help">
Runs verbatim against the source at run time. Use
<code>{name}</code> placeholders for watermarks. On
<em>introspect</em>, pipekit runs the query fetching ~no rows to
discover its result columns.
</span>
</label>
<div style="display:flex;justify-content:flex-end;gap:0.5rem;margin-top:0.8rem">
<a class="btn ghost" href="/wizard">cancel</a>
<button type="submit" class="primary">introspect columns &rarr;</button>
</div>
</form>
</div>
</div>
{% endblock %}

View File

@ -36,10 +36,15 @@
{% endfor %}
</tbody>
</table>
<div class="body" style="display:flex;justify-content:flex-end;gap:0.5rem">
<div class="body" style="display:flex;justify-content:flex-end;gap:0.5rem;align-items:center">
<a class="btn ghost" href="/">cancel</a>
<button type="submit" class="primary">next &rarr;</button>
<button type="submit" class="btn" formaction="/wizard/sql">write SQL &rarr;</button>
<button type="submit" class="primary" formaction="/wizard/tables">browse a table &rarr;</button>
</div>
<p class="help" style="text-align:right;margin:0 0 0.6rem">
<strong>browse a table</strong> picks one table and its columns ·
<strong>write SQL</strong> starts from an arbitrary query
</p>
</form>
{% else %}
<div class="empty">

View File

@ -7,9 +7,15 @@
<div class="panel">
<header>
Step 3 — choose columns &amp; configure merge
Step 3 — {% if sql_mode %}map columns &amp; configure merge{% else %}choose columns &amp; configure merge{% endif %}
<span class="subtitle">{{ qualified_table }}</span>
<span style="margin-left:auto"><a href="/wizard/tables?source_connection_id={{ connection.id }}{% for k,v in qvals.items() %}&amp;{{ k }}={{ v }}{% endfor %}&amp;browse=1">&larr; different table</a></span>
<span style="margin-left:auto">
{% if sql_mode %}
<a href="/wizard/sql?source_connection_id={{ connection.id }}">&larr; edit SQL</a>
{% else %}
<a href="/wizard/tables?source_connection_id={{ connection.id }}{% for k,v in qvals.items() %}&amp;{{ k }}={{ v }}{% endfor %}&amp;browse=1">&larr; different table</a>
{% endif %}
</span>
</header>
<div class="body">
{% if fetch_error %}
@ -31,36 +37,75 @@
{% if not fetch_error %}
<form method="post" action="/wizard/create">
<input type="hidden" name="source_connection_id" value="{{ connection.id }}">
{% if sql_mode %}
<input type="hidden" name="entry_mode" value="sql">
{% else %}
<input type="hidden" name="table" value="{{ table }}">
{% for k, v in qvals.items() %}
<input type="hidden" name="{{ k }}" value="{{ v }}">
{% endfor %}
{% endif %}
<div class="two-col">
<div class="panel">
<header>
Columns
{% if sql_mode %}
<span class="subtitle">{{ columns|length }} from the query — all created; types default to <code>text</code></span>
{% else %}
<span class="subtitle">{{ columns|length }} total — uncheck to exclude</span>
<span style="margin-left:auto">
<button type="button" class="ghost" onclick="toggleAll(true)">all</button>
<button type="button" class="ghost" onclick="toggleAll(false)">none</button>
</span>
{% endif %}
</header>
<div class="body tight">
<table class="grid picker">
<thead>
<tr>
<th class="pick"></th>
{% if not sql_mode %}<th class="pick"></th>{% endif %}
<th style="width:3em">#</th>
<th>source name</th>
{% if not sql_mode %}
<th>source type</th>
<th style="width:3em">null?</th>
{% endif %}
<th>dest name</th>
<th>dest type</th>
<th>description</th>
</tr>
</thead>
<tbody>
{% if sql_mode %}
{% for c in columns %}
<tr>
<td class="mono">{{ c.position }}</td>
<td class="mono">
{{ c.name }}
<input type="hidden" name="sql_col_name" value="{{ c.name }}">
</td>
<td>
<input type="text" class="mono"
name="sql_dest_name"
value="{{ c.default_dest_name }}"
style="width:100%;font-size:12px">
</td>
<td>
<input type="text" class="mono"
name="sql_dest_type"
value="{{ c.default_dest_type }}"
style="width:100%;font-size:12px">
</td>
<td>
<input type="text"
name="sql_dest_desc"
value="{{ c.default_description }}"
style="width:100%;font-size:12px">
</td>
</tr>
{% endfor %}
{% else %}
{% for c in columns %}
<tr onclick="var cb=document.getElementById('col-{{ loop.index }}'); if(event.target.tagName!=='INPUT') cb.checked=!cb.checked; fillQuery()">
<td class="pick">
@ -92,6 +137,7 @@
</td>
</tr>
{% endfor %}
{% endif %}
</tbody>
</table>
</div>
@ -192,16 +238,24 @@
</div>
</div>
<div class="panel" id="query-panel" style="display:none">
<div class="panel" id="query-panel" style="{% if not sql_mode %}display:none{% endif %}">
<header>Source query
{% if sql_mode %}
<span class="subtitle">runs verbatim at run time</span>
{% else %}
<span class="subtitle">auto-generated from picks — edit to add WHERE clause</span>
{% endif %}
</header>
<div class="body">
<textarea name="source_query" id="source-query" rows="10" class="mono"
style="width:100%" oninput="checkPlaceholders()"
placeholder="Leave blank to auto-generate from column picks"></textarea>
{% if not sql_mode %}placeholder="Leave blank to auto-generate from column picks"{% endif %}>{% if sql_mode %}{{ source_query }}{% endif %}</textarea>
<div id="placeholder-warning" class="flash warn" style="display:none;margin-top:0.4rem"></div>
{% if sql_mode %}
<span class="help">Edit if needed. <code>{name}</code> placeholders resolve from watermarks at run time.</span>
{% else %}
<span class="help">Leave blank to auto-generate. Add <code>WHERE col &gt; {name}</code> for incremental filtering.</span>
{% endif %}
</div>
</div>
@ -241,6 +295,7 @@
<script>
const QUALIFIED_TABLE = {{ qualified_table | tojson }};
const SQL_MODE = {{ 'true' if sql_mode else 'false' }};
function toggleAll(val) {
document.querySelectorAll('.col-check').forEach(function (cb) { cb.checked = val; });
@ -250,12 +305,15 @@
function onStrategyChange(val) {
document.getElementById('mkf').style.display = val === 'incremental' ? '' : 'none';
document.getElementById('wm-panel').style.display = val === 'incremental' ? '' : 'none';
document.getElementById('query-panel').style.display = val === 'incremental' ? '' : 'none';
if (val === 'incremental') fillQuery();
// In SQL mode the query panel is always shown (it holds the user's query).
document.getElementById('query-panel').style.display =
(SQL_MODE || val === 'incremental') ? '' : 'none';
if (!SQL_MODE && val === 'incremental') fillQuery();
checkPlaceholders();
}
function fillQuery() {
if (SQL_MODE) return; // never overwrite a hand-written query
const ta = document.getElementById('source-query');
if (!ta) return;
const checked = [...document.querySelectorAll('.col-check:checked')];