drivers: make staging DDL dialect-aware; stop tracking pipekit.db

Add dialect-aware DDL hooks to the Driver base (create_schema_sql,
drop_table_if_exists_sql, create_like_table_sql, check_dest_table) and
implement DB2/MSSQL overrides so they can serve as merge destinations,
not just Postgres. runner.py now dispatches staging table creation
through the dest driver instead of hardcoding PG syntax.

Also untrack pipekit.db (runtime SQLite state) and add it to .gitignore.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Paul Trowbridge 2026-06-10 23:33:36 -04:00
parent f8490a2d4f
commit e7576926ae
8 changed files with 263 additions and 65 deletions

3
.gitignore vendored
View File

@ -2,7 +2,8 @@ __pycache__/
*.py[cod] *.py[cod]
*.egg-info/ *.egg-info/
# SQLite WAL/journal artifacts — db file itself is tracked for backup. # SQLite database and its WAL/journal artifacts — runtime state, not tracked.
pipekit.db
pipekit.db-journal pipekit.db-journal
pipekit.db-wal pipekit.db-wal
pipekit.db-shm pipekit.db-shm

Binary file not shown.

View File

@ -145,15 +145,42 @@ class Driver(abc.ABC):
def build_create_table_sql(self, qualified_table: str, def build_create_table_sql(self, qualified_table: str,
columns: list[dict]) -> str: columns: list[dict]) -> str:
"""Generate CREATE TABLE IF NOT EXISTS SQL for a destination table. """Generate CREATE TABLE SQL for a destination table.
``columns`` is a list of ``{dest_name, dest_type}`` dicts. ``columns`` is a list of ``{dest_name, dest_type}`` dicts.
Default implementation raises only destination drivers (PG today) Default raises destination drivers must implement this."""
need to implement it."""
raise NotImplementedError( raise NotImplementedError(
f"driver {self.kind!r} does not implement build_create_table_sql " f"driver {self.kind!r} does not implement build_create_table_sql "
"(not a supported destination)") "(not a supported destination)")
supports_table_comments: ClassVar[bool] = True
def create_schema_sql(self, schema: str) -> str:
"""DDL to create a schema if it doesn't already exist."""
return f"CREATE SCHEMA IF NOT EXISTS {self.quote_identifier(schema)};"
def drop_table_if_exists_sql(self, qualified_table: str) -> str:
"""DDL to drop a table only if it exists."""
return f"DROP TABLE IF EXISTS {qualified_table};"
def create_like_table_sql(self, new_table: str, source_table: str) -> str:
"""DDL to create new_table with the same columns as source_table."""
return f"CREATE TABLE {new_table} (LIKE {source_table} INCLUDING ALL);"
def check_dest_table(self, conn: dict, schema: str,
table: str) -> "set[str] | None":
"""Return lowercase column names of an existing dest table, or None
if the table does not exist. Default uses INFORMATION_SCHEMA (works
for PG and MSSQL); override for dialects that use a different catalog."""
sql = (
f"SELECT column_name FROM information_schema.columns "
f"WHERE table_schema='{schema}' AND table_name='{table}'"
)
r = self.query(conn, sql)
if not r.rows:
return None
return {row[0].strip().lower() for row in r.rows if row and row[0]}
# ---- Shared helper ---- # ---- Shared helper ----
def query(self, conn: dict, sql: str) -> jrunner.QueryResult: def query(self, conn: dict, sql: str) -> jrunner.QueryResult:
"""Run `sql` in jrunner query mode against `conn`.""" """Run `sql` in jrunner query mode against `conn`."""

View File

