Compare commits

..

No commits in common. "f18ea55a12f67cfe3ab0089458eff1480bc2630a" and "d952b48a4e093391c9935fdbe30a5231d8490216" have entirely different histories.

17 changed files with 16 additions and 773 deletions

3
.gitignore vendored
View File

@ -10,6 +10,3 @@ pipekit.db-shm
# Local Claude Code settings.
.claude/settings.local.json
# Python venv created by deploy.sh.
.venv/

View File

@ -1,9 +1,4 @@
#!/usr/bin/env bash
# Thin launcher: prefer the repo venv if deploy.sh created one,
# else fall back to system python3.
# Thin launcher: run `pipekit` from anywhere.
set -euo pipefail
REPO="$(cd "$(dirname "$(readlink -f "$0")")/.." && pwd)"
if [ -x "$REPO/.venv/bin/python3" ]; then
exec "$REPO/.venv/bin/python3" -m pipekit "$@"
fi
exec python3 -m pipekit "$@"

View File

@ -1,8 +1,7 @@
database: /opt/pipekit/pipekit.db
jrunner_path: /usr/local/bin/jrunner
driver_dir: /opt/pipekit/drivers/
api_host: 0.0.0.0
api_port: 8200
api_port: 8100
# smtp:
# host: smtp.example.com
# port: 587

View File

@ -1,97 +0,0 @@
#!/usr/bin/env bash
# Pipekit deployment — idempotent. Re-run any time.
#
# Steps:
# 1. Check prerequisites (python3, jrunner on PATH)
# 2. Create Python venv at $REPO/.venv and install requirements
# 3. Install launcher at /usr/local/bin/pipekit (wraps the venv python)
# 4. Ensure /etc/pipekit/secrets.env exists (mode 0600, placeholder body)
# 5. Run `pipekit init` to create/upgrade the SQLite schema
# 6. Register driver rows for every JDBC jar shipped with jrunner
#
# After running:
# - Set DB passwords with: sudo pipekit secrets set <KEY>
# - See systemd/pipekit.service for a unit file template
set -euo pipefail

# Paths. PIPEKIT_REPO overrides the checkout location; otherwise the
# directory containing this script is used.
REPO_DIR="${PIPEKIT_REPO:-$(cd "$(dirname "$0")" && pwd)}"
VENV_DIR="$REPO_DIR/.venv"
LAUNCHER="/usr/local/bin/pipekit"
CONFIG_DIR="/etc/pipekit"
SECRETS_FILE="$CONFIG_DIR/secrets.env"

# The steps below write to /usr/local/bin and /etc, so re-exec as root.
# Going through `bash "$0"` instead of `"$0"` also works when the script was
# started as `bash deploy.sh` (no slash in $0, so sudo would search PATH) or
# when it is not marked executable.
if [ "$EUID" -ne 0 ]; then
  exec sudo -H -E bash "$0" "$@"
fi

echo "== pipekit deploy =="
echo "repo: $REPO_DIR"
echo "venv: $VENV_DIR"
echo "secrets: $SECRETS_FILE"
echo ""

# Prerequisites. Diagnostics go to stderr so they are not lost when stdout
# is redirected.
command -v python3 >/dev/null || { echo "ERROR: python3 not on PATH" >&2; exit 1; }
command -v jrunner >/dev/null || { echo "ERROR: jrunner not on PATH — install /opt/jrunner first" >&2; exit 1; }

# Test for the interpreter rather than the directory: a venv directory left
# behind by an interrupted run has no usable python/pip, and checking only
# for the directory would make every later re-run fail at the pip step.
if [ ! -x "$VENV_DIR/bin/python3" ]; then
  echo "Creating venv at $VENV_DIR"
  python3 -m venv "$VENV_DIR"
fi
# `python -m pip` is the invocation pip documents for upgrading pip itself.
"$VENV_DIR/bin/python3" -m pip install --quiet --upgrade pip
"$VENV_DIR/bin/python3" -m pip install --quiet -r "$REPO_DIR/requirements.txt"
echo "Python deps installed."

# The in-repo bin/pipekit auto-detects .venv at runtime; no rewrite needed.
chmod +x "$REPO_DIR/bin/pipekit"
ln -sf "$REPO_DIR/bin/pipekit" "$LAUNCHER"
echo "Launcher: $LAUNCHER -> $REPO_DIR/bin/pipekit"

install -d -m 0755 "$CONFIG_DIR"
if [ ! -f "$SECRETS_FILE" ]; then
  # Create the file empty with mode 0600 first, so the body is never written
  # to a file with looser permissions. An existing file is left untouched.
  install -m 0600 /dev/null "$SECRETS_FILE"
  cat > "$SECRETS_FILE" <<'EOF'
# pipekit secrets — sourced by the service process (EnvironmentFile=)
# or by the shell before `pipekit serve`. One KEY=VALUE per line.
# Connection rows reference these as $KEY (e.g. password: "$DB2PW").
#
# This file must stay mode 0600 and out of version control.
# Use `sudo pipekit secrets set <KEY>` to add entries safely.
EOF
  chmod 0600 "$SECRETS_FILE"
  echo "Created $SECRETS_FILE"
