From 595024eb52fdcdd99708fc93a554c820d77f9f7c Mon Sep 17 00:00:00 2001 From: Paul Trowbridge Date: Mon, 18 May 2026 21:31:55 -0400 Subject: [PATCH] Unify incremental sync config: inline watermarks + editable source query Watermarks, merge strategy, merge key, and source query are now edited together in one form on both the module edit page and wizard step 3. A client-side placeholder warning fires when {name} tokens in the query don't match the watermark rows on the page. The wizard now shows an editable source query textarea pre-populated from column picks so WHERE clauses can be added before module creation. Watermarks submitted via wm_* arrays are processed by _save_inline_watermarks() in both module_update and wizard_create. Co-Authored-By: Claude Sonnet 4.6 (1M context) --- CLAUDE.md | 6 +- SPEC.md | 12 +- pipekit/web/app.py | 43 +++++- pipekit/web/templates/module_form.html | 176 +++++++++++++++++++++--- pipekit/web/templates/wizard_step3.html | 139 ++++++++++++++++++- 5 files changed, 349 insertions(+), 27 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index c016259..790ae97 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -82,6 +82,10 @@ Each driver in `pipekit/drivers/` inherits from `base.py::Driver` and implements Modules store their column mapping as `columns_json` — a JSON list of dicts with keys `source_name`, `source_type`, `dest_name`, `dest_type`. The engine uses this to build the staging CREATE TABLE and the merge INSERT column lists. +## Inline Watermark Editing + +Watermarks are managed inline on both the module edit form and wizard step 3 (not just via the standalone `/watermarks/{id}/edit` page). The module edit form (`module_form.html`) renders existing watermarks as editable rows and submits them as parallel arrays (`wm_id[]`, `wm_name[]`, `wm_connection_id[]`, `wm_resolver_sql[]`, `wm_default_value[]`, `wm_deleted_id[]`). 
The `_save_inline_watermarks(form, module_id)` helper in `web/app.py` processes these arrays — updates existing rows, creates new ones, deletes removed ones. Both `module_update` and `wizard_create` call it after saving the module. A client-side placeholder warning checks that `{name}` tokens in the source query match the watermark names on the page. + ## Merge Key `merge_key` is stored as a comma-separated string (e.g., `"col1, col2"`). The engine parses it and generates a multi-column DELETE predicate for incremental strategy. @@ -101,7 +105,7 @@ Recreated on every run as `pipekit_staging.{module_name}` (DROP + CREATE, not IF - Python 3.10+, FastAPI, Uvicorn, Jinja2, PyYAML, SQLite3 (stdlib) - `python-multipart` required for HTML form POSTs (not auto-installed as a FastAPI transitive dep) -- Frontend: HTMX + Alpine.js (CDN), no build step +- Frontend: HTMX (CDN) + vanilla JS; Alpine.js is NOT loaded despite being listed in older docs - jrunner: separate Java tool, must be on PATH ## Full Spec diff --git a/SPEC.md b/SPEC.md index 8b7a916..74a7d79 100644 --- a/SPEC.md +++ b/SPEC.md @@ -366,8 +366,12 @@ Most of the time you accept defaults. Pick dest connection. Dest table defaults to `{source_conn.default_dest_schema}.{lowercase_source_table_name}`. Pick -merge strategy. Pick merge key from a dropdown of dest column names. Add -zero or more watermarks via a sub-form. +merge strategy. Pick merge key. When strategy is `incremental`, a +**Watermarks** panel and an editable **Source query** textarea appear +inline. The source query is pre-populated from column picks; the user +edits it to add the WHERE clause with `{placeholder}` references before +creating the module. Zero or more watermarks can be added in step 3; +they are created atomically with the module on submit. **Multiple destinations are real** (e.g. PG → SQL Server). The wizard doesn't assume one dest. 
Each source connection has a @@ -436,8 +440,8 @@ Defaults are **opinions hardcoded in driver modules** for now. Lift to a ### Wizard scope (what it does NOT do) -- **No CTE-based queries.** Wizard generates simple `SELECT cols FROM table WHERE watermark`. For complex queries (like `ffsbglr1`), create with the wizard and edit the source query post-creation via `e`. -- **No multi-watermark wizard.** Single watermark. Add more after. +- **No CTE-based queries.** Wizard generates a simple `SELECT cols FROM table` skeleton; the user adds WHERE clauses (including watermark placeholders) in the editable source query textarea in step 3, or post-creation via the module edit form. +- **No single-watermark limit.** The wizard supports multiple watermarks: add as many as needed in step 3; they're created atomically with the module. - **No hooks in the wizard.** Add hooks from the module detail screen. - **No group assignment in the wizard.** Assign separately. diff --git a/pipekit/web/app.py b/pipekit/web/app.py index a60df28..a46e4d2 100644 --- a/pipekit/web/app.py +++ b/pipekit/web/app.py @@ -141,6 +141,7 @@ def module_edit(request: Request, module_id: int): request, "module_form.html", _ctx(module=module, connections=repo.list_connections(), + watermarks=repo.list_watermarks(module_id), form_action=f"/modules/{module_id}", cancel_url=f"/modules/{module_id}"), ) @@ -195,6 +196,7 @@ async def module_update(request: Request, module_id: int): dest_description=new_description, enabled=1 if form.get("enabled") == "1" else 0, ) + _save_inline_watermarks(form, module_id) return RedirectResponse(url=f"/modules/{module_id}", status_code=303) @@ -469,7 +471,8 @@ async def wizard_create(request: Request): f"{dest_drv.quote_identifier(c['dest_name'])}" for c in chosen ) - source_query = f"SELECT\n {select_list}\nFROM {qualified_source}" + source_query_override = (form.get("source_query") or "").strip() + source_query = source_query_override or f"SELECT\n {select_list}\nFROM {qualified_source}" dest_schema, 
_, dest_table_bare = dest_table.partition(".") if not dest_table_bare: @@ -552,9 +555,47 @@ async def wizard_create(request: Request): columns=chosen, dest_description=dest_description, ) + _save_inline_watermarks(form, module["id"]) return RedirectResponse(url=f"/modules/{module['id']}", status_code=303) +def _save_inline_watermarks(form, module_id: int) -> None: + """Process wm_* arrays from a module form POST and persist watermarks. + + Deletes IDs listed in wm_deleted_id[], updates rows with a wm_id, and + creates new rows (no wm_id). Skips rows with a blank name. No-ops if the + form contains no wm_name fields at all (backwards-compat with old POSTs). + """ + wm_names = form.getlist("wm_name") + if not wm_names: + return + for wm_id_str in form.getlist("wm_deleted_id"): + if wm_id_str: + repo.delete_watermark(int(wm_id_str)) + wm_ids = form.getlist("wm_id") + wm_conns = form.getlist("wm_connection_id") + wm_sqls = form.getlist("wm_resolver_sql") + wm_defaults = form.getlist("wm_default_value") + for idx, name in enumerate(wm_names): + name = name.strip() + if not name: + continue + conn_id_str = wm_conns[idx] if idx < len(wm_conns) else "" + if not conn_id_str: + continue + conn_id = int(conn_id_str) + sql = wm_sqls[idx] if idx < len(wm_sqls) else "" + default_val = (wm_defaults[idx] if idx < len(wm_defaults) else "").strip() or None + wm_id_str = wm_ids[idx] if idx < len(wm_ids) else "" + if wm_id_str: + repo.update_watermark(int(wm_id_str), name=name, connection_id=conn_id, + resolver_sql=sql, default_value=default_val) + else: + repo.create_watermark(module_id=module_id, name=name, + connection_id=conn_id, resolver_sql=sql, + default_value=default_val) + + def _sql_str(v: str) -> str: """SQL string literal — PG-style single-quote escaping.""" return "'" + v.replace("'", "''") + "'" diff --git a/pipekit/web/templates/module_form.html b/pipekit/web/templates/module_form.html index 89c517a..9972fb2 100644 --- a/pipekit/web/templates/module_form.html +++ 
b/pipekit/web/templates/module_form.html @@ -19,7 +19,7 @@ {% endif %} -
+