@ -133,6 +133,68 @@ class DB2Driver(Driver):
f"THEN NULL ELSE {col} END") f"THEN NULL ELSE {col} END")
return col return col
def drop_table_if_exists_sql(self, qualified_table: str) -> str:
# DB2 for i doesn't support DROP TABLE IF EXISTS; check catalog first.
parts = qualified_table.strip('"').split(".", 1)
if len(parts) == 2:
schema_esc = parts[0].strip('"').replace("'", "''").upper()
table_esc = parts[1].strip('"').replace("'", "''").upper()
else:
schema_esc = ""
table_esc = parts[0].strip('"').replace("'", "''").upper()
where = f"TABLE_NAME='{table_esc}'"
if schema_esc:
where += f" AND TABLE_SCHEMA='{schema_esc}'"
return (
f"BEGIN\n"
f" IF EXISTS (SELECT 1 FROM QSYS2.SYSTABLES WHERE {where})\n"
f" THEN DROP TABLE {qualified_table};\n"
f" END IF;\n"
f"END"
)
def create_like_table_sql(self, new_table: str, source_table: str) -> str:
return f"CREATE TABLE {new_table} LIKE {source_table};"
def create_schema_sql(self, schema: str) -> str:
# DB2 for i: CREATE SCHEMA IF NOT EXISTS is available 7.4+; use the
# catalog check for broader compatibility.
name = schema.replace("'", "''")
q = self.quote_identifier(schema)
return (
f"BEGIN\n"
f" IF NOT EXISTS (SELECT 1 FROM QSYS2.SYSSCHEMAS WHERE SCHEMA_NAME='{name}')\n"
f" THEN CREATE SCHEMA {q};\n"
f" END IF;\n"
f"END"
)
def build_create_table_sql(self, qualified_table: str,
columns: list[dict]) -> str:
if not columns:
raise ValueError("no columns provided for CREATE TABLE")
lines = []
for c in columns:
name = c["dest_name"]
validate_identifier(name, "dest column name")
dtype = _normalize_db2_type((c.get("dest_type") or "").strip())
if not dtype:
raise ValueError(f"column {name!r} has no dest_type")
lines.append(f" {self.quote_identifier(name)} {dtype}")
body = ",\n".join(lines)
return f"CREATE TABLE {qualified_table} (\n{body}\n);"
def check_dest_table(self, conn: dict, schema: str,
table: str) -> "set[str] | None":
sql = (
f"SELECT COLUMN_NAME FROM QSYS2.SYSCOLUMNS "
f"WHERE TABLE_SCHEMA='{schema}' AND TABLE_NAME='{table}'"
)
r = self.query(conn, sql)
if not r.rows:
return None
return {row[0].strip().lower() for row in r.rows if row and row[0]}
def map_type(self, type_raw: str) -> str: def map_type(self, type_raw: str) -> str:
base = _base(type_raw) base = _base(type_raw)
mapped = _TYPE_MAP.get(base, "text") mapped = _TYPE_MAP.get(base, "text")
@ -141,6 +203,33 @@ class DB2Driver(Driver):
return mapped return mapped
_PG_TO_DB2: dict[str, str] = {
"text": "VARCHAR(32000)", "varchar": "VARCHAR(32000)",
"integer": "INTEGER", "int": "INTEGER",
"bigint": "BIGINT", "smallint": "SMALLINT",
"numeric": "NUMERIC", "decimal": "DECIMAL",
"real": "REAL", "double precision": "DOUBLE",
"boolean": "SMALLINT", "bool": "SMALLINT",
"date": "DATE",
"timestamp": "TIMESTAMP", "timestamptz": "TIMESTAMP",
"time": "TIME",
"bytea": "BLOB",
"uuid": "CHAR(36)",
"json": "CLOB", "jsonb": "CLOB",
}
def _normalize_db2_type(dest_type: str) -> str:
"""Map PG-style type names to DB2 for i equivalents; pass native types through."""
base = dest_type.lower().split("(")[0].strip()
native = _PG_TO_DB2.get(base)
if native:
if "(" in dest_type and base in ("numeric", "decimal"):
return base.upper() + dest_type[dest_type.index("("):]
return native
return dest_type or "VARCHAR(32000)"
def _format_type(dtype: str, length: str, prec: str, scale: str) -> str: def _format_type(dtype: str, length: str, prec: str, scale: str) -> str:
base = dtype.upper() base = dtype.upper()
if base in ("DECIMAL", "NUMERIC") and prec: if base in ("DECIMAL", "NUMERIC") and prec:

View File

@ -216,6 +216,31 @@ class MSSQLDriver(Driver):
return "numeric" + type_raw[type_raw.index("("):] return "numeric" + type_raw[type_raw.index("("):]
return mapped return mapped
supports_table_comments = False # MSSQL uses sp_addextendedproperty, not COMMENT ON
def create_schema_sql(self, schema: str) -> str:
name = schema.replace("'", "''")
q = self.quote_identifier(schema)
return f"IF NOT EXISTS (SELECT 1 FROM sys.schemas WHERE name='{name}') EXEC('CREATE SCHEMA {q}');"
def create_like_table_sql(self, new_table: str, source_table: str) -> str:
return f"SELECT TOP 0 * INTO {new_table} FROM {source_table};"
def build_create_table_sql(self, qualified_table: str,
columns: list[dict]) -> str:
if not columns:
raise ValueError("no columns provided for CREATE TABLE")
lines = []
for c in columns:
name = c["dest_name"]
validate_identifier(name, "dest column name")
dtype = _normalize_mssql_type((c.get("dest_type") or "").strip())
if not dtype:
raise ValueError(f"column {name!r} has no dest_type")
lines.append(f" {self.quote_identifier(name)} {dtype}")
body = ",\n".join(lines)
return f"CREATE TABLE {qualified_table} (\n{body}\n);"
# ---- helpers ---- # ---- helpers ----
def _validate(self, linked_server, database, schema): def _validate(self, linked_server, database, schema):
if linked_server: if linked_server:
@ -233,6 +258,33 @@ class MSSQLDriver(Driver):
return "" return ""
_PG_TO_MSSQL: dict[str, str] = {
"text": "NVARCHAR(MAX)", "varchar": "NVARCHAR(MAX)",
"integer": "INT", "int": "INT",
"bigint": "BIGINT", "smallint": "SMALLINT",
"numeric": "NUMERIC", "decimal": "DECIMAL",
"real": "REAL", "double precision": "FLOAT",
"boolean": "BIT", "bool": "BIT",
"date": "DATE",
"timestamp": "DATETIME2", "timestamptz": "DATETIMEOFFSET",
"time": "TIME",
"bytea": "VARBINARY(MAX)",
"uuid": "UNIQUEIDENTIFIER",
"json": "NVARCHAR(MAX)", "jsonb": "NVARCHAR(MAX)",
}
def _normalize_mssql_type(dest_type: str) -> str:
"""Map PG-style type names to MSSQL equivalents; pass native types through."""
base = dest_type.lower().split("(")[0].strip()
native = _PG_TO_MSSQL.get(base)
if native:
if "(" in dest_type and base in ("numeric", "decimal"):
return base.upper() + dest_type[dest_type.index("("):]
return native
return dest_type or "NVARCHAR(MAX)"
def _format_type(dtype: str, length: str, prec: str, scale: str) -> str: def _format_type(dtype: str, length: str, prec: str, scale: str) -> str:
base = dtype.upper() base = dtype.upper()
if base in ("DECIMAL", "NUMERIC") and prec: if base in ("DECIMAL", "NUMERIC") and prec:

View File