else
  echo "Keeping existing $SECRETS_FILE"
fi

# Create/upgrade the SQLite schema.
"$LAUNCHER" init

# Register drivers for each JDBC jar jrunner ships with. The jars are looked
# up in ../lib relative to the resolved (symlink-free) jrunner executable.
JR_LIB="$(dirname "$(readlink -f "$(command -v jrunner)")")/../lib"
# Register the JDBC driver jar for one driver kind, if a matching jar exists.
# Globals:   JR_LIB (read)   - directory searched for jars (not recursive)
#            LAUNCHER (read) - pipekit launcher used to register the driver
# Arguments: $1 - driver kind passed to `drivers register` (e.g. db2, pg)
#            $2 - jar file-name pattern, in `find -name` syntax
# Outputs:   the launcher's output, or a "skipping" note when nothing matches
register_jar() {
  local kind="$1" pattern="$2"
  local jar=""
  # A missing or unreadable $JR_LIB has to end in the "skipping" branch.
  # Under `set -euo pipefail` a failing find would otherwise make the
  # assignment fail and abort the whole deploy, hence the directory guard
  # and the `|| true`. Sorting keeps the pick stable across runs when several
  # versions of a jar are present: the lexicographically last name wins.
  if [ -d "$JR_LIB" ]; then
    jar="$(find "$JR_LIB" -maxdepth 1 -name "$pattern" 2>/dev/null \
      | LC_ALL=C sort | tail -n 1)" || true
  fi
  if [ -n "$jar" ]; then
    "$LAUNCHER" drivers register "$kind" --jar "$jar"
  else
    echo " (no $pattern in $JR_LIB — skipping $kind)"
  fi
}
# One "kind=pattern" entry per JDBC driver pipekit knows how to register.
for spec in "db2=jt400-*.jar" "pg=postgresql-*.jar" "mssql=mssql-jdbc-*.jar"; do
  register_jar "${spec%%=*}" "${spec#*=}"
done

# Closing summary. The here-doc is unquoted on purpose so $SECRETS_FILE and
# $REPO_DIR are expanded in the printed instructions.
cat <<EOF

pipekit deployed.

Next steps:
 1. Set passwords: sudo pipekit secrets set DB2PW
 sudo pipekit secrets set PGPW
 2. Start the server manually:
 set -a; source $SECRETS_FILE; set +a
 pipekit serve
 3. Or install the systemd unit:
 sudo cp $REPO_DIR/systemd/pipekit.service /etc/systemd/system/
 sudo systemctl daemon-reload
 sudo systemctl enable --now pipekit
EOF

View File

@ -45,48 +45,6 @@ def cmd_drivers_list(args) -> int:
return 0
# JDBC driver class used for each kind when --class is not given.
_DEFAULT_JDBC_CLASSES = {
    "db2": "com.ibm.as400.access.AS400JDBCDriver",
    "pg": "org.postgresql.Driver",
    "mssql": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
}


def cmd_drivers_register(args) -> int:
    """Add a driver row for ``args.kind`` pointing at ``args.jar``.

    Returns 1 when the kind is unknown or no JDBC class can be determined.
    Returns 0 after registering, and also when a row for the kind already
    exists and ``--force`` was not given (nothing is written in that case).
    A jar path that does not exist only produces a warning.
    """
    import os

    kind = args.kind

    # Unknown kinds are reported by drivers.get_driver via ValueError.
    try:
        drivers.get_driver(kind)
    except ValueError as e:
        print(f"error: {e}")
        return 1

    # Registration is a no-op for an already-known kind unless forced.
    already = next((d for d in repo.list_drivers() if d["kind"] == kind), None)
    if already is not None and not args.force:
        print(f"driver kind {kind!r} already registered as "
              f"{already['name']!r} (id={already['id']}). "
              f"Use --force to add a second row.")
        return 0

    class_name = args.class_name
    if not class_name:
        class_name = _DEFAULT_JDBC_CLASSES.get(kind)
    if not class_name:
        print(f"error: no built-in JDBC class for kind {kind!r}; "
              f"pass --class explicitly")
        return 1

    if not os.path.exists(args.jar):
        print(f"warning: jar {args.jar!r} does not exist "
              f"(registering anyway)")

    row = repo.create_driver(
        name=args.name or f"{kind}-jdbc",
        kind=kind,
        jar_file=args.jar,
        class_name=class_name,
        url_template=args.url_template,
    )
    print(f"registered driver id={row['id']} name={row['name']!r} "
          f"kind={row['kind']!r}")
    return 0
