Unify incremental sync config: inline watermarks + editable source query

Watermarks, merge strategy, merge key, and source query are now edited
together in one form on both the module edit page and wizard step 3.
A client-side placeholder warning fires when {name} tokens in the query
don't match the watermark rows on the page. The wizard now shows an
editable source query textarea pre-populated from column picks so WHERE
clauses can be added before module creation. Watermarks submitted via
wm_* arrays are processed by _save_inline_watermarks() in both
module_update and wizard_create.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Paul Trowbridge 2026-05-18 21:31:55 -04:00
parent 99f75490c4
commit 595024eb52
5 changed files with 349 additions and 27 deletions

View File

@ -82,6 +82,10 @@ Each driver in `pipekit/drivers/` inherits from `base.py::Driver` and implements
Modules store their column mapping as `columns_json` — a JSON list of dicts with keys `source_name`, `source_type`, `dest_name`, `dest_type`. The engine uses this to build the staging CREATE TABLE and the merge INSERT column lists.
## Inline Watermark Editing
Watermarks are managed inline on both the module edit form and wizard step 3 (not just via the standalone `/watermarks/{id}/edit` page). The module edit form (`module_form.html`) renders existing watermarks as editable rows and submits them as parallel arrays (`wm_id[]`, `wm_name[]`, `wm_connection_id[]`, `wm_resolver_sql[]`, `wm_default_value[]`, `wm_deleted_id[]`). The `_save_inline_watermarks(form, module_id)` helper in `web/app.py` processes these arrays — updates existing rows, creates new ones, deletes removed ones. Both `module_update` and `wizard_create` call it after saving the module. A client-side placeholder warning checks that `{name}` tokens in the source query match the watermark names on the page.
## Merge Key
`merge_key` is stored as a comma-separated string (e.g., `"col1, col2"`). The engine parses it and generates a multi-column DELETE predicate for incremental strategy.
@ -101,7 +105,7 @@ Recreated on every run as `pipekit_staging.{module_name}` (DROP + CREATE, not IF
- Python 3.10+, FastAPI, Uvicorn, Jinja2, PyYAML, SQLite3 (stdlib)
- `python-multipart` required for HTML form POSTs (not auto-installed as a FastAPI transitive dep)
- Frontend: HTMX + Alpine.js (CDN), no build step
- Frontend: HTMX (CDN) + vanilla JS; Alpine.js is NOT loaded despite being listed in older docs
- jrunner: separate Java tool, must be on PATH
## Full Spec

12
SPEC.md
View File

@ -366,8 +366,12 @@ Most of the time you accept defaults.
Pick dest connection. Dest table defaults to
`{source_conn.default_dest_schema}.{lowercase_source_table_name}`. Pick
merge strategy. Pick merge key from a dropdown of dest column names. Add
zero or more watermarks via a sub-form.
merge strategy. Pick merge key. When strategy is `incremental`, a
**Watermarks** panel and an editable **Source query** textarea appear
inline. The source query is pre-populated from column picks; the user
edits it to add the WHERE clause with `{placeholder}` references before
creating the module. Zero or more watermarks can be added in step 3;
they are created atomically with the module on submit.
**Multiple destinations are real** (e.g. PG → SQL Server). The wizard
doesn't assume one dest. Each source connection has a
@ -436,8 +440,8 @@ Defaults are **opinions hardcoded in driver modules** for now. Lift to a
### Wizard scope (what it does NOT do)
- **No CTE-based queries.** Wizard generates simple `SELECT cols FROM table WHERE watermark`. For complex queries (like `ffsbglr1`), create with the wizard and edit the source query post-creation via `e`.
- **No multi-watermark wizard.** Single watermark. Add more after.
- **No CTE-based queries.** Wizard generates a simple `SELECT cols FROM table` skeleton; the user adds WHERE clauses (including watermark placeholders) in the editable source query textarea in step 3, or post-creation via the module edit form.
- **Multi-watermark supported in wizard.** Add as many watermarks as needed in step 3; they're created atomically with the module.
- **No hooks in the wizard.** Add hooks from the module detail screen.
- **No group assignment in the wizard.** Assign separately.

View File

@ -141,6 +141,7 @@ def module_edit(request: Request, module_id: int):
request,
"module_form.html",
_ctx(module=module, connections=repo.list_connections(),
watermarks=repo.list_watermarks(module_id),
form_action=f"/modules/{module_id}",
cancel_url=f"/modules/{module_id}"),
)
@ -195,6 +196,7 @@ async def module_update(request: Request, module_id: int):
dest_description=new_description,
enabled=1 if form.get("enabled") == "1" else 0,
)
_save_inline_watermarks(form, module_id)
return RedirectResponse(url=f"/modules/{module_id}", status_code=303)
@ -469,7 +471,8 @@ async def wizard_create(request: Request):
f"{dest_drv.quote_identifier(c['dest_name'])}"
for c in chosen
)
source_query = f"SELECT\n {select_list}\nFROM {qualified_source}"
source_query_override = (form.get("source_query") or "").strip()
source_query = source_query_override or f"SELECT\n {select_list}\nFROM {qualified_source}"
dest_schema, _, dest_table_bare = dest_table.partition(".")
if not dest_table_bare:
@ -552,9 +555,47 @@ async def wizard_create(request: Request):
columns=chosen,
dest_description=dest_description,
)
_save_inline_watermarks(form, module["id"])
return RedirectResponse(url=f"/modules/{module['id']}", status_code=303)
def _save_inline_watermarks(form, module_id: int) -> None:
"""Process wm_* arrays from a module form POST and persist watermarks.
Deletes IDs listed in wm_deleted_id[], updates rows with a wm_id, and
creates new rows (no wm_id). Skips rows with a blank name. No-ops if the
form contains no wm_name fields at all (backwards-compat with old POSTs).
"""
wm_names = form.getlist("wm_name")
if not wm_names:
return
for wm_id_str in form.getlist("wm_deleted_id"):
if wm_id_str:
repo.delete_watermark(int(wm_id_str))
wm_ids = form.getlist("wm_id")
wm_conns = form.getlist("wm_connection_id")
wm_sqls = form.getlist("wm_resolver_sql")
wm_defaults = form.getlist("wm_default_value")
for idx, name in enumerate(wm_names):
name = name.strip()
if not name:
continue
conn_id_str = wm_conns[idx] if idx < len(wm_conns) else ""
if not conn_id_str:
continue
conn_id = int(conn_id_str)
sql = wm_sqls[idx] if idx < len(wm_sqls) else ""
default_val = (wm_defaults[idx] if idx < len(wm_defaults) else "").strip() or None
wm_id_str = wm_ids[idx] if idx < len(wm_ids) else ""
if wm_id_str:
repo.update_watermark(int(wm_id_str), name=name, connection_id=conn_id,
resolver_sql=sql, default_value=default_val)
else:
repo.create_watermark(module_id=module_id, name=name,
connection_id=conn_id, resolver_sql=sql,
default_value=default_val)
def _sql_str(v: str) -> str:
"""SQL string literal — PG-style single-quote escaping."""
return "'" + v.replace("'", "''") + "'"

View File

@ -19,7 +19,7 @@
</div>
{% endif %}
<form method="post" action="{{ form_action }}">
<form method="post" action="{{ form_action }}" id="module-form">
<label class="field">
<span>name</span>
<input type="text" name="name" required value="{{ module.name }}">
@ -70,29 +70,92 @@
<label class="field">
<span>source query</span>
<textarea name="source_query" rows="14" required class="mono">{{ module.source_query }}</textarea>
<textarea name="source_query" id="source-query" rows="14" required class="mono"
oninput="checkPlaceholders()">{{ module.source_query }}</textarea>
<span class="help">free text; <code>{name}</code> placeholders resolved from watermarks at run time</span>
</label>
<div id="placeholder-warning" class="flash warn" style="display:none;margin-top:0.3rem"></div>
<div class="two-col" style="gap:1rem">
<div class="panel" style="margin-top:1rem">
<header>Merge &amp; watermarks</header>
<div class="body">
<div class="two-col" style="gap:1rem;margin-bottom:0.8rem">
<label class="field">
<span>merge strategy</span>
<select name="merge_strategy" required>
<select name="merge_strategy" id="merge-strategy" required
onchange="onStrategyChange(this.value)">
{% for s in ("full", "incremental", "append") %}
<option value="{{ s }}" {% if s == module.merge_strategy %}selected{% endif %}>{{ s }}</option>
{% endfor %}
</select>
</label>
<label class="field">
<label class="field" id="merge-key-field"
style="{{ '' if module.merge_strategy == 'incremental' else 'display:none' }}">
<span>merge key</span>
<input type="text" name="merge_key" value="{{ module.merge_key or '' }}"
placeholder="id or (col_a, col_b)">
<span class="help">required for <code>incremental</code>; ignored otherwise</span>
placeholder="id or id,version">
<span class="help">column name(s) for the DELETE predicate</span>
</label>
</div>
<label class="field">
<div id="watermarks-section"
style="{{ '' if module.merge_strategy == 'incremental' else 'display:none' }}">
<div style="display:flex;align-items:center;gap:0.5rem;margin-bottom:0.5rem">
<strong>Watermarks</strong>
<span class="help" style="margin:0">— resolver SQL runs before each sync; result substituted as <code>{name}</code> in source query</span>
<button type="button" class="btn ghost" style="margin-left:auto"
onclick="addWatermarkRow()">+ add</button>
</div>
<table class="grid" id="wm-table">
<thead>
<tr>
<th>name</th>
<th>resolver connection</th>
<th>resolver SQL</th>
<th>default value</th>
<th></th>
</tr>
</thead>
<tbody id="wm-tbody">
{% for w in watermarks %}
<tr>
<input type="hidden" name="wm_id" value="{{ w.id }}">
<td><input type="text" name="wm_name" value="{{ w.name }}"
class="mono" style="width:100%" oninput="checkPlaceholders()"></td>
<td>
<select name="wm_connection_id" style="width:100%">
{% for c in connections %}
<option value="{{ c.id }}"
{% if c.id == w.connection_id %}selected{% endif %}>
{{ c.name }}
</option>
{% endfor %}
</select>
</td>
<td><textarea name="wm_resolver_sql" rows="2" class="mono"
style="width:100%;min-width:16rem">{{ w.resolver_sql }}</textarea></td>
<td><input type="text" name="wm_default_value"
value="{{ w.default_value or '' }}"
placeholder="(fail if null)"
style="width:100%"></td>
<td>
<button type="button" class="ghost"
style="color:var(--danger);border:none"
onclick="removeWatermarkRow(this, '{{ w.id }}')">remove</button>
</td>
</tr>
{% endfor %}
</tbody>
</table>
{% if not watermarks %}
<div id="wm-empty" class="empty" style="margin-top:0.4rem">No watermarks — add one to enable incremental filtering.</div>
{% endif %}
</div>
</div>
</div>
<label class="field" style="margin-top:1rem">
<span>dest description</span>
<textarea name="dest_description" rows="2">{{ module.dest_description or '' }}</textarea>
<span class="help">COMMENT ON TABLE value; re-applied on save if changed</span>
@ -112,4 +175,81 @@
</form>
</div>
</div>
<template id="wm-row-template">
<tr>
<input type="hidden" name="wm_id" value="">
<td><input type="text" name="wm_name" value="" class="mono" style="width:100%"
oninput="checkPlaceholders()"></td>
<td>
<select name="wm_connection_id" style="width:100%">
{% for c in connections %}
<option value="{{ c.id }}">{{ c.name }}</option>
{% endfor %}
</select>
</td>
<td><textarea name="wm_resolver_sql" rows="2" class="mono"
style="width:100%;min-width:16rem"></textarea></td>
<td><input type="text" name="wm_default_value" placeholder="(fail if null)"
style="width:100%"></td>
<td>
<button type="button" class="ghost" style="color:var(--danger);border:none"
onclick="removeWatermarkRow(this, '')">remove</button>
</td>
</tr>
</template>
<script>
function onStrategyChange(val) {
document.getElementById('merge-key-field').style.display = val === 'incremental' ? '' : 'none';
document.getElementById('watermarks-section').style.display = val === 'incremental' ? '' : 'none';
checkPlaceholders();
}
function addWatermarkRow() {
const tmpl = document.getElementById('wm-row-template');
const row = tmpl.content.cloneNode(true);
document.getElementById('wm-tbody').appendChild(row);
const empty = document.getElementById('wm-empty');
if (empty) empty.style.display = 'none';
}
function removeWatermarkRow(btn, wmId) {
if (wmId) {
const input = document.createElement('input');
input.type = 'hidden';
input.name = 'wm_deleted_id';
input.value = wmId;
document.getElementById('module-form').appendChild(input);
}
btn.closest('tr').remove();
checkPlaceholders();
}
function checkPlaceholders() {
const strategy = document.getElementById('merge-strategy').value;
const warn = document.getElementById('placeholder-warning');
if (strategy !== 'incremental') { warn.style.display = 'none'; return; }
const query = document.getElementById('source-query').value;
const inQuery = new Set([...query.matchAll(/\{(\w+)\}/g)].map(m => m[1]));
const inWm = new Set([...document.querySelectorAll('#wm-tbody [name="wm_name"]')]
.map(el => el.value.trim()).filter(Boolean));
const noWm = [...inQuery].filter(n => !inWm.has(n));
const noPlaceholder = [...inWm].filter(n => !inQuery.has(n));
const msgs = [];
if (noWm.length) msgs.push('Placeholder(s) in query with no watermark: ' + noWm.map(n => '{'+n+'}').join(', '));
if (noPlaceholder.length) msgs.push('Watermark(s) not used in query: ' + noPlaceholder.map(n => '{'+n+'}').join(', '));
if (msgs.length) {
warn.textContent = msgs.join(' · ');
warn.style.display = '';
} else {
warn.style.display = 'none';
}
}
document.addEventListener('DOMContentLoaded', checkPlaceholders);
</script>
{% endblock %}

View File

@ -62,10 +62,11 @@
</thead>
<tbody>
{% for c in columns %}
<tr onclick="var cb=document.getElementById('col-{{ loop.index }}'); if(event.target.tagName!=='INPUT') cb.checked=!cb.checked">
<tr onclick="var cb=document.getElementById('col-{{ loop.index }}'); if(event.target.tagName!=='INPUT') cb.checked=!cb.checked; fillQuery()">
<td class="pick">
<input type="checkbox" id="col-{{ loop.index }}"
class="col-check" name="col" value="{{ c.name }}" checked>
class="col-check" name="col" value="{{ c.name }}" checked
onchange="fillQuery()">
</td>
<td class="mono">{{ c.position }}</td>
<td class="mono">{{ c.name }}</td>
@ -150,7 +151,7 @@
<label class="field">
<span>strategy</span>
<select name="merge_strategy" id="merge_strategy"
onchange="document.getElementById('mkf').style.display = this.value==='incremental' ? '' : 'none'">
onchange="onStrategyChange(this.value)">
<option value="full">full (truncate + insert)</option>
<option value="incremental">incremental (delete by key + insert)</option>
<option value="append">append (insert only)</option>
@ -164,6 +165,46 @@
</div>
</div>
<div class="panel" id="wm-panel" style="display:none">
<header>
Watermarks
<button type="button" class="btn ghost" style="margin-left:auto"
onclick="addWatermarkRow()">+ add</button>
</header>
<div class="body">
<p class="help" style="margin-bottom:0.6rem">
Add a watermark for each <code>{placeholder}</code> you plan to use in the source query's WHERE clause.
Edit the query below to add the filter after choosing columns.
</p>
<table class="grid" id="wm-table" style="display:none">
<thead>
<tr>
<th>name</th>
<th>resolver connection</th>
<th>resolver SQL</th>
<th>default value</th>
<th></th>
</tr>
</thead>
<tbody id="wm-tbody"></tbody>
</table>
<div id="wm-empty" class="empty">No watermarks added yet.</div>
</div>
</div>
<div class="panel" id="query-panel" style="display:none">
<header>Source query
<span class="subtitle">auto-generated from picks — edit to add WHERE clause</span>
</header>
<div class="body">
<textarea name="source_query" id="source-query" rows="10" class="mono"
style="width:100%" oninput="checkPlaceholders()"
placeholder="Leave blank to auto-generate from column picks"></textarea>
<div id="placeholder-warning" class="flash warn" style="display:none;margin-top:0.4rem"></div>
<span class="help">Leave blank to auto-generate. Add <code>WHERE col &gt; {name}</code> for incremental filtering.</span>
</div>
</div>
<div class="panel">
<header>Create</header>
<div class="body" style="display:flex;justify-content:flex-end;gap:0.5rem">
@ -175,9 +216,101 @@
</div>
</form>
<template id="wm-row-template">
<tr>
<input type="hidden" name="wm_id" value="">
<td><input type="text" name="wm_name" value="" class="mono" style="width:100%"
oninput="checkPlaceholders()"></td>
<td>
<select name="wm_connection_id" style="width:100%">
{% for c in all_connections %}
<option value="{{ c.id }}">{{ c.name }}</option>
{% endfor %}
</select>
</td>
<td><textarea name="wm_resolver_sql" rows="2" class="mono"
style="width:100%;min-width:12rem"></textarea></td>
<td><input type="text" name="wm_default_value" placeholder="(fail if null)"
style="width:6rem"></td>
<td>
<button type="button" class="ghost" style="color:var(--danger);border:none"
onclick="removeWatermarkRow(this)">remove</button>
</td>
</tr>
</template>
<script>
const QUALIFIED_TABLE = {{ qualified_table | tojson }};
function toggleAll(val) {
document.querySelectorAll('.col-check').forEach(function (cb) { cb.checked = val; });
fillQuery();
}
function onStrategyChange(val) {
document.getElementById('mkf').style.display = val === 'incremental' ? '' : 'none';
document.getElementById('wm-panel').style.display = val === 'incremental' ? '' : 'none';
document.getElementById('query-panel').style.display = val === 'incremental' ? '' : 'none';
if (val === 'incremental') fillQuery();
checkPlaceholders();
}
function fillQuery() {
const ta = document.getElementById('source-query');
if (!ta) return;
const checked = [...document.querySelectorAll('.col-check:checked')];
if (!checked.length) { ta.value = ''; return; }
const cols = checked.map(cb => {
const name = cb.value;
const destInput = document.querySelector('[name="dest_name__' + name + '"]');
const dest = destInput ? destInput.value.trim() : name.toLowerCase();
return ' ' + name + ' AS ' + dest;
});
ta.value = 'SELECT\n' + cols.join(',\n') + '\nFROM ' + QUALIFIED_TABLE;
checkPlaceholders();
}
function addWatermarkRow() {
const tmpl = document.getElementById('wm-row-template');
const row = tmpl.content.cloneNode(true);
document.getElementById('wm-tbody').appendChild(row);
document.getElementById('wm-table').style.display = '';
document.getElementById('wm-empty').style.display = 'none';
}
function removeWatermarkRow(btn) {
btn.closest('tr').remove();
const tbody = document.getElementById('wm-tbody');
if (!tbody.querySelector('tr')) {
document.getElementById('wm-table').style.display = 'none';
document.getElementById('wm-empty').style.display = '';
}
checkPlaceholders();
}
function checkPlaceholders() {
const strategy = document.getElementById('merge_strategy').value;
const warn = document.getElementById('placeholder-warning');
if (!warn) return;
if (strategy !== 'incremental') { warn.style.display = 'none'; return; }
const query = (document.getElementById('source-query') || {}).value || '';
const inQuery = new Set([...query.matchAll(/\{(\w+)\}/g)].map(m => m[1]));
const inWm = new Set([...document.querySelectorAll('#wm-tbody [name="wm_name"]')]
.map(el => el.value.trim()).filter(Boolean));
const noWm = [...inQuery].filter(n => !inWm.has(n));
const noPlaceholder = [...inWm].filter(n => !inQuery.has(n));
const msgs = [];
if (noWm.length) msgs.push('Placeholder(s) in query with no watermark: ' + noWm.map(n => '{'+n+'}').join(', '));
if (noPlaceholder.length) msgs.push('Watermark(s) not used in query: ' + noPlaceholder.map(n => '{'+n+'}').join(', '));
if (msgs.length) {
warn.textContent = msgs.join(' · ');
warn.style.display = '';
} else {
warn.style.display = 'none';
}
}
</script>
{% endif %}