diff --git a/CLAUDE.md b/CLAUDE.md index 4010516..9361e52 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -100,6 +100,10 @@ Passwords in connections are stored as `$VAR_NAME` references. At run time they Each driver in `pipekit/drivers/` inherits from `base.py::Driver` and implements: `browse_fields`, `list_tables`, `list_schemas`, `get_columns`, `map_type`, `default_expression`, `quote_identifier`. The wizard UI calls `/api/introspect/*` which dispatches to the appropriate driver. +## Wizard Entry Modes + +Step 1 offers two paths. **Browse a table** (`/wizard/tables` → `/wizard/columns`) is the original flow: pick one source table, introspect its columns. **Write SQL** (`/wizard/sql` → `POST /wizard/sql/columns`) starts from an arbitrary query — the source query is used verbatim and its result columns are discovered by `Driver.introspect_query_columns`, which runs the query wrapped to fetch (near) zero rows (`_zero_row_wrap`: derived-table + `WHERE 1=0` by default; DB2 instead appends `FETCH FIRST 1 ROW ONLY`, returning at most one row, since DB2 for i forbids `WITH` inside a nested table expression). Both paths land on `wizard_step3.html` (the SQL path passes `sql_mode=True`) and POST to `/wizard/create`, which branches on `entry_mode`. SQL-mode dest types default to `text` — there's no result-set type metadata over jrunner's CSV output, so the user adjusts types by hand. `columns_json` is never read at run time (the engine does `CREATE staging LIKE dest` + `SELECT *`); it only drives the dest `CREATE TABLE`. + ## Module Columns Modules store their column mapping as `columns_json` — a JSON list of dicts with keys `source_name`, `source_type`, `dest_name`, `dest_type`. The engine uses this to build the staging CREATE TABLE and the merge INSERT column lists. 
diff --git a/pipekit/drivers/base.py b/pipekit/drivers/base.py index 8e1ac27..3249419 100644 --- a/pipekit/drivers/base.py +++ b/pipekit/drivers/base.py @@ -181,6 +181,24 @@ class Driver(abc.ABC): return None return {row[0].strip().lower() for row in r.rows if row and row[0]} + # ---- Arbitrary-query introspection (SQL-entry wizard path) ---- + def introspect_query_columns(self, conn: dict, sql: str) -> list[str]: + """Return the result-set column names of an arbitrary user query, + fetching (near) zero rows. Used by the wizard's SQL-entry path to + seed the column-mapping grid when the source isn't a single table.""" + wrapped = self._zero_row_wrap(sql) + r = self.query(conn, wrapped) + return [(c or "").strip() for c in r.columns] + + def _zero_row_wrap(self, sql: str) -> str: + """Wrap an arbitrary query so it yields its columns but ~no rows. + Generic default wraps it as a derived table with a false predicate; + this works on dialects where a sub-SELECT may carry its own WITH + (e.g. PostgreSQL). Dialects that forbid WITH inside a nested table + expression (DB2 for i, MSSQL) override this.""" + body = sql.strip().rstrip(";").rstrip() + return f"SELECT * FROM (\n{body}\n) AS _pk_introspect WHERE 1=0" + # ---- Shared helper ---- def query(self, conn: dict, sql: str) -> jrunner.QueryResult: """Run `sql` in jrunner query mode against `conn`.""" diff --git a/pipekit/drivers/db2.py b/pipekit/drivers/db2.py index 866203f..412ee04 100644 --- a/pipekit/drivers/db2.py +++ b/pipekit/drivers/db2.py @@ -115,6 +115,13 @@ class DB2Driver(Driver): v = result.rows[0][0].strip() return v or None + def _zero_row_wrap(self, sql: str) -> str: + # DB2 for i forbids a WITH clause inside a nested table expression, + # so we can't wrap the query as a derived table. Append a fetch-limit + # to the outer statement instead. (0 is rejected — must be positive.) 
+ body = sql.strip().rstrip(";").rstrip() + return f"{body}\nFETCH FIRST 1 ROW ONLY" + def qualified_table_name(self, table: str, *, schema: str) -> str: return f"{self.quote_identifier(schema)}.{self.quote_identifier(table)}" diff --git a/pipekit/web/app.py b/pipekit/web/app.py index fd95558..51d874a 100644 --- a/pipekit/web/app.py +++ b/pipekit/web/app.py @@ -541,15 +541,106 @@ def wizard_step3(request: Request, ) +@_router.get("/wizard/sql", response_class=HTMLResponse) +def wizard_sql_entry(request: Request, + source_connection_id: int = Query(...)): + """Alternate step 2 — write an arbitrary source query instead of + browsing a single table.""" + conn = repo.get_connection(source_connection_id) + if conn is None: + raise HTTPException(404, f"connection id={source_connection_id} not found") + drv = _driver_for_conn(conn) + if drv is None: + raise HTTPException(500, "driver row missing for connection") + return _templates.TemplateResponse( + request, + "wizard_sql.html", + _ctx(step=2, mode="sql", connection=conn, driver_kind=drv.kind, + source_query=""), + ) + + +@_router.post("/wizard/sql/columns", response_class=HTMLResponse) +async def wizard_sql_columns(request: Request): + """Introspect an arbitrary query's result columns and render the + column-mapping grid (SQL-entry variant of step 3).""" + form = await request.form() + source_connection_id = int(form["source_connection_id"]) + source_query = (form.get("source_query") or "").strip() + + conn = repo.get_connection(source_connection_id) + if conn is None: + raise HTTPException(404, f"connection id={source_connection_id} not found") + drv = _driver_for_conn(conn) + if drv is None: + raise HTTPException(500, "driver row missing for connection") + + if not source_query: + return _templates.TemplateResponse( + request, "wizard_sql.html", + _ctx(step=2, mode="sql", connection=conn, driver_kind=drv.kind, + source_query=source_query, + fetch_error="Enter a source query first."), + ) + + fetch_error: str | None = 
None + columns: list[dict] = [] + try: + names = drv.introspect_query_columns(conn, source_query) + for i, name in enumerate(names): + columns.append({ + "position": i + 1, + "name": name, + "default_dest_name": _sanitize_identifier(name), + "default_dest_type": "text", + "default_description": "", + }) + except (jrunner.JrunnerError, ValueError) as e: + fetch_error = str(e) + except Exception as e: # noqa: BLE001 + fetch_error = f"{type(e).__name__}: {e}" + + if fetch_error or not columns: + return _templates.TemplateResponse( + request, "wizard_sql.html", + _ctx(step=2, mode="sql", connection=conn, driver_kind=drv.kind, + source_query=source_query, + fetch_error=fetch_error or "Query returned no columns."), + ) + + drivers_by_id = {d["id"]: d for d in repo.list_drivers()} + dest_conns = [ + c for c in repo.list_connections() + if drivers_by_id.get(c["driver_id"]) is not None + ] + default_dest_conn_id = conn.get("default_dest_connection_id") + default_dest_schema = conn.get("default_dest_schema") or "" + + return _templates.TemplateResponse( + request, + "wizard_step3.html", + _ctx(step=3, mode="sql", sql_mode=True, connection=conn, + all_connections=dest_conns, driver_kind=drv.kind, + columns=columns, source_query=source_query, + qualified_table="(custom query)", table="", qvals={}, + table_description="", fetch_error=None, + default_module_name="", + default_dest_conn_id=default_dest_conn_id, + default_dest_schema=default_dest_schema, + dest_warn=None), + ) + + @_router.post("/wizard/create") async def wizard_create(request: Request): """Step 4 — build source_query from picks, create the module, and provision the destination schema + table.""" form = await request.form() + entry_mode = form.get("entry_mode", "table") source_connection_id = int(form["source_connection_id"]) dest_connection_id = int(form["dest_connection_id"]) - table = form["table"] + table = form.get("table", "") module_name = form["module_name"].strip() dest_table = form["dest_table"].strip() 
merge_strategy = form.get("merge_strategy", "full") @@ -576,42 +667,70 @@ async def wizard_create(request: Request): if dest_drv is None: raise HTTPException(500, "driver row missing for dest connection") - qvals: dict = {} - for f in src_drv.browse_fields(): - v = form.get(f.name) - if v: - qvals[f.name] = v + if entry_mode == "sql": + # SQL-entry path: the source query is used verbatim and the column + # mapping comes from parallel arrays seeded by query introspection. + source_query = (form.get("source_query") or "").strip() + if not source_query: + raise HTTPException(400, "source query is required") + col_names = form.getlist("sql_col_name") + dest_names = form.getlist("sql_dest_name") + dest_types = form.getlist("sql_dest_type") + dest_descs = form.getlist("sql_dest_desc") + chosen = [] + for i, src_name in enumerate(col_names): + dest_name = (dest_names[i] if i < len(dest_names) else "").strip() + dest_type = (dest_types[i] if i < len(dest_types) else "").strip() + desc = (dest_descs[i] if i < len(dest_descs) else "").strip() or None + if not dest_name or not dest_type: + raise HTTPException( + 400, f"column {src_name!r} missing dest_name or dest_type") + chosen.append({ + "source_name": src_name, + "source_type": "", + "dest_name": dest_name, + "dest_type": dest_type, + "description": desc, + }) + if not chosen: + raise HTTPException(400, "no columns to create") + else: + qvals: dict = {} + for f in src_drv.browse_fields(): + v = form.get(f.name) + if v: + qvals[f.name] = v - all_cols = src_drv.get_columns(src_conn, table, **qvals) - by_name = {c.name: c for c in all_cols} - chosen = [] - for name in picked: - if name not in by_name: - continue - src_col = by_name[name] - dest_name = (form.get(f"dest_name__{name}") or "").strip() - dest_type = (form.get(f"dest_type__{name}") or "").strip() - desc = (form.get(f"dest_desc__{name}") or "").strip() or None - if not dest_name or not dest_type: - raise HTTPException(400, f"column {name!r} missing dest_name or 
dest_type") - chosen.append({ - "source_name": src_col.name, - "source_type": src_col.type_raw, - "dest_name": dest_name, - "dest_type": dest_type, - "description": desc, - }) - if not chosen: - raise HTTPException(400, "no columns selected") + all_cols = src_drv.get_columns(src_conn, table, **qvals) + by_name = {c.name: c for c in all_cols} + chosen = [] + for name in picked: + if name not in by_name: + continue + src_col = by_name[name] + dest_name = (form.get(f"dest_name__{name}") or "").strip() + dest_type = (form.get(f"dest_type__{name}") or "").strip() + desc = (form.get(f"dest_desc__{name}") or "").strip() or None + if not dest_name or not dest_type: + raise HTTPException(400, f"column {name!r} missing dest_name or dest_type") + chosen.append({ + "source_name": src_col.name, + "source_type": src_col.type_raw, + "dest_name": dest_name, + "dest_type": dest_type, + "description": desc, + }) + if not chosen: + raise HTTPException(400, "no columns selected") - qualified_source = src_drv.qualified_table_name(table, **qvals) - select_list = ",\n ".join( - f"{src_drv.default_expression(c['source_type'], c['source_name'])} AS " - f"{dest_drv.quote_identifier(c['dest_name'])}" - for c in chosen - ) - source_query_override = (form.get("source_query") or "").strip() - source_query = source_query_override or f"SELECT\n {select_list}\nFROM {qualified_source}" + qualified_source = src_drv.qualified_table_name(table, **qvals) + select_list = ",\n ".join( + f"{src_drv.default_expression(c['source_type'], c['source_name'])} AS " + f"{dest_drv.quote_identifier(c['dest_name'])}" + for c in chosen + ) + source_query_override = (form.get("source_query") or "").strip() + source_query = source_query_override or f"SELECT\n {select_list}\nFROM {qualified_source}" dest_schema, _, dest_table_bare = dest_table.partition(".") if not dest_table_bare: @@ -636,11 +755,15 @@ async def wizard_create(request: Request): missing = [c["dest_name"] for c in chosen if c["dest_name"].lower() not in 
existing_cols] if missing: - back_qs = urlencode( - [("source_connection_id", source_connection_id), - ("table", table), - ("table_schema", qvals.get("schema") or qvals.get("library") or ""), - *qvals.items()]) + if entry_mode == "sql": + back_url = f"/wizard/sql?source_connection_id={source_connection_id}" + else: + back_qs = urlencode( + [("source_connection_id", source_connection_id), + ("table", table), + ("table_schema", qvals.get("schema") or qvals.get("library") or ""), + *qvals.items()]) + back_url = f"/wizard/columns?{back_qs}" return _templates.TemplateResponse( request, "wizard_error.html", @@ -649,7 +772,7 @@ async def wizard_create(request: Request): qualified_dest=qualified_dest, missing=missing, existing=sorted(existing_cols), - back_qs=back_qs, + back_url=back_url, ), status_code=409, ) diff --git a/pipekit/web/templates/_wizard_steps.html b/pipekit/web/templates/_wizard_steps.html index db5fef9..8577f46 100644 --- a/pipekit/web/templates/_wizard_steps.html +++ b/pipekit/web/templates/_wizard_steps.html @@ -4,7 +4,7 @@ 1 source connection
{{ fetch_error }}
+ + browse a table picks one table and its columns · + write SQL starts from an arbitrary query +
{% else %}