diff --git a/.gitignore b/.gitignore index 526ae32..88270fe 100644 --- a/.gitignore +++ b/.gitignore @@ -2,7 +2,8 @@ __pycache__/ *.py[cod] *.egg-info/ -# SQLite WAL/journal artifacts — db file itself is tracked for backup. +# SQLite database and its WAL/journal artifacts — runtime state, not tracked. +pipekit.db pipekit.db-journal pipekit.db-wal pipekit.db-shm diff --git a/pipekit.db b/pipekit.db deleted file mode 100644 index 733b678..0000000 Binary files a/pipekit.db and /dev/null differ diff --git a/pipekit/drivers/base.py b/pipekit/drivers/base.py index d9f1ae7..8e1ac27 100644 --- a/pipekit/drivers/base.py +++ b/pipekit/drivers/base.py @@ -145,15 +145,42 @@ class Driver(abc.ABC): def build_create_table_sql(self, qualified_table: str, columns: list[dict]) -> str: - """Generate CREATE TABLE IF NOT EXISTS SQL for a destination table. + """Generate CREATE TABLE SQL for a destination table. ``columns`` is a list of ``{dest_name, dest_type}`` dicts. - Default implementation raises — only destination drivers (PG today) - need to implement it.""" + Default raises — destination drivers must implement this.""" raise NotImplementedError( f"driver {self.kind!r} does not implement build_create_table_sql " "(not a supported destination)") + supports_table_comments: ClassVar[bool] = True + + def create_schema_sql(self, schema: str) -> str: + """DDL to create a schema if it doesn't already exist.""" + return f"CREATE SCHEMA IF NOT EXISTS {self.quote_identifier(schema)};" + + def drop_table_if_exists_sql(self, qualified_table: str) -> str: + """DDL to drop a table only if it exists.""" + return f"DROP TABLE IF EXISTS {qualified_table};" + + def create_like_table_sql(self, new_table: str, source_table: str) -> str: + """DDL to create new_table with the same columns as source_table.""" + return f"CREATE TABLE {new_table} (LIKE {source_table} INCLUDING ALL);" + + def check_dest_table(self, conn: dict, schema: str, + table: str) -> "set[str] | None": + """Return lowercase column names of an existing dest table, or None + if the table does not exist. Default uses INFORMATION_SCHEMA (works + for PG and MSSQL); override for dialects that use a different catalog.""" + sql = ( + f"SELECT column_name FROM information_schema.columns " + f"WHERE table_schema='{schema}' AND table_name='{table}'" + ) + r = self.query(conn, sql) + if not r.rows: + return None + return {row[0].strip().lower() for row in r.rows if row and row[0]} + # ---- Shared helper ---- def query(self, conn: dict, sql: str) -> jrunner.QueryResult: """Run `sql` in jrunner query mode against `conn`.""" diff --git a/pipekit/drivers/db2.py b/pipekit/drivers/db2.py index 6a135a0..866203f 100644 --- a/pipekit/drivers/db2.py +++ b/pipekit/drivers/db2.py @@ -133,6 +133,68 @@ class DB2Driver(Driver): f"THEN NULL ELSE {col} END") return col + def drop_table_if_exists_sql(self, qualified_table: str) -> str: + # DB2 for i doesn't support DROP TABLE IF EXISTS; check catalog first. + parts = qualified_table.strip('"').split(".", 1) + if len(parts) == 2: + schema_esc = parts[0].strip('"').replace("'", "''").upper() + table_esc = parts[1].strip('"').replace("'", "''").upper() + else: + schema_esc = "" + table_esc = parts[0].strip('"').replace("'", "''").upper() + where = f"TABLE_NAME='{table_esc}'" + if schema_esc: + where += f" AND TABLE_SCHEMA='{schema_esc}'" + return ( + f"BEGIN\n" + f" IF EXISTS (SELECT 1 FROM QSYS2.SYSTABLES WHERE {where})\n" + f" THEN DROP TABLE {qualified_table};\n" + f" END IF;\n" + f"END" + ) + + def create_like_table_sql(self, new_table: str, source_table: str) -> str: + return f"CREATE TABLE {new_table} LIKE {source_table};" + + def create_schema_sql(self, schema: str) -> str: + # DB2 for i: CREATE SCHEMA IF NOT EXISTS is available 7.4+; use the + # catalog check for broader compatibility. + name = schema.replace("'", "''") + q = self.quote_identifier(schema) + return ( + f"BEGIN\n" + f" IF NOT EXISTS (SELECT 1 FROM QSYS2.SYSSCHEMAS WHERE SCHEMA_NAME='{name}')\n" + f" THEN CREATE SCHEMA {q};\n" + f" END IF;\n" + f"END" + ) + + def build_create_table_sql(self, qualified_table: str, + columns: list[dict]) -> str: + if not columns: + raise ValueError("no columns provided for CREATE TABLE") + lines = [] + for c in columns: + name = c["dest_name"] + validate_identifier(name, "dest column name") + dtype = _normalize_db2_type((c.get("dest_type") or "").strip()) + if not dtype: + raise ValueError(f"column {name!r} has no dest_type") + lines.append(f" {self.quote_identifier(name)} {dtype}") + body = ",\n".join(lines) + return f"CREATE TABLE {qualified_table} (\n{body}\n);" + + def check_dest_table(self, conn: dict, schema: str, + table: str) -> "set[str] | None": + sql = ( + f"SELECT COLUMN_NAME FROM QSYS2.SYSCOLUMNS " + f"WHERE TABLE_SCHEMA='{schema}' AND TABLE_NAME='{table}'" + ) + r = self.query(conn, sql) + if not r.rows: + return None + return {row[0].strip().lower() for row in r.rows if row and row[0]} + def map_type(self, type_raw: str) -> str: base = _base(type_raw) mapped = _TYPE_MAP.get(base, "text") @@ -141,6 +203,33 @@ class DB2Driver(Driver): return mapped +_PG_TO_DB2: dict[str, str] = { + "text": "VARCHAR(32000)", "varchar": "VARCHAR(32000)", + "integer": "INTEGER", "int": "INTEGER", + "bigint": "BIGINT", "smallint": "SMALLINT", + "numeric": "NUMERIC", "decimal": "DECIMAL", + "real": "REAL", "double precision": "DOUBLE", + "boolean": "SMALLINT", "bool": "SMALLINT", + "date": "DATE", + "timestamp": "TIMESTAMP", "timestamptz": "TIMESTAMP", + "time": "TIME", + "bytea": "BLOB", + "uuid": "CHAR(36)", + "json": "CLOB", "jsonb": "CLOB", +} + + +def _normalize_db2_type(dest_type: str) -> str: + """Map PG-style type names to DB2 for i equivalents; pass native types through.""" + base = dest_type.lower().split("(")[0].strip() + native = _PG_TO_DB2.get(base) + if native: + if "(" in dest_type and base in ("numeric", "decimal"): + return base.upper() + dest_type[dest_type.index("("):] + return native + return dest_type or "VARCHAR(32000)" + + def _format_type(dtype: str, length: str, prec: str, scale: str) -> str: base = dtype.upper() if base in ("DECIMAL", "NUMERIC") and prec: diff --git a/pipekit/drivers/mssql.py b/pipekit/drivers/mssql.py index 65a4584..8b8c729 100644 --- a/pipekit/drivers/mssql.py +++ b/pipekit/drivers/mssql.py @@ -216,6 +216,31 @@ class MSSQLDriver(Driver): return "numeric" + type_raw[type_raw.index("("):] return mapped + supports_table_comments = False # MSSQL uses sp_addextendedproperty, not COMMENT ON + + def create_schema_sql(self, schema: str) -> str: + name = schema.replace("'", "''") + q = self.quote_identifier(schema) + return f"IF NOT EXISTS (SELECT 1 FROM sys.schemas WHERE name='{name}') EXEC('CREATE SCHEMA {q}');" + + def create_like_table_sql(self, new_table: str, source_table: str) -> str: + return f"SELECT TOP 0 * INTO {new_table} FROM {source_table};" + + def build_create_table_sql(self, qualified_table: str, + columns: list[dict]) -> str: + if not columns: + raise ValueError("no columns provided for CREATE TABLE") + lines = [] + for c in columns: + name = c["dest_name"] + validate_identifier(name, "dest column name") + dtype = _normalize_mssql_type((c.get("dest_type") or "").strip()) + if not dtype: + raise ValueError(f"column {name!r} has no dest_type") + lines.append(f" {self.quote_identifier(name)} {dtype}") + body = ",\n".join(lines) + return f"CREATE TABLE {qualified_table} (\n{body}\n);" + # ---- helpers ---- def _validate(self, linked_server, database, schema): if linked_server: @@ -233,6 +258,33 @@ class MSSQLDriver(Driver): return "" +_PG_TO_MSSQL: dict[str, str] = { + "text": "NVARCHAR(MAX)", "varchar": "NVARCHAR(MAX)", + "integer": "INT", "int": "INT", + "bigint": "BIGINT", "smallint": "SMALLINT", + "numeric": "NUMERIC", "decimal": "DECIMAL", + "real": "REAL", "double precision": "FLOAT", + "boolean": "BIT", "bool": "BIT", + "date": "DATE", + "timestamp": "DATETIME2", "timestamptz": "DATETIMEOFFSET", + "time": "TIME", + "bytea": "VARBINARY(MAX)", + "uuid": "UNIQUEIDENTIFIER", + "json": "NVARCHAR(MAX)", "jsonb": "NVARCHAR(MAX)", +} + + +def _normalize_mssql_type(dest_type: str) -> str: + """Map PG-style type names to MSSQL equivalents; pass native types through.""" + base = dest_type.lower().split("(")[0].strip() + native = _PG_TO_MSSQL.get(base) + if native: + if "(" in dest_type and base in ("numeric", "decimal"): + return base.upper() + dest_type[dest_type.index("("):] + return native + return dest_type or "NVARCHAR(MAX)" + + def _format_type(dtype: str, length: str, prec: str, scale: str) -> str: base = dtype.upper() if base in ("DECIMAL", "NUMERIC") and prec: diff --git a/pipekit/engine/runner.py b/pipekit/engine/runner.py index 85c20f2..71cefea 100644 --- a/pipekit/engine/runner.py +++ b/pipekit/engine/runner.py @@ -19,7 +19,7 @@ import os import traceback from dataclasses import dataclass -from .. import jrunner, repo +from .. import drivers, jrunner, repo from . import merge, watermark @@ -77,6 +77,8 @@ def run_module(module_id: int, *, group_run_id: int | None = None, dest_conn = repo.get_connection(module["dest_connection_id"]) if source_conn is None or dest_conn is None: raise ValueError("source or dest connection missing") + dest_drv_row = repo.get_driver_row(dest_conn["driver_id"]) + dest_drv = drivers.get_driver(dest_drv_row["kind"]) if dest_drv_row else None # 2–3. watermarks + materialised source query wm_values = watermark.resolve_watermarks(module, use_defaults_only=False) @@ -103,16 +105,17 @@ def run_module(module_id: int, *, group_run_id: int | None = None, # self-healing. Staging is ephemeral per SPEC; nothing of value lives # between runs. staging_schema, _, _ = module["staging_table"].partition(".") - if staging_schema and staging_schema != module["staging_table"]: + if staging_schema and staging_schema != module["staging_table"] and dest_drv: jrunner.run_dest_sql( - dest_conn, f"CREATE SCHEMA IF NOT EXISTS {staging_schema};") - jrunner.run_dest_sql( - dest_conn, f"DROP TABLE IF EXISTS {module['staging_table']};") - jrunner.run_dest_sql( - dest_conn, - f"CREATE TABLE {module['staging_table']} " - f"(LIKE {module['dest_table']} INCLUDING ALL);", - ) + dest_conn, dest_drv.create_schema_sql(staging_schema)) + drop_sql = (dest_drv.drop_table_if_exists_sql(module["staging_table"]) + if dest_drv else f"DROP TABLE IF EXISTS {module['staging_table']};") + like_sql = (dest_drv.create_like_table_sql(module["staging_table"], module["dest_table"]) + if dest_drv else + f"CREATE TABLE {module['staging_table']} " + f"(LIKE {module['dest_table']} INCLUDING ALL);") + jrunner.run_dest_sql(dest_conn, drop_sql) + jrunner.run_dest_sql(dest_conn, like_sql) # 5. migrate source → staging. jrunner does its own `DELETE FROM staging` # before loading, so we don't need a separate TRUNCATE. diff --git a/pipekit/web/app.py b/pipekit/web/app.py index c2fe9fe..fd95558 100644 --- a/pipekit/web/app.py +++ b/pipekit/web/app.py @@ -259,10 +259,15 @@ async def module_update(request: Request, module_id: int): # fresh from dest on next run). Re-apply dest COMMENT if it changed. dest_conn = repo.get_connection(module["dest_connection_id"]) if dest_conn is not None: + dest_drv_row = repo.get_driver_row(dest_conn["driver_id"]) + _dest_drv = (drivers.get_driver(dest_drv_row["kind"]) + if dest_drv_row else None) try: - jrunner.run_dest_sql( - dest_conn, f"DROP TABLE IF EXISTS {module['staging_table']};") - if new_description != module["dest_description"]: + drop_sql = (_dest_drv.drop_table_if_exists_sql(module["staging_table"]) + if _dest_drv else f"DROP TABLE IF EXISTS {module['staging_table']};") + jrunner.run_dest_sql(dest_conn, drop_sql) + if new_description != module["dest_description"] and ( + _dest_drv is None or _dest_drv.supports_table_comments): jrunner.run_dest_sql( dest_conn, f"COMMENT ON TABLE {module['dest_table']} IS " @@ -494,7 +499,7 @@ def wizard_step3(request: Request, drivers_by_id = {d["id"]: d for d in repo.list_drivers()} dest_conns = [ c for c in repo.list_connections() - if drivers_by_id.get(c["driver_id"], {}).get("kind") == "pg" + if drivers_by_id.get(c["driver_id"]) is not None ] qualified = drv.qualified_table_name(table, **qvals) if not fetch_error else table default_module_name = (table_schema + "_" + table).lower() if table_schema else table.lower() @@ -507,16 +512,20 @@ def wizard_step3(request: Request, if not fetch_error and default_dest_conn_id and default_dest_schema: dest_conn_row = repo.get_connection(default_dest_conn_id) if dest_conn_row is not None: - try: - existing = _existing_dest_columns( - dest_conn_row, default_dest_schema, default_module_name) - except jrunner.JrunnerError: - existing = None - if existing is not None: - dest_warn = { - "qualified": f"{default_dest_schema}.{default_module_name}", - "columns": sorted(existing), - } + dest_drv_row = repo.get_driver_row(dest_conn_row["driver_id"]) + default_dest_drv = (drivers.get_driver(dest_drv_row["kind"]) + if dest_drv_row else None) + if default_dest_drv is not None: + try: + existing = default_dest_drv.check_dest_table( + dest_conn_row, default_dest_schema, default_module_name) + except jrunner.JrunnerError: + existing = None + if existing is not None: + dest_warn = { + "qualified": f"{default_dest_schema}.{default_module_name}", + "columns": sorted(existing), + } return _templates.TemplateResponse( request, @@ -619,8 +628,7 @@ async def wizard_create(request: Request): # If the dest table already exists, don't clobber it. Verify the picks # match its shape and skip CREATE/COMMENT. try: - existing_cols = _existing_dest_columns( - dest_conn, dest_schema, dest_table_bare) + existing_cols = dest_drv.check_dest_table(dest_conn, dest_schema, dest_table_bare) except jrunner.JrunnerError as e: raise HTTPException(500, f"could not introspect dest: {e}") dest_exists = existing_cols is not None @@ -647,29 +655,19 @@ async def wizard_create(request: Request): ) try: - jrunner.run_dest_sql( - dest_conn, - f"CREATE SCHEMA IF NOT EXISTS {dest_drv.quote_identifier(dest_schema)};", - ) + jrunner.run_dest_sql(dest_conn, dest_drv.create_schema_sql(dest_schema)) if not dest_exists: jrunner.run_dest_sql(dest_conn, create_table_sql) - comment_sql = _build_comment_sql(dest_drv, qualified_dest, - dest_description, chosen) - if comment_sql: - jrunner.run_dest_sql(dest_conn, comment_sql) + if dest_drv.supports_table_comments: + comment_sql = _build_comment_sql(dest_drv, qualified_dest, + dest_description, chosen) + if comment_sql: + jrunner.run_dest_sql(dest_conn, comment_sql) # Pre-align staging to dest so first run doesn't surprise us. if staging_schema and staging_schema != effective_staging: - jrunner.run_dest_sql( - dest_conn, - f"CREATE SCHEMA IF NOT EXISTS {dest_drv.quote_identifier(staging_schema)};", - ) - jrunner.run_dest_sql( - dest_conn, f"DROP TABLE IF EXISTS {effective_staging};") - jrunner.run_dest_sql( - dest_conn, - f"CREATE TABLE {effective_staging} " - f"(LIKE {qualified_dest} INCLUDING ALL);", - ) + jrunner.run_dest_sql(dest_conn, dest_drv.create_schema_sql(staging_schema)) + jrunner.run_dest_sql(dest_conn, dest_drv.drop_table_if_exists_sql(effective_staging)) + jrunner.run_dest_sql(dest_conn, dest_drv.create_like_table_sql(effective_staging, qualified_dest)) except jrunner.JrunnerError as e: raise HTTPException(500, f"dest provisioning failed: {e}") @@ -759,20 +757,6 @@ def _sql_str(v: str) -> str: return "'" + v.replace("'", "''") + "'" -def _existing_dest_columns(dest_conn: dict, schema: str, - table: str) -> set[str] | None: - """Return lowercase column names of an existing PG dest table, or - None if it doesn't exist. PG-only; fine while pg is the sole dest.""" - r = jrunner.run_dest_sql( - dest_conn, - f"SELECT column_name FROM information_schema.columns " - f"WHERE table_schema={_sql_str(schema)} " - f"AND table_name={_sql_str(table)}", - ) - if not r.rows: - return None - return {row[0].strip().lower() for row in r.rows if row and row[0]} - def _build_comment_sql(dest_drv, qualified_dest: str, table_description: str | None, diff --git a/pipekit/web/templates/group_form.html b/pipekit/web/templates/group_form.html index 3cd1b87..727deb3 100644 --- a/pipekit/web/templates/group_form.html +++ b/pipekit/web/templates/group_form.html @@ -66,11 +66,31 @@
0 4 * * * for daily at 04:00
+ cron expression reference
| pos 1 | +pos 2 | +pos 3 | +pos 4 | +pos 5 | +
|---|---|---|---|---|
| minute 0–59 |
+ hour 0–23 |
+ day of month 1–31 |
+ month 1–12 |
+ day of week 0=Sun, 6=Sat |
+
| pos 1 | +pos 2 | +pos 3 | +pos 4 | +pos 5 | +
|---|---|---|---|---|
| minute 0–59 |
+ hour 0–23 |
+ day of month 1–31 |
+ month 1–12 |
+ day of week 0=Sun, 6=Sat |
+
| expression | meaning |
|---|
minute hour day-of-month month day-of-week · all times local
+ all times local · use * for "any", */n for "every n", a-b for range, a,b for list