def cmd_drivers_show(args) -> int:
try:
d = drivers.get_driver(args.kind)
@ -137,10 +95,8 @@ def cmd_serve(args) -> int:
import uvicorn
from .api import create_app
cfg = get_config()
host = args.host or cfg.api_host
port = args.port or cfg.api_port
uvicorn.run(create_app(), host=host, port=port, reload=args.reload)
port = args.port or get_config().api_port
uvicorn.run(create_app(), host=args.host, port=port, reload=args.reload)
return 0
@ -157,101 +113,6 @@ def cmd_set_password(args) -> int:
return 0
_DEFAULT_SECRETS_FILE = "/etc/pipekit/secrets.env"
def _secrets_path(args) -> str:
import os
return (getattr(args, "file", None)
or os.environ.get("PIPEKIT_SECRETS")
or _DEFAULT_SECRETS_FILE)
def cmd_secrets_list(args) -> int:
import os
path = _secrets_path(args)
if not os.path.exists(path):
print(f"{path}: no such file")
return 0
keys: list[str] = []
with open(path) as f:
for line in f:
s = line.strip()
if not s or s.startswith("#") or "=" not in s:
continue
keys.append(s.split("=", 1)[0])
print(f"{path}{len(keys)} secret(s):")
for k in keys:
print(f" {k}")
return 0
def cmd_secrets_set(args) -> int:
import getpass
import os
import stat
path = _secrets_path(args)
value = args.value if args.value is not None else getpass.getpass(
f"value for {args.key}: ")
if not value:
print("error: empty value")
return 1
lines: list[str] = []
replaced = False
if os.path.exists(path):
with open(path) as f:
for line in f:
s = line.strip()
if s and not s.startswith("#") and "=" in s \
and s.split("=", 1)[0] == args.key:
lines.append(f"{args.key}={value}\n")
replaced = True
else:
lines.append(line)
if not replaced:
if lines and not lines[-1].endswith("\n"):
lines.append("\n")
lines.append(f"{args.key}={value}\n")
os.makedirs(os.path.dirname(path), exist_ok=True)
tmp = path + ".tmp"
with open(tmp, "w") as f:
f.writelines(lines)
os.chmod(tmp, stat.S_IRUSR | stat.S_IWUSR) # 0600
os.replace(tmp, path)
print(f"{'updated' if replaced else 'added'} {args.key} in {path}")
return 0
def cmd_secrets_unset(args) -> int:
import os
path = _secrets_path(args)
if not os.path.exists(path):
print(f"{path}: no such file")
return 1
kept: list[str] = []
removed = False
with open(path) as f:
for line in f:
s = line.strip()
if s and not s.startswith("#") and "=" in s \
and s.split("=", 1)[0] == args.key:
removed = True
continue
kept.append(line)
if not removed:
print(f"{args.key} not set in {path}")
return 0
tmp = path + ".tmp"
with open(tmp, "w") as f:
f.writelines(kept)
os.chmod(tmp, 0o600)
os.replace(tmp, path)
print(f"removed {args.key} from {path}")
return 0
def _report(checks) -> int:
width = max(len(name) for name, _, _ in checks)
failures = 0
@ -289,21 +150,6 @@ def main(argv: list[str] | None = None) -> int:
p_drv_show.add_argument("kind", help="one of the kinds from `pipekit drivers list`")
p_drv_show.set_defaults(func=cmd_drivers_show)
p_drv_reg = drv_sub.add_parser(
"register", help="add a driver row to the database")
p_drv_reg.add_argument("kind", help="driver kind (db2, pg, mssql)")
p_drv_reg.add_argument("--jar", required=True,
help="absolute path to the JDBC jar")
p_drv_reg.add_argument("--name",
help="registry name (default: <kind>-jdbc)")
p_drv_reg.add_argument("--class", dest="class_name",
help="JDBC Driver class (default: built-in per kind)")
p_drv_reg.add_argument("--url-template",
help="optional JDBC URL template for the wizard")
p_drv_reg.add_argument("--force", action="store_true",
help="register even if a row for this kind exists")
p_drv_reg.set_defaults(func=cmd_drivers_register)
p_run = sub.add_parser("run", help="run a module by name (synchronous)")
p_run.add_argument("module", help="module name")
p_run.add_argument("--dry-run", action="store_true",
@ -311,8 +157,7 @@ def main(argv: list[str] | None = None) -> int:
p_run.set_defaults(func=cmd_run)
p_serve = sub.add_parser("serve", help="start the HTTP API")
p_serve.add_argument("--host", default=None,
help="defaults to config.yaml api_host (127.0.0.1)")
p_serve.add_argument("--host", default="127.0.0.1")
p_serve.add_argument("--port", type=int, default=None,
help="defaults to config.yaml api_port")
p_serve.add_argument("--reload", action="store_true")
@ -322,27 +167,6 @@ def main(argv: list[str] | None = None) -> int:
p_pw.add_argument("username")
p_pw.set_defaults(func=cmd_set_password)
p_sec = sub.add_parser(
"secrets",
help=f"manage the env-var file for DB passwords (default {_DEFAULT_SECRETS_FILE})")
sec_sub = p_sec.add_subparsers(dest="sec_cmd", required=True)
p_sec_list = sec_sub.add_parser("list", help="list keys in the secrets file")
p_sec_list.add_argument("--file", help=f"override path (default {_DEFAULT_SECRETS_FILE})")
p_sec_list.set_defaults(func=cmd_secrets_list)
p_sec_set = sec_sub.add_parser("set", help="add or update a KEY (prompted if value omitted)")
p_sec_set.add_argument("key")
p_sec_set.add_argument("value", nargs="?", default=None,
help="value; omit to be prompted (safer)")
p_sec_set.add_argument("--file", help=f"override path (default {_DEFAULT_SECRETS_FILE})")
p_sec_set.set_defaults(func=cmd_secrets_set)
p_sec_unset = sec_sub.add_parser("unset", help="remove a KEY from the secrets file")
p_sec_unset.add_argument("key")
p_sec_unset.add_argument("--file", help=f"override path (default {_DEFAULT_SECRETS_FILE})")
p_sec_unset.set_defaults(func=cmd_secrets_unset)
args = p.parse_args(argv)
return args.func(args)

View File

@ -28,10 +28,6 @@ class Config:
def api_port(self) -> int:
return int(self._data.get("api_port", 8100))
@property
def api_host(self) -> str:
return str(self._data.get("api_host", "127.0.0.1"))
def get(self, key: str, default=None):
return self._data.get(key, default)

View File

@ -91,19 +91,16 @@ def run_module(module_id: int, *, group_run_id: int | None = None,
status = "success"
return RunOutcome(run_id, status, None, None, resolved_sql, merge_sql)
# 4. (re)create staging from dest. DROP+CREATE (not IF NOT EXISTS) so
# any drift — dest columns added since staging was last made — is
# self-healing. Staging is ephemeral per SPEC; nothing of value lives
# between runs.
# 4. ensure staging table exists on dest. Mirror the real dest schema
# so jrunner's auto-DELETE and the subsequent merge INSERT both find
# a table to work on. Idempotent — no-op after first run.
staging_schema, _, _ = module["staging_table"].partition(".")
if staging_schema and staging_schema != module["staging_table"]:
jrunner.run_dest_sql(
dest_conn, f"CREATE SCHEMA IF NOT EXISTS {staging_schema};")
jrunner.run_dest_sql(
dest_conn, f"DROP TABLE IF EXISTS {module['staging_table']};")
jrunner.run_dest_sql(
dest_conn,
f"CREATE TABLE {module['staging_table']} "
f"CREATE TABLE IF NOT EXISTS {module['staging_table']} "
f"(LIKE {module['dest_table']} INCLUDING ALL);",
)
@ -131,9 +128,6 @@ def run_module(module_id: int, *, group_run_id: int | None = None,
except Exception as e: # noqa: BLE001
error = f"{type(e).__name__}: {e}\n{traceback.format_exc()}"
if isinstance(e, jrunner.JrunnerError):
repo.log_run_output(run_id, jrunner_stdout=e.stdout,
jrunner_stderr=e.stderr)
# Failure-path hooks, if any. Never let these mask the real error.
try:
hook_log = _run_hooks(module_id, fail_fast=False,

View File

@ -125,9 +125,6 @@ def query(
if r.returncode != 0:
raise JrunnerError(r.stderr.strip() or r.stdout.strip(),
stdout=r.stdout, stderr=r.stderr)
silent = _detect_silent_failure(r.stdout, r.stderr)
if silent:
raise JrunnerError(silent, stdout=r.stdout, stderr=r.stderr)
reader = csv.reader(io.StringIO(r.stdout))
header = next(reader, [])
@ -172,9 +169,6 @@ def migrate(
if r.returncode != 0:
raise JrunnerError(r.stderr.strip() or r.stdout.strip(),
stdout=r.stdout, stderr=r.stderr)
silent = _detect_silent_failure(r.stdout, r.stderr)
if silent:
raise JrunnerError(silent, stdout=r.stdout, stderr=r.stderr)
return MigrateResult(
row_count=_parse_row_count(r.stdout + "\n" + r.stderr),
@ -208,35 +202,6 @@ def _parse_row_count(text: str) -> int | None:
return None
# jrunner catches SQLException, prints the stack trace, then exits 0 at
# nearly every failure site (see jrunner.java). Detect those by scanning
# for a Java stack-trace signature so callers don't treat silent failures
# as success.
_STACK_FRAME_RE = re.compile(r"^\s*at [\w.$<>]+\([^)\n]*\.java:\d+\)", re.M)
_EXCEPTION_HEADER_RE = re.compile(
r"^(?:[\w.$]+\.)*[\w$]+(?:Exception|Error)(?::[^\n]*)?$", re.M)
# jrunner runs query-mode SQL with `executeQuery`, which requires the
# statement to produce a ResultSet. DDL/DML (CREATE, TRUNCATE, INSERT)
# still executes, but PG then throws "No results were returned by the
# query." The statement succeeded — ignore the trace.
_BENIGN_EXCEPTION_SUBSTRINGS = (
"No results were returned by the query",
)
def _detect_silent_failure(stdout: str, stderr: str) -> str | None:
"""Return a short error summary if jrunner exited 0 but logged a failure."""
combined = (stderr or "") + "\n" + (stdout or "")
if not _STACK_FRAME_RE.search(combined):
return None
m = _EXCEPTION_HEADER_RE.search(combined)
header = m.group(0).strip() if m else "jrunner logged a Java stack trace but exited 0"
if any(s in header for s in _BENIGN_EXCEPTION_SUBSTRINGS):
return None
return header
class JrunnerError(RuntimeError):
def __init__(self, message: str, *, stdout: str = "", stderr: str = ""):
super().__init__(message)

View File

@ -177,40 +177,6 @@ def list_modules() -> list[dict]:
return [dict(r) for r in c.execute("SELECT * FROM module ORDER BY name")]
# Marks "argument not supplied" for columns where None is itself a value
# the caller may want to store (NULL).
_UNSET = object()


def update_module(module_id: int, *, name: str | None = None,
                  source_connection_id: int | None = None,
                  dest_connection_id: int | None = None,
                  dest_table: str | None = None,
                  staging_table: str | None = None,
                  source_query: str | None = None,
                  merge_strategy: str | None = None,
                  merge_key: str | None = _UNSET,
                  dest_description: str | None = _UNSET,
                  enabled: int | None = None) -> dict | None:
    """Update the supplied columns of one module row and return the row.

    Only the keyword arguments that are supplied are written; ``updated_at``
    is refreshed whenever at least one column changes. With nothing to
    update, the current row is returned without touching the database.

    For ``merge_key`` and ``dest_description`` an explicit ``None`` clears
    the column (stores NULL); leaving the argument out keeps the stored
    value. This is what lets a caller that maps a blank form field to None
    actually remove a merge key or description. For every other column,
    ``None`` means "leave unchanged".

    Returns whatever ``get_module(module_id)`` returns after the update.
    """
    # (column, new value, marker meaning "leave this column unchanged")
    columns = (
        ("name", name, None),
        ("source_connection_id", source_connection_id, None),
        ("dest_connection_id", dest_connection_id, None),
        ("dest_table", dest_table, None),
        ("staging_table", staging_table, None),
        ("source_query", source_query, None),
        ("merge_strategy", merge_strategy, None),
        ("merge_key", merge_key, _UNSET),
        ("dest_description", dest_description, _UNSET),
        ("enabled", enabled, None),
    )
    fields: list[str] = []
    values: list = []
    for col, val, skip in columns:
        if val is not skip:
            fields.append(f"{col}=?")
            values.append(val)
    if not fields:
        return get_module(module_id)
    fields.append("updated_at=datetime('now')")
    values.append(module_id)
    # Column names come from the fixed tuple above; all values are bound.
    with db.connect() as c:
        c.execute(f"UPDATE module SET {', '.join(fields)} WHERE id=?", values)
    return get_module(module_id)
class ModuleRunning(RuntimeError):
"""Raised by delete_module when the module is currently running."""

View File

@ -13,7 +13,6 @@ wizard, editors, and SSE-driven live run watch come next.
from __future__ import annotations
from pathlib import Path
from urllib.parse import urlencode
from fastapi import APIRouter, FastAPI, HTTPException, Query, Request
from fastapi.responses import HTMLResponse, RedirectResponse
@ -132,72 +131,6 @@ def module_detail(request: Request, module_id: int):
)
@_router.get("/modules/{module_id}/edit", response_class=HTMLResponse)
def module_edit(request: Request, module_id: int):
    """Render the edit form for one module; 404 when the id is unknown."""
    module = repo.get_module(module_id)
    if module is None:
        raise HTTPException(404, f"module id={module_id} not found")
    # The form posts to the module URL, and "cancel" links back to it.
    detail_url = f"/modules/{module_id}"
    context = _ctx(module=module, connections=repo.list_connections(),
                   form_action=detail_url,
                   cancel_url=detail_url)
    return _templates.TemplateResponse(request, "module_form.html", context)
@_router.post("/modules/{module_id}")
async def module_update(request: Request, module_id: int):
    """Apply the edit form to a module, then redirect to its detail page.

    All form fields are validated before anything is changed, so a malformed
    request gets a 4xx response and never reaches the destination database.

    Raises HTTPException: 404 for an unknown module, 400 for a missing or
    invalid field, 409 when the new name is already taken, and 500 when the
    DDL on the destination fails.
    """
    module = repo.get_module(module_id)
    if module is None:
        raise HTTPException(404, f"module id={module_id} not found")
    form = await request.form()

    def _text(field: str) -> str:
        # Absent fields (and non-text parts) are treated as empty.
        raw = form.get(field)
        return raw if isinstance(raw, str) else ""

    def _required(field: str) -> str:
        value = _text(field).strip()
        if not value:
            raise HTTPException(400, f"missing required field: {field}")
        return value

    def _required_id(field: str) -> int:
        raw = _required(field)
        try:
            return int(raw)
        except ValueError:
            raise HTTPException(400, f"invalid {field}: {raw!r}")

    new_name = _required("name")
    if new_name != module["name"]:
        existing = repo.get_module_by_name(new_name)
        if existing is not None:
            raise HTTPException(
                409, f"module name {new_name!r} already exists — pick another")

    merge_strategy = form.get("merge_strategy", "full")
    if merge_strategy not in ("full", "incremental", "append"):
        raise HTTPException(400, f"invalid merge_strategy: {merge_strategy!r}")

    source_connection_id = _required_id("source_connection_id")
    dest_connection_id = _required_id("dest_connection_id")
    new_dest_table = _required("dest_table")
    new_staging = _required("staging_table")
    # The query is stored exactly as submitted; it only has to be non-blank.
    source_query = _text("source_query")
    if not source_query.strip():
        raise HTTPException(400, "missing required field: source_query")
    new_description = _text("dest_description").strip() or None
    new_merge_key = _text("merge_key").strip() or None
    enabled = 1 if form.get("enabled") == "1" else 0

    # DDL snap: drop the staging table under its old name (runner recreates
    # fresh from dest on next run). Re-apply dest COMMENT if it changed.
    dest_conn = repo.get_connection(module["dest_connection_id"])
    if dest_conn is not None:
        try:
            jrunner.run_dest_sql(
                dest_conn, f"DROP TABLE IF EXISTS {module['staging_table']};")
            if new_description != module["dest_description"]:
                jrunner.run_dest_sql(
                    dest_conn,
                    f"COMMENT ON TABLE {module['dest_table']} IS "
                    f"{_sql_str(new_description or '')};",
                )
        except jrunner.JrunnerError as e:
            raise HTTPException(500, f"dest DDL snap failed: {e}")

    repo.update_module(
        module_id,
        name=new_name,
        source_connection_id=source_connection_id,
        dest_connection_id=dest_connection_id,
        dest_table=new_dest_table,
        staging_table=new_staging,
        source_query=source_query,
        merge_strategy=merge_strategy,
        merge_key=new_merge_key,
        dest_description=new_description,
        enabled=enabled,
    )
    # 303 so the browser follows up with a GET of the detail page.
    return RedirectResponse(url=f"/modules/{module_id}", status_code=303)
@_router.post("/modules/{module_id}/delete")
def module_delete(module_id: int):
if repo.get_module(module_id) is None:
@ -341,23 +274,6 @@ def wizard_step3(request: Request,
default_dest_conn_id = conn.get("default_dest_connection_id")
default_dest_schema = conn.get("default_dest_schema") or ""
# Proactive warning — if the default dest table already exists, surface
# its columns now so the user can align picks before submit.
dest_warn: dict | None = None
if not fetch_error and default_dest_conn_id and default_dest_schema:
dest_conn_row = repo.get_connection(default_dest_conn_id)
if dest_conn_row is not None:
try:
existing = _existing_dest_columns(
dest_conn_row, default_dest_schema, default_module_name)
except jrunner.JrunnerError:
existing = None
if existing is not None:
dest_warn = {
"qualified": f"{default_dest_schema}.{default_module_name}",
"columns": sorted(existing),
}
return _templates.TemplateResponse(
request,
"wizard_step3.html",
@ -367,8 +283,7 @@ def wizard_step3(request: Request,
table_description=table_description,
fetch_error=fetch_error, default_module_name=default_module_name,
default_dest_conn_id=default_dest_conn_id,
default_dest_schema=default_dest_schema,
dest_warn=dest_warn),
default_dest_schema=default_dest_schema),
)
@ -452,63 +367,16 @@ async def wizard_create(request: Request):
create_table_sql = dest_drv.build_create_table_sql(qualified_dest, chosen)
except NotImplementedError as e:
raise HTTPException(400, str(e))
effective_staging = staging_table or f"pipekit_staging.{module_name}"
staging_schema, _, _ = effective_staging.partition(".")
# If the dest table already exists, don't clobber it. Verify the picks
# match its shape and skip CREATE/COMMENT.
try:
existing_cols = _existing_dest_columns(
dest_conn, dest_schema, dest_table_bare)
except jrunner.JrunnerError as e:
raise HTTPException(500, f"could not introspect dest: {e}")
dest_exists = existing_cols is not None
if dest_exists:
missing = [c["dest_name"] for c in chosen
if c["dest_name"].lower() not in existing_cols]
if missing:
back_qs = urlencode(
[("source_connection_id", source_connection_id),
("table", table),
("table_schema", qvals.get("schema") or qvals.get("library") or ""),
*qvals.items()])
return _templates.TemplateResponse(
request,
"wizard_error.html",
_ctx(
title="Dest table column mismatch",
qualified_dest=qualified_dest,
missing=missing,
existing=sorted(existing_cols),
back_qs=back_qs,
),
status_code=409,
)
try:
jrunner.run_dest_sql(
dest_conn,
f"CREATE SCHEMA IF NOT EXISTS {dest_drv.quote_identifier(dest_schema)};",
)
if not dest_exists:
jrunner.run_dest_sql(dest_conn, create_table_sql)
comment_sql = _build_comment_sql(dest_drv, qualified_dest,
dest_description, chosen)
if comment_sql:
jrunner.run_dest_sql(dest_conn, comment_sql)
# Pre-align staging to dest so first run doesn't surprise us.
if staging_schema and staging_schema != effective_staging:
jrunner.run_dest_sql(
dest_conn,
f"CREATE SCHEMA IF NOT EXISTS {dest_drv.quote_identifier(staging_schema)};",
)
jrunner.run_dest_sql(
dest_conn, f"DROP TABLE IF EXISTS {effective_staging};")
jrunner.run_dest_sql(
dest_conn,
f"CREATE TABLE {effective_staging} "
f"(LIKE {qualified_dest} INCLUDING ALL);",
)
except jrunner.JrunnerError as e:
raise HTTPException(500, f"dest provisioning failed: {e}")
@ -532,21 +400,6 @@ def _sql_str(v: str) -> str:
return "'" + v.replace("'", "''") + "'"
def _existing_dest_columns(dest_conn: dict, schema: str,
                           table: str) -> set[str] | None:
    """Look up the columns of a dest table through information_schema.

    Returns the column names, stripped and lowercased, or None when the
    query yields no rows (the table does not exist). PG-only; fine while
    pg is the sole dest.
    """
    lookup_sql = (
        "SELECT column_name FROM information_schema.columns "
        f"WHERE table_schema={_sql_str(schema)} "
        f"AND table_name={_sql_str(table)}"
    )
    result = jrunner.run_dest_sql(dest_conn, lookup_sql)
    if not result.rows:
        return None
    names: set[str] = set()
    for row in result.rows:
        # Skip empty rows and rows whose first cell is empty.
        if row and row[0]:
            names.add(row[0].strip().lower())
    return names
def _build_comment_sql(dest_drv, qualified_dest: str,
table_description: str | None,
columns: list[dict]) -> str:

View File

@ -277,4 +277,3 @@ table.picker tbody tr:hover td { background: #1c2128; }
}
.flash.ok { border-color: #2f6b35; background: #16261a; color: #b6dcb8; }
.flash.err { border-color: #6b2f2f; background: #261616; color: #dcb6b6; }
.flash.warn { border-color: #6b5a2f; background: #261f16; color: #e1c98a; }

View File

@ -19,7 +19,6 @@
<input type="hidden" name="dry_run" value="1">
<button type="submit">Dry run</button>
</form>
<a class="btn" href="/modules/{{ module.id }}/edit">Edit</a>
<form class="inline" method="post" action="/modules/{{ module.id }}/delete"
onsubmit="return confirm('Delete module {{ module.name }}? This removes the module and its run history. The dest table is NOT dropped.')">
<button type="submit" class="ghost" style="color:var(--danger)">Delete</button>
@ -42,8 +41,7 @@
<div>
<div class="panel">
<header>Source query
<span class="subtitle">free text with <code>{watermark}</code> placeholders</span>
<span style="margin-left:auto"><a href="/modules/{{ module.id }}/edit">edit</a></span>
<span class="subtitle">free text — edit opens in $EDITOR (TODO)</span>
</header>
<div class="body"><pre class="sql">{{ module.source_query }}</pre></div>
</div>

View File

@ -1,115 +0,0 @@
{% extends "base.html" %}
{% set section = "modules" %}
{% block title %}Edit module &middot; {{ module.name }} — Pipekit{% endblock %}
{% block content %}
<div class="panel">
<header>
Edit module &middot; {{ module.name }}
<span class="subtitle">
module #{{ module.id }}
{% if module.running %}<span class="pill running">running</span>{% endif %}
</span>
<span style="margin-left:auto"><a href="{{ cancel_url }}">&larr; back to module</a></span>
</header>
<div class="body">
{% if module.running %}
<div class="flash warn" style="margin-bottom:0.8rem">
This module is currently running. Saving changes now is allowed but may affect the in-flight run.
</div>
{% endif %}
<form method="post" action="{{ form_action }}">
<label class="field">
<span>name</span>
<input type="text" name="name" required value="{{ module.name }}">
<span class="help">must be unique; also used as the default staging table suffix</span>
</label>
<div class="two-col" style="gap:1rem">
<label class="field">
<span>source connection</span>
<select name="source_connection_id" required>
{% for c in connections %}
<option value="{{ c.id }}"
{% if c.id == module.source_connection_id %}selected{% endif %}>
{{ c.name }}
</option>
{% endfor %}
</select>
<span class="help">where <code>source_query</code> runs</span>
</label>
<label class="field">
<span>dest connection</span>
<select name="dest_connection_id" required>
{% for c in connections %}
<option value="{{ c.id }}"
{% if c.id == module.dest_connection_id %}selected{% endif %}>
{{ c.name }}
</option>
{% endfor %}
</select>
<span class="help">where staging + merge run</span>
</label>
</div>
<div class="two-col" style="gap:1rem">
<label class="field">
<span>dest table</span>
<input type="text" name="dest_table" required value="{{ module.dest_table }}">
<span class="help"><code>schema.table</code> — editing here does NOT rename the actual table</span>
</label>
<label class="field">
<span>staging table</span>
<input type="text" name="staging_table" required value="{{ module.staging_table }}">
<span class="help">dropped on save; recreated from dest on next run</span>
</label>
</div>
<label class="field">
<span>source query</span>
<textarea name="source_query" rows="14" required class="mono">{{ module.source_query }}</textarea>
<span class="help">free text; <code>{name}</code> placeholders resolved from watermarks at run time</span>
</label>
<div class="two-col" style="gap:1rem">
<label class="field">
<span>merge strategy</span>
<select name="merge_strategy" required>
{% for s in ("full", "incremental", "append") %}
<option value="{{ s }}" {% if s == module.merge_strategy %}selected{% endif %}>{{ s }}</option>
{% endfor %}
</select>
</label>
<label class="field">
<span>merge key</span>
<input type="text" name="merge_key" value="{{ module.merge_key or '' }}"
placeholder="id or (col_a, col_b)">
<span class="help">required for <code>incremental</code>; ignored otherwise</span>
</label>
</div>
<label class="field">
<span>dest description</span>
<textarea name="dest_description" rows="2">{{ module.dest_description or '' }}</textarea>
<span class="help">COMMENT ON TABLE value; re-applied on save if changed</span>
</label>
<label class="field" style="flex-direction:row;align-items:center;gap:0.5rem">
<input type="checkbox" name="enabled" value="1"
{% if module.enabled %}checked{% endif %}>
<span>enabled</span>
<span class="help">disabled modules are skipped by group runs; ad-hoc runs still work</span>
</label>
<div class="actions" style="justify-content:flex-end;margin-top:0.8rem">
<a class="btn ghost" href="{{ cancel_url }}">cancel</a>
<button type="submit" class="primary">save changes</button>
</div>
</form>
</div>
</div>
{% endblock %}

View File

@ -1,57 +0,0 @@
{% extends "base.html" %}
{% set section = "modules" %}
{% block title %}New module — error{% endblock %}
{% block content %}
<div class="panel">
<header>
Wizard error
<span class="subtitle">{{ title }}</span>
<span style="margin-left:auto">
<a href="/wizard/columns?{{ back_qs }}">&larr; back to step 3</a>
</span>
</header>
<div class="body">
<div class="flash err">
Dest table <code>{{ qualified_dest }}</code> already exists, but your
picks don't match its schema. Drop the table, choose a different
dest table, or align your column picks (dest names) with the
existing columns below.
</div>
<div class="two-col">
<div class="panel">
<header>
Missing from existing table
<span class="subtitle">{{ missing|length }}</span>
</header>
<div class="body tight">
<table class="grid">
<tbody>
{% for m in missing %}
<tr><td class="mono">{{ m }}</td></tr>
{% endfor %}
</tbody>
</table>
</div>
</div>
<div class="panel">
<header>
Columns in existing dest
<span class="subtitle">{{ existing|length }}</span>
</header>
<div class="body tight">
<table class="grid">
<tbody>
{% for c in existing %}
<tr><td class="mono">{{ c }}</td></tr>
{% endfor %}
</tbody>
</table>
</div>
</div>
</div>
</div>
</div>
{% endblock %}

View File

@ -27,45 +27,10 @@
</label>
{% endfor %}
<label class="field">
<span>table (skip browse)</span>
<input type="text" name="table" id="jump-table"
placeholder="schema.table — or leave blank to browse"
autocomplete="off" spellcheck="false">
<span class="help">if you know the table, fill this in and click jump. <code>schema.table</code> auto-populates the qualifier above on tab-out.</span>
</label>
<div class="actions" style="margin-top:0.8rem">
<button type="submit" class="primary">browse &rarr;</button>
<button type="submit" id="jump-btn"
formaction="/wizard/columns" disabled>
jump to columns &rarr;
</button>
</div>
</form>
<script>
(function () {
var tbl = document.getElementById('jump-table');
var btn = document.getElementById('jump-btn');
var schemaInput = document.querySelector('input[name="schema"]')
|| document.querySelector('input[name="library"]');
function updateBtn() {
btn.disabled = !tbl.value.trim();
}
tbl.addEventListener('input', updateBtn);
tbl.addEventListener('blur', function () {
var v = tbl.value.trim();
var dot = v.indexOf('.');
if (dot > 0 && schemaInput) {
schemaInput.value = v.substring(0, dot);
tbl.value = v.substring(dot + 1);
updateBtn();
}
});
updateBtn();
})();
</script>
</div>
</div>

View File

@ -18,16 +18,6 @@
</div>
</div>
{% if dest_warn %}
<div class="flash warn">
Dest table <code>{{ dest_warn.qualified }}</code> already exists on the
default destination. If you proceed, pipekit will <strong>not</strong> drop
or recreate it — your picks below (dest names) must match the existing
columns, or the create will fail. Existing columns:
<span class="mono">{{ dest_warn.columns|join(', ') }}</span>
</div>
{% endif %}
{% if not fetch_error %}
<form method="post" action="/wizard/create">
<input type="hidden" name="source_connection_id" value="{{ connection.id }}">

View File

@ -1,29 +0,0 @@
# Pipekit systemd unit template.
#
# Install:
# sudo cp pipekit.service /etc/systemd/system/
# sudo systemctl daemon-reload
# sudo systemctl enable --now pipekit
#
# Runs as root by default. For a dedicated service account, create the
# user and uncomment User=/Group= below:
# sudo useradd --system --home-dir /opt/pipekit --shell /usr/sbin/nologin pipekit
# sudo chown -R pipekit:pipekit /opt/pipekit/pipekit.db /etc/pipekit
[Unit]
Description=Pipekit sync engine
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
# User=pipekit
# Group=pipekit
WorkingDirectory=/opt/pipekit
EnvironmentFile=/etc/pipekit/secrets.env
ExecStart=/usr/local/bin/pipekit serve --host 0.0.0.0
Restart=on-failure
RestartSec=5s
[Install]
WantedBy=multi-user.target