@ -19,7 +19,7 @@ import os
import traceback import traceback
from dataclasses import dataclass from dataclasses import dataclass
from .. import jrunner, repo from .. import drivers, jrunner, repo
from . import merge, watermark from . import merge, watermark
@ -77,6 +77,8 @@ def run_module(module_id: int, *, group_run_id: int | None = None,
dest_conn = repo.get_connection(module["dest_connection_id"]) dest_conn = repo.get_connection(module["dest_connection_id"])
if source_conn is None or dest_conn is None: if source_conn is None or dest_conn is None:
raise ValueError("source or dest connection missing") raise ValueError("source or dest connection missing")
dest_drv_row = repo.get_driver_row(dest_conn["driver_id"])
dest_drv = drivers.get_driver(dest_drv_row["kind"]) if dest_drv_row else None
# 23. watermarks + materialised source query # 23. watermarks + materialised source query
wm_values = watermark.resolve_watermarks(module, use_defaults_only=False) wm_values = watermark.resolve_watermarks(module, use_defaults_only=False)
@ -103,16 +105,17 @@ def run_module(module_id: int, *, group_run_id: int | None = None,
# self-healing. Staging is ephemeral per SPEC; nothing of value lives # self-healing. Staging is ephemeral per SPEC; nothing of value lives
# between runs. # between runs.
staging_schema, _, _ = module["staging_table"].partition(".") staging_schema, _, _ = module["staging_table"].partition(".")
if staging_schema and staging_schema != module["staging_table"]: if staging_schema and staging_schema != module["staging_table"] and dest_drv:
jrunner.run_dest_sql( jrunner.run_dest_sql(
dest_conn, f"CREATE SCHEMA IF NOT EXISTS {staging_schema};") dest_conn, dest_drv.create_schema_sql(staging_schema))
jrunner.run_dest_sql( drop_sql = (dest_drv.drop_table_if_exists_sql(module["staging_table"])
dest_conn, f"DROP TABLE IF EXISTS {module['staging_table']};") if dest_drv else f"DROP TABLE IF EXISTS {module['staging_table']};")
jrunner.run_dest_sql( like_sql = (dest_drv.create_like_table_sql(module["staging_table"], module["dest_table"])
dest_conn, if dest_drv else
f"CREATE TABLE {module['staging_table']} " f"CREATE TABLE {module['staging_table']} "
f"(LIKE {module['dest_table']} INCLUDING ALL);", f"(LIKE {module['dest_table']} INCLUDING ALL);")
) jrunner.run_dest_sql(dest_conn, drop_sql)
jrunner.run_dest_sql(dest_conn, like_sql)
# 5. migrate source → staging. jrunner does its own `DELETE FROM staging` # 5. migrate source → staging. jrunner does its own `DELETE FROM staging`
# before loading, so we don't need a separate TRUNCATE. # before loading, so we don't need a separate TRUNCATE.

View File

@ -259,10 +259,15 @@ async def module_update(request: Request, module_id: int):
# fresh from dest on next run). Re-apply dest COMMENT if it changed. # fresh from dest on next run). Re-apply dest COMMENT if it changed.
dest_conn = repo.get_connection(module["dest_connection_id"]) dest_conn = repo.get_connection(module["dest_connection_id"])
if dest_conn is not None: if dest_conn is not None:
dest_drv_row = repo.get_driver_row(dest_conn["driver_id"])
_dest_drv = (drivers.get_driver(dest_drv_row["kind"])
if dest_drv_row else None)
try: try:
jrunner.run_dest_sql( drop_sql = (_dest_drv.drop_table_if_exists_sql(module["staging_table"])
dest_conn, f"DROP TABLE IF EXISTS {module['staging_table']};") if _dest_drv else f"DROP TABLE IF EXISTS {module['staging_table']};")
if new_description != module["dest_description"]: jrunner.run_dest_sql(dest_conn, drop_sql)
if new_description != module["dest_description"] and (
_dest_drv is None or _dest_drv.supports_table_comments):
jrunner.run_dest_sql( jrunner.run_dest_sql(
dest_conn, dest_conn,
f"COMMENT ON TABLE {module['dest_table']} IS " f"COMMENT ON TABLE {module['dest_table']} IS "
@ -494,7 +499,7 @@ def wizard_step3(request: Request,
drivers_by_id = {d["id"]: d for d in repo.list_drivers()} drivers_by_id = {d["id"]: d for d in repo.list_drivers()}
dest_conns = [ dest_conns = [
c for c in repo.list_connections() c for c in repo.list_connections()
if drivers_by_id.get(c["driver_id"], {}).get("kind") == "pg" if drivers_by_id.get(c["driver_id"]) is not None
] ]
qualified = drv.qualified_table_name(table, **qvals) if not fetch_error else table qualified = drv.qualified_table_name(table, **qvals) if not fetch_error else table
default_module_name = (table_schema + "_" + table).lower() if table_schema else table.lower() default_module_name = (table_schema + "_" + table).lower() if table_schema else table.lower()
@ -507,8 +512,12 @@ def wizard_step3(request: Request,
if not fetch_error and default_dest_conn_id and default_dest_schema: if not fetch_error and default_dest_conn_id and default_dest_schema:
dest_conn_row = repo.get_connection(default_dest_conn_id) dest_conn_row = repo.get_connection(default_dest_conn_id)
if dest_conn_row is not None: if dest_conn_row is not None:
dest_drv_row = repo.get_driver_row(dest_conn_row["driver_id"])
default_dest_drv = (drivers.get_driver(dest_drv_row["kind"])
if dest_drv_row else None)
if default_dest_drv is not None:
try: try:
existing = _existing_dest_columns( existing = default_dest_drv.check_dest_table(
dest_conn_row, default_dest_schema, default_module_name) dest_conn_row, default_dest_schema, default_module_name)
except jrunner.JrunnerError: except jrunner.JrunnerError:
existing = None existing = None
@ -619,8 +628,7 @@ async def wizard_create(request: Request):
# If the dest table already exists, don't clobber it. Verify the picks # If the dest table already exists, don't clobber it. Verify the picks
# match its shape and skip CREATE/COMMENT. # match its shape and skip CREATE/COMMENT.
try: try:
existing_cols = _existing_dest_columns( existing_cols = dest_drv.check_dest_table(dest_conn, dest_schema, dest_table_bare)
dest_conn, dest_schema, dest_table_bare)
except jrunner.JrunnerError as e: except jrunner.JrunnerError as e:
raise HTTPException(500, f"could not introspect dest: {e}") raise HTTPException(500, f"could not introspect dest: {e}")
dest_exists = existing_cols is not None dest_exists = existing_cols is not None
@ -647,29 +655,19 @@ async def wizard_create(request: Request):
) )
try: try:
jrunner.run_dest_sql( jrunner.run_dest_sql(dest_conn, dest_drv.create_schema_sql(dest_schema))
dest_conn,
f"CREATE SCHEMA IF NOT EXISTS {dest_drv.quote_identifier(dest_schema)};",
)
if not dest_exists: if not dest_exists:
jrunner.run_dest_sql(dest_conn, create_table_sql) jrunner.run_dest_sql(dest_conn, create_table_sql)
if dest_drv.supports_table_comments:
comment_sql = _build_comment_sql(dest_drv, qualified_dest, comment_sql = _build_comment_sql(dest_drv, qualified_dest,
dest_description, chosen) dest_description, chosen)
if comment_sql: if comment_sql:
jrunner.run_dest_sql(dest_conn, comment_sql) jrunner.run_dest_sql(dest_conn, comment_sql)
# Pre-align staging to dest so first run doesn't surprise us. # Pre-align staging to dest so first run doesn't surprise us.
if staging_schema and staging_schema != effective_staging: if staging_schema and staging_schema != effective_staging:
jrunner.run_dest_sql( jrunner.run_dest_sql(dest_conn, dest_drv.create_schema_sql(staging_schema))
dest_conn, jrunner.run_dest_sql(dest_conn, dest_drv.drop_table_if_exists_sql(effective_staging))
f"CREATE SCHEMA IF NOT EXISTS {dest_drv.quote_identifier(staging_schema)};", jrunner.run_dest_sql(dest_conn, dest_drv.create_like_table_sql(effective_staging, qualified_dest))
)
jrunner.run_dest_sql(
dest_conn, f"DROP TABLE IF EXISTS {effective_staging};")
jrunner.run_dest_sql(
dest_conn,
f"CREATE TABLE {effective_staging} "
f"(LIKE {qualified_dest} INCLUDING ALL);",
)
except jrunner.JrunnerError as e: except jrunner.JrunnerError as e:
raise HTTPException(500, f"dest provisioning failed: {e}") raise HTTPException(500, f"dest provisioning failed: {e}")
@ -759,20 +757,6 @@ def _sql_str(v: str) -> str:
return "'" + v.replace("'", "''") + "'" return "'" + v.replace("'", "''") + "'"
def _existing_dest_columns(dest_conn: dict, schema: str,
table: str) -> set[str] | None:
"""Return lowercase column names of an existing PG dest table, or
None if it doesn't exist. PG-only; fine while pg is the sole dest."""
r = jrunner.run_dest_sql(
dest_conn,
f"SELECT column_name FROM information_schema.columns "
f"WHERE table_schema={_sql_str(schema)} "
f"AND table_name={_sql_str(table)}",
)
if not r.rows:
return None
return {row[0].strip().lower() for row in r.rows if row and row[0]}
def _build_comment_sql(dest_drv, qualified_dest: str, def _build_comment_sql(dest_drv, qualified_dest: str,
table_description: str | None, table_description: str | None,

View File

@ -66,11 +66,31 @@
<div class="panel" style="margin-top:1rem"> <div class="panel" style="margin-top:1rem">
<header> <header>
Schedules Schedules
<span class="subtitle">cron expressions in local time — e.g. <code>0 4 * * *</code> for daily at 04:00</span> <span class="subtitle">cron expression reference</span>
<button type="button" class="btn ghost" style="margin-left:auto" <button type="button" class="btn ghost" style="margin-left:auto"
onclick="addScheduleRow()">+ add</button> onclick="addScheduleRow()">+ add</button>
</header> </header>
<div class="body tight"> <div class="body tight">
<table class="grid" style="margin-bottom:0.6rem">
<thead>
<tr>
<th>pos 1</th>
<th>pos 2</th>
<th>pos 3</th>
<th>pos 4</th>
<th>pos 5</th>
</tr>
</thead>
<tbody>
<tr>
<td>minute<br>059</td>
<td>hour<br>023</td>
<td>day of month<br>131</td>
<td>month<br>112</td>
<td>day of week<br>0=Sun, 6=Sat</td>
</tr>
</tbody>
</table>
<table class="grid" id="sched-table"> <table class="grid" id="sched-table">
<thead> <thead>
<tr> <tr>
@ -108,6 +128,28 @@
{% endif %} {% endif %}
<details style="margin-top:0.8rem"> <details style="margin-top:0.8rem">
<summary style="cursor:pointer;color:var(--text-muted);font-size:0.85em">cron expression reference</summary> <summary style="cursor:pointer;color:var(--text-muted);font-size:0.85em">cron expression reference</summary>
<div style="margin-top:0.5rem;font-size:0.85em">
<table class="grid" style="font-size:1em;width:auto">
<thead>
<tr>
<th class="mono" style="text-align:center">pos 1</th>
<th class="mono" style="text-align:center">pos 2</th>
<th class="mono" style="text-align:center">pos 3</th>
<th class="mono" style="text-align:center">pos 4</th>
<th class="mono" style="text-align:center">pos 5</th>
</tr>
</thead>
<tbody>
<tr>
<td style="text-align:center">minute<br><span style="color:var(--text-muted);font-size:0.9em">059</span></td>
<td style="text-align:center">hour<br><span style="color:var(--text-muted);font-size:0.9em">023</span></td>
<td style="text-align:center">day of month<br><span style="color:var(--text-muted);font-size:0.9em">131</span></td>
<td style="text-align:center">month<br><span style="color:var(--text-muted);font-size:0.9em">112</span></td>
<td style="text-align:center">day of week<br><span style="color:var(--text-muted);font-size:0.9em">0=Sun, 6=Sat</span></td>
</tr>
</tbody>
</table>
</div>
<table class="grid" style="margin-top:0.5rem;font-size:0.85em"> <table class="grid" style="margin-top:0.5rem;font-size:0.85em">
<thead><tr><th>expression</th><th>meaning</th></tr></thead> <thead><tr><th>expression</th><th>meaning</th></tr></thead>
<tbody> <tbody>
@ -122,7 +164,7 @@
</tbody> </tbody>
</table> </table>
<div style="margin-top:0.4rem;color:var(--text-muted);font-size:0.82em"> <div style="margin-top:0.4rem;color:var(--text-muted);font-size:0.82em">
Format: <code>minute hour day-of-month month day-of-week</code> &nbsp;·&nbsp; all times local all times local &nbsp;·&nbsp; use <code>*</code> for "any", <code>*/n</code> for "every n", <code>a-b</code> for range, <code>a,b</code> for list
</div> </div>
</details> </details>
</div> </div>