diff --git a/pipekit/web/app.py b/pipekit/web/app.py index e6a4905..c852b4b 100644 --- a/pipekit/web/app.py +++ b/pipekit/web/app.py @@ -436,16 +436,36 @@ async def wizard_create(request: Request): effective_staging = staging_table or f"pipekit_staging.{module_name}" staging_schema, _, _ = effective_staging.partition(".") + # If the dest table already exists, don't clobber it. Verify the picks + # match its shape and skip CREATE/COMMENT. + try: + existing_cols = _existing_dest_columns( + dest_conn, dest_schema, dest_table_bare) + except jrunner.JrunnerError as e: + raise HTTPException(500, f"could not introspect dest: {e}") + dest_exists = existing_cols is not None + if dest_exists: + missing = [c["dest_name"] for c in chosen + if c["dest_name"].lower() not in existing_cols] + if missing: + raise HTTPException( + 400, + f"dest table {qualified_dest} already exists but is missing " + f"columns: {', '.join(missing)}. Drop the table, choose a " + f"different dest_table, or align your column picks to match " + f"the existing schema.") + try: jrunner.run_dest_sql( dest_conn, f"CREATE SCHEMA IF NOT EXISTS {dest_drv.quote_identifier(dest_schema)};", ) - jrunner.run_dest_sql(dest_conn, create_table_sql) - comment_sql = _build_comment_sql(dest_drv, qualified_dest, - dest_description, chosen) - if comment_sql: - jrunner.run_dest_sql(dest_conn, comment_sql) + if not dest_exists: + jrunner.run_dest_sql(dest_conn, create_table_sql) + comment_sql = _build_comment_sql(dest_drv, qualified_dest, + dest_description, chosen) + if comment_sql: + jrunner.run_dest_sql(dest_conn, comment_sql) # Pre-align staging to dest so first run doesn't surprise us. if staging_schema and staging_schema != effective_staging: jrunner.run_dest_sql( @@ -482,6 +502,21 @@ def _sql_str(v: str) -> str: return "'" + v.replace("'", "''") + "'" +def _existing_dest_columns(dest_conn: dict, schema: str, + table: str) -> set[str] | None: + """Return lowercase column names of an existing PG dest table, or + None if it doesn't exist. PG-only; fine while pg is the sole dest.""" + r = jrunner.run_dest_sql( + dest_conn, + f"SELECT column_name FROM information_schema.columns " + f"WHERE table_schema={_sql_str(schema)} " + f"AND table_name={_sql_str(table)}", + ) + if not r.rows: + return None + return {row[0].strip().lower() for row in r.rows if row and row[0]} + + def _build_comment_sql(dest_drv, qualified_dest: str, table_description: str | None, columns: list[dict]) -> str: