Compare commits

...

10 Commits

Author SHA1 Message Date
f18ea55a12 Wizard: warn in-UI when default dest table already exists.
Previously the existing-dest check fired on submit and surfaced as a raw
JSON 400. Now step 3 introspects the default dest up front and renders a
yellow banner listing existing columns; submit-time mismatches render
wizard_error.html (409) with missing vs. existing side-by-side and a back
link that re-plumbs the form qvals.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-25 13:27:23 -04:00
bb0b493d18 Wizard step 2: add jump-to-columns shortcut for known tables.
New text input + "jump to columns" button skip the full table listing
when you already know what you want. Typing "schema.table" and tabbing
out auto-splits into the schema qualifier + table name. Jump button
stays disabled until the table field has a value.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-23 00:41:00 -04:00
fde4fa99b6 Wizard: don't clobber pre-existing dest tables.
If the dest table already exists, introspect its columns and verify the
wizard's picks line up. Missing columns surface a specific error message
naming what's missing instead of the opaque "column X does not exist"
from a failed COMMENT. On match, skip CREATE + COMMENT so existing
schema and comments aren't touched; staging still gets provisioned.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-23 00:41:00 -04:00
4650a3cbc5 bin/pipekit auto-detects venv; stop rewriting it in deploy.sh.
The tracked launcher now checks for .venv/bin/python3 under the repo and
uses it if present, else falls back to system python3. Works pre-deploy
(no venv) and post-deploy (venv exists) without being modified. Deploy
no longer regenerates the file, so `git pull` on a deployed box won't
conflict with the launcher.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-23 00:34:32 -04:00
c205b48be2 Honor api_host in config.yaml; ignore .venv/ created by deploy.sh.
cmd_serve now reads api_host from Config with a 127.0.0.1 safe default,
matching the existing api_port pattern. --host/--port CLI flags still
override. Local config is bumped to bind 0.0.0.0:8200.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-23 00:33:56 -04:00
1c3586eb2f deploy.sh: pass -H to sudo so pip doesn't warn about user cache.
Without -H, sudo keeps HOME pointed at the invoking user, so pip running
as root tries to write to /home/<user>/.cache/pip and disables caching
with a warning. -H resets HOME to /root while -E preserves the rest.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-22 23:59:23 -04:00
e6a615bf70 Add deploy.sh, systemd unit template, and pipekit secrets CLI.
deploy.sh is the idempotent rollout path: venv + deps, launcher,
/etc/pipekit/secrets.env skeleton (mode 0600), schema init, and
auto-register of every JDBC driver shipped with jrunner. systemd
unit is a template, not auto-installed — user copies it when ready
to cut over.

`pipekit secrets {list,set,unset}` manages /etc/pipekit/secrets.env
with atomic 0600 writes so passwords don't need sudoedit. Prompted
input by default; positional value allowed for scripting.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-22 22:34:38 -04:00
e27167a4a3 Add pipekit drivers register for seeding JDBC driver rows.
Registers a driver-table row from the CLI. Kind is validated against
the code-level driver registry; JDBC class names default from a
built-in table (db2, pg, mssql). Refuses to double-register a kind
unless --force is passed.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-22 22:01:59 -04:00
01bcba78b4 Snap staging DDL on module create/edit/run; allowlist benign jrunner exception.
Staging table drift caused silent data loss when dest grew columns but
staging kept the old shape. Fix on three fronts:

- Runner now DROP+CREATEs staging each run instead of CREATE IF NOT
  EXISTS, so any drift self-heals.
- Wizard create drop+creates staging right after dest is provisioned,
  surfacing DDL errors at create time.
- Module edit drops the (old-name) staging table and re-applies
  COMMENT ON TABLE when dest_description changed.

jrunner's query mode uses executeQuery() which raises
"No results were returned by the query" after DDL/DML succeeds; the
stack-trace detector now allowlists that exception so normal
CREATE/TRUNCATE/INSERT runs aren't flagged as failures.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-22 20:10:36 -04:00
2ef68d766c Add module edit page + detect jrunner silent failures.
Modules get a full edit form (name, connections, tables, source query,
merge config, description, enabled); reachable via Edit button on the
detail page and the source-query panel.

jrunner catches SQLException and calls System.exit(0) at every failure
site, so pipekit was marking runs success when the migrate phase had
actually errored. query() and migrate() now scan stdout+stderr for a
Java stack-trace signature and raise JrunnerError. runner.py also
captures the failed jrunner output onto run_log so the stack trace is
visible on the run detail page.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-22 11:02:45 -04:00
17 changed files with 773 additions and 16 deletions

3
.gitignore vendored
View File

@ -10,3 +10,6 @@ pipekit.db-shm
# Local Claude Code settings.
.claude/settings.local.json
# Python venv created by deploy.sh.
.venv/

View File

@ -1,4 +1,9 @@
#!/usr/bin/env bash
# Thin launcher: run `pipekit` from anywhere.
# Thin launcher: prefer the repo venv if deploy.sh created one,
# else fall back to system python3.
set -euo pipefail
REPO="$(cd "$(dirname "$(readlink -f "$0")")/.." && pwd)"
if [ -x "$REPO/.venv/bin/python3" ]; then
exec "$REPO/.venv/bin/python3" -m pipekit "$@"
fi
exec python3 -m pipekit "$@"

View File

@ -1,7 +1,8 @@
database: /opt/pipekit/pipekit.db
jrunner_path: /usr/local/bin/jrunner
driver_dir: /opt/pipekit/drivers/
api_port: 8100
api_host: 0.0.0.0
api_port: 8200
# smtp:
# host: smtp.example.com
# port: 587

97
deploy.sh Executable file
View File

@ -0,0 +1,97 @@
#!/usr/bin/env bash
# Pipekit deployment — idempotent. Re-run any time.
#
# Steps:
# 1. Check prerequisites (python3, jrunner on PATH)
# 2. Create Python venv at $REPO/.venv and install requirements
# 3. Install launcher at /usr/local/bin/pipekit (wraps the venv python)
# 4. Ensure /etc/pipekit/secrets.env exists (mode 0600, placeholder body)
# 5. Run `pipekit init` to create/upgrade the SQLite schema
# 6. Register driver rows for every JDBC jar shipped with jrunner
#
# After running:
# - Set DB passwords with: sudo pipekit secrets set <KEY>
# - See systemd/pipekit.service for a unit file template
set -euo pipefail
REPO_DIR="${PIPEKIT_REPO:-$(cd "$(dirname "$0")" && pwd)}"
VENV_DIR="$REPO_DIR/.venv"
LAUNCHER="/usr/local/bin/pipekit"
CONFIG_DIR="/etc/pipekit"
SECRETS_FILE="$CONFIG_DIR/secrets.env"
if [ "$EUID" -ne 0 ]; then
exec sudo -H -E "$0" "$@"
fi
echo "== pipekit deploy =="
echo "repo: $REPO_DIR"
echo "venv: $VENV_DIR"
echo "secrets: $SECRETS_FILE"
echo ""
command -v python3 >/dev/null || { echo "ERROR: python3 not on PATH"; exit 1; }
command -v jrunner >/dev/null || { echo "ERROR: jrunner not on PATH — install /opt/jrunner first"; exit 1; }
if [ ! -d "$VENV_DIR" ]; then
echo "Creating venv at $VENV_DIR"
python3 -m venv "$VENV_DIR"
fi
"$VENV_DIR/bin/pip" install --quiet --upgrade pip
"$VENV_DIR/bin/pip" install --quiet -r "$REPO_DIR/requirements.txt"
echo "Python deps installed."
# The in-repo bin/pipekit auto-detects .venv at runtime; no rewrite needed.
chmod +x "$REPO_DIR/bin/pipekit"
ln -sf "$REPO_DIR/bin/pipekit" "$LAUNCHER"
echo "Launcher: $LAUNCHER -> $REPO_DIR/bin/pipekit"
install -d -m 0755 "$CONFIG_DIR"
if [ ! -f "$SECRETS_FILE" ]; then
install -m 0600 /dev/null "$SECRETS_FILE"
cat > "$SECRETS_FILE" <<'EOF'
# pipekit secrets — sourced by the service process (EnvironmentFile=)
# or by the shell before `pipekit serve`. One KEY=VALUE per line.
# Connection rows reference these as $KEY (e.g. password: "$DB2PW").
#
# This file must stay mode 0600 and out of version control.
# Use `sudo pipekit secrets set <KEY>` to add entries safely.
EOF
chmod 0600 "$SECRETS_FILE"
echo "Created $SECRETS_FILE"
else
echo "Keeping existing $SECRETS_FILE"
fi
"$LAUNCHER" init
# Register drivers for each JDBC jar jrunner ships with.
JR_LIB="$(dirname "$(readlink -f "$(command -v jrunner)")")/../lib"
register_jar() {
local kind="$1" pattern="$2"
local jar
jar="$(find "$JR_LIB" -maxdepth 1 -name "$pattern" 2>/dev/null | head -1)"
if [ -n "$jar" ]; then
"$LAUNCHER" drivers register "$kind" --jar "$jar"
else
echo " (no $pattern in $JR_LIB — skipping $kind)"
fi
}
register_jar db2 "jt400-*.jar"
register_jar pg "postgresql-*.jar"
register_jar mssql "mssql-jdbc-*.jar"
echo ""
echo "pipekit deployed."
echo ""
echo "Next steps:"
echo " 1. Set passwords: sudo pipekit secrets set DB2PW"
echo " sudo pipekit secrets set PGPW"
echo " 2. Start the server manually:"
echo " set -a; source $SECRETS_FILE; set +a"
echo " pipekit serve"
echo " 3. Or install the systemd unit:"
echo " sudo cp $REPO_DIR/systemd/pipekit.service /etc/systemd/system/"
echo " sudo systemctl daemon-reload"
echo " sudo systemctl enable --now pipekit"

View File

@ -45,6 +45,48 @@ def cmd_drivers_list(args) -> int:
return 0
_DEFAULT_JDBC_CLASSES = {
"db2": "com.ibm.as400.access.AS400JDBCDriver",
"pg": "org.postgresql.Driver",
"mssql": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
}
def cmd_drivers_register(args) -> int:
try:
drivers.get_driver(args.kind)
except ValueError as e:
print(f"error: {e}")
return 1
existing = [d for d in repo.list_drivers() if d["kind"] == args.kind]
if existing and not args.force:
print(f"driver kind {args.kind!r} already registered as "
f"{existing[0]['name']!r} (id={existing[0]['id']}). "
f"Use --force to add a second row.")
return 0
class_name = args.class_name or _DEFAULT_JDBC_CLASSES.get(args.kind)
if not class_name:
print(f"error: no built-in JDBC class for kind {args.kind!r}; "
f"pass --class explicitly")
return 1
import os
if not os.path.exists(args.jar):
print(f"warning: jar {args.jar!r} does not exist "
f"(registering anyway)")
name = args.name or f"{args.kind}-jdbc"
row = repo.create_driver(
name=name, kind=args.kind, jar_file=args.jar,
class_name=class_name, url_template=args.url_template,
)
print(f"registered driver id={row['id']} name={row['name']!r} "
f"kind={row['kind']!r}")
return 0
def cmd_drivers_show(args) -> int:
try:
d = drivers.get_driver(args.kind)
@ -95,8 +137,10 @@ def cmd_serve(args) -> int:
import uvicorn
from .api import create_app
port = args.port or get_config().api_port
uvicorn.run(create_app(), host=args.host, port=port, reload=args.reload)
cfg = get_config()
host = args.host or cfg.api_host
port = args.port or cfg.api_port
uvicorn.run(create_app(), host=host, port=port, reload=args.reload)
return 0
@ -113,6 +157,101 @@ def cmd_set_password(args) -> int:
return 0
_DEFAULT_SECRETS_FILE = "/etc/pipekit/secrets.env"
def _secrets_path(args) -> str:
import os
return (getattr(args, "file", None)
or os.environ.get("PIPEKIT_SECRETS")
or _DEFAULT_SECRETS_FILE)
def cmd_secrets_list(args) -> int:
import os
path = _secrets_path(args)
if not os.path.exists(path):
print(f"{path}: no such file")
return 0
keys: list[str] = []
with open(path) as f:
for line in f:
s = line.strip()
if not s or s.startswith("#") or "=" not in s:
continue
keys.append(s.split("=", 1)[0])
print(f"{path}{len(keys)} secret(s):")
for k in keys:
print(f" {k}")
return 0
def cmd_secrets_set(args) -> int:
import getpass
import os
import stat
path = _secrets_path(args)
value = args.value if args.value is not None else getpass.getpass(
f"value for {args.key}: ")
if not value:
print("error: empty value")
return 1
lines: list[str] = []
replaced = False
if os.path.exists(path):
with open(path) as f:
for line in f:
s = line.strip()
if s and not s.startswith("#") and "=" in s \
and s.split("=", 1)[0] == args.key:
lines.append(f"{args.key}={value}\n")
replaced = True
else:
lines.append(line)
if not replaced:
if lines and not lines[-1].endswith("\n"):
lines.append("\n")
lines.append(f"{args.key}={value}\n")
os.makedirs(os.path.dirname(path), exist_ok=True)
tmp = path + ".tmp"
with open(tmp, "w") as f:
f.writelines(lines)
os.chmod(tmp, stat.S_IRUSR | stat.S_IWUSR) # 0600
os.replace(tmp, path)
print(f"{'updated' if replaced else 'added'} {args.key} in {path}")
return 0
def cmd_secrets_unset(args) -> int:
import os
path = _secrets_path(args)
if not os.path.exists(path):
print(f"{path}: no such file")
return 1
kept: list[str] = []
removed = False
with open(path) as f:
for line in f:
s = line.strip()
if s and not s.startswith("#") and "=" in s \
and s.split("=", 1)[0] == args.key:
removed = True
continue
kept.append(line)
if not removed:
print(f"{args.key} not set in {path}")
return 0
tmp = path + ".tmp"
with open(tmp, "w") as f:
f.writelines(kept)
os.chmod(tmp, 0o600)
os.replace(tmp, path)
print(f"removed {args.key} from {path}")
return 0
def _report(checks) -> int:
width = max(len(name) for name, _, _ in checks)
failures = 0
@ -150,6 +289,21 @@ def main(argv: list[str] | None = None) -> int:
p_drv_show.add_argument("kind", help="one of the kinds from `pipekit drivers list`")
p_drv_show.set_defaults(func=cmd_drivers_show)
p_drv_reg = drv_sub.add_parser(
"register", help="add a driver row to the database")
p_drv_reg.add_argument("kind", help="driver kind (db2, pg, mssql)")
p_drv_reg.add_argument("--jar", required=True,
help="absolute path to the JDBC jar")
p_drv_reg.add_argument("--name",
help="registry name (default: <kind>-jdbc)")
p_drv_reg.add_argument("--class", dest="class_name",
help="JDBC Driver class (default: built-in per kind)")
p_drv_reg.add_argument("--url-template",
help="optional JDBC URL template for the wizard")
p_drv_reg.add_argument("--force", action="store_true",
help="register even if a row for this kind exists")
p_drv_reg.set_defaults(func=cmd_drivers_register)
p_run = sub.add_parser("run", help="run a module by name (synchronous)")
p_run.add_argument("module", help="module name")
p_run.add_argument("--dry-run", action="store_true",
@ -157,7 +311,8 @@ def main(argv: list[str] | None = None) -> int:
p_run.set_defaults(func=cmd_run)
p_serve = sub.add_parser("serve", help="start the HTTP API")
p_serve.add_argument("--host", default="127.0.0.1")
p_serve.add_argument("--host", default=None,
help="defaults to config.yaml api_host (127.0.0.1)")
p_serve.add_argument("--port", type=int, default=None,
help="defaults to config.yaml api_port")
p_serve.add_argument("--reload", action="store_true")
@ -167,6 +322,27 @@ def main(argv: list[str] | None = None) -> int:
p_pw.add_argument("username")
p_pw.set_defaults(func=cmd_set_password)
p_sec = sub.add_parser(
"secrets",
help=f"manage the env-var file for DB passwords (default {_DEFAULT_SECRETS_FILE})")
sec_sub = p_sec.add_subparsers(dest="sec_cmd", required=True)
p_sec_list = sec_sub.add_parser("list", help="list keys in the secrets file")
p_sec_list.add_argument("--file", help=f"override path (default {_DEFAULT_SECRETS_FILE})")
p_sec_list.set_defaults(func=cmd_secrets_list)
p_sec_set = sec_sub.add_parser("set", help="add or update a KEY (prompted if value omitted)")
p_sec_set.add_argument("key")
p_sec_set.add_argument("value", nargs="?", default=None,
help="value; omit to be prompted (safer)")
p_sec_set.add_argument("--file", help=f"override path (default {_DEFAULT_SECRETS_FILE})")
p_sec_set.set_defaults(func=cmd_secrets_set)
p_sec_unset = sec_sub.add_parser("unset", help="remove a KEY from the secrets file")
p_sec_unset.add_argument("key")
p_sec_unset.add_argument("--file", help=f"override path (default {_DEFAULT_SECRETS_FILE})")
p_sec_unset.set_defaults(func=cmd_secrets_unset)
args = p.parse_args(argv)
return args.func(args)

View File

@ -28,6 +28,10 @@ class Config:
def api_port(self) -> int:
return int(self._data.get("api_port", 8100))
@property
def api_host(self) -> str:
return str(self._data.get("api_host", "127.0.0.1"))
def get(self, key: str, default=None):
return self._data.get(key, default)

View File

@ -91,16 +91,19 @@ def run_module(module_id: int, *, group_run_id: int | None = None,
status = "success"
return RunOutcome(run_id, status, None, None, resolved_sql, merge_sql)
# 4. ensure staging table exists on dest. Mirror the real dest schema
# so jrunner's auto-DELETE and the subsequent merge INSERT both find
# a table to work on. Idempotent — no-op after first run.
# 4. (re)create staging from dest. DROP+CREATE (not IF NOT EXISTS) so
# any drift — dest columns added since staging was last made — is
# self-healing. Staging is ephemeral per SPEC; nothing of value lives
# between runs.
staging_schema, _, _ = module["staging_table"].partition(".")
if staging_schema and staging_schema != module["staging_table"]:
jrunner.run_dest_sql(
dest_conn, f"CREATE SCHEMA IF NOT EXISTS {staging_schema};")
jrunner.run_dest_sql(
dest_conn, f"DROP TABLE IF EXISTS {module['staging_table']};")
jrunner.run_dest_sql(
dest_conn,
f"CREATE TABLE IF NOT EXISTS {module['staging_table']} "
f"CREATE TABLE {module['staging_table']} "
f"(LIKE {module['dest_table']} INCLUDING ALL);",
)
@ -128,6 +131,9 @@ def run_module(module_id: int, *, group_run_id: int | None = None,
except Exception as e: # noqa: BLE001
error = f"{type(e).__name__}: {e}\n{traceback.format_exc()}"
if isinstance(e, jrunner.JrunnerError):
repo.log_run_output(run_id, jrunner_stdout=e.stdout,
jrunner_stderr=e.stderr)
# Failure-path hooks, if any. Never let these mask the real error.
try:
hook_log = _run_hooks(module_id, fail_fast=False,

View File

@ -125,6 +125,9 @@ def query(
if r.returncode != 0:
raise JrunnerError(r.stderr.strip() or r.stdout.strip(),
stdout=r.stdout, stderr=r.stderr)
silent = _detect_silent_failure(r.stdout, r.stderr)
if silent:
raise JrunnerError(silent, stdout=r.stdout, stderr=r.stderr)
reader = csv.reader(io.StringIO(r.stdout))
header = next(reader, [])
@ -169,6 +172,9 @@ def migrate(
if r.returncode != 0:
raise JrunnerError(r.stderr.strip() or r.stdout.strip(),
stdout=r.stdout, stderr=r.stderr)
silent = _detect_silent_failure(r.stdout, r.stderr)
if silent:
raise JrunnerError(silent, stdout=r.stdout, stderr=r.stderr)
return MigrateResult(
row_count=_parse_row_count(r.stdout + "\n" + r.stderr),
@ -202,6 +208,35 @@ def _parse_row_count(text: str) -> int | None:
return None
# jrunner catches SQLException, prints the stack trace, then exits 0 at
# nearly every failure site (see jrunner.java). Detect those by scanning
# for a Java stack-trace signature so callers don't treat silent failures
# as success.
_STACK_FRAME_RE = re.compile(r"^\s*at [\w.$<>]+\([^)\n]*\.java:\d+\)", re.M)
_EXCEPTION_HEADER_RE = re.compile(
r"^(?:[\w.$]+\.)*[\w$]+(?:Exception|Error)(?::[^\n]*)?$", re.M)
# jrunner runs query-mode SQL with `executeQuery`, which requires the
# statement to produce a ResultSet. DDL/DML (CREATE, TRUNCATE, INSERT)
# still executes, but PG then throws "No results were returned by the
# query." The statement succeeded — ignore the trace.
_BENIGN_EXCEPTION_SUBSTRINGS = (
"No results were returned by the query",
)
def _detect_silent_failure(stdout: str, stderr: str) -> str | None:
"""Return a short error summary if jrunner exited 0 but logged a failure."""
combined = (stderr or "") + "\n" + (stdout or "")
if not _STACK_FRAME_RE.search(combined):
return None
m = _EXCEPTION_HEADER_RE.search(combined)
header = m.group(0).strip() if m else "jrunner logged a Java stack trace but exited 0"
if any(s in header for s in _BENIGN_EXCEPTION_SUBSTRINGS):
return None
return header
class JrunnerError(RuntimeError):
def __init__(self, message: str, *, stdout: str = "", stderr: str = ""):
super().__init__(message)

View File

@ -177,6 +177,40 @@ def list_modules() -> list[dict]:
return [dict(r) for r in c.execute("SELECT * FROM module ORDER BY name")]
def update_module(module_id: int, *, name: str | None = None,
source_connection_id: int | None = None,
dest_connection_id: int | None = None,
dest_table: str | None = None,
staging_table: str | None = None,
source_query: str | None = None,
merge_strategy: str | None = None,
merge_key: str | None = None,
dest_description: str | None = None,
enabled: int | None = None) -> dict | None:
fields: list[str] = []
values: list = []
for col, val in (("name", name),
("source_connection_id", source_connection_id),
("dest_connection_id", dest_connection_id),
("dest_table", dest_table),
("staging_table", staging_table),
("source_query", source_query),
("merge_strategy", merge_strategy),
("merge_key", merge_key),
("dest_description", dest_description),
("enabled", enabled)):
if val is not None:
fields.append(f"{col}=?")
values.append(val)
if not fields:
return get_module(module_id)
fields.append("updated_at=datetime('now')")
values.append(module_id)
with db.connect() as c:
c.execute(f"UPDATE module SET {', '.join(fields)} WHERE id=?", values)
return get_module(module_id)
class ModuleRunning(RuntimeError):
"""Raised by delete_module when the module is currently running."""

View File

@ -13,6 +13,7 @@ wizard, editors, and SSE-driven live run watch come next.
from __future__ import annotations
from pathlib import Path
from urllib.parse import urlencode
from fastapi import APIRouter, FastAPI, HTTPException, Query, Request
from fastapi.responses import HTMLResponse, RedirectResponse
@ -131,6 +132,72 @@ def module_detail(request: Request, module_id: int):
)
@_router.get("/modules/{module_id}/edit", response_class=HTMLResponse)
def module_edit(request: Request, module_id: int):
module = repo.get_module(module_id)
if module is None:
raise HTTPException(404, f"module id={module_id} not found")
return _templates.TemplateResponse(
request,
"module_form.html",
_ctx(module=module, connections=repo.list_connections(),
form_action=f"/modules/{module_id}",
cancel_url=f"/modules/{module_id}"),
)
@_router.post("/modules/{module_id}")
async def module_update(request: Request, module_id: int):
module = repo.get_module(module_id)
if module is None:
raise HTTPException(404, f"module id={module_id} not found")
form = await request.form()
new_name = form["name"].strip()
if new_name != module["name"]:
existing = repo.get_module_by_name(new_name)
if existing is not None:
raise HTTPException(
409, f"module name {new_name!r} already exists — pick another")
merge_strategy = form.get("merge_strategy", "full")
if merge_strategy not in ("full", "incremental", "append"):
raise HTTPException(400, f"invalid merge_strategy: {merge_strategy!r}")
new_staging = form["staging_table"].strip()
new_description = (form.get("dest_description") or "").strip() or None
# DDL snap: drop the staging table under its old name (runner recreates
# fresh from dest on next run). Re-apply dest COMMENT if it changed.
dest_conn = repo.get_connection(module["dest_connection_id"])
if dest_conn is not None:
try:
jrunner.run_dest_sql(
dest_conn, f"DROP TABLE IF EXISTS {module['staging_table']};")
if new_description != module["dest_description"]:
jrunner.run_dest_sql(
dest_conn,
f"COMMENT ON TABLE {module['dest_table']} IS "
f"{_sql_str(new_description or '')};",
)
except jrunner.JrunnerError as e:
raise HTTPException(500, f"dest DDL snap failed: {e}")
repo.update_module(
module_id,
name=new_name,
source_connection_id=int(form["source_connection_id"]),
dest_connection_id=int(form["dest_connection_id"]),
dest_table=form["dest_table"].strip(),
staging_table=new_staging,
source_query=form["source_query"],
merge_strategy=merge_strategy,
merge_key=(form.get("merge_key") or "").strip() or None,
dest_description=new_description,
enabled=1 if form.get("enabled") == "1" else 0,
)
return RedirectResponse(url=f"/modules/{module_id}", status_code=303)
@_router.post("/modules/{module_id}/delete")
def module_delete(module_id: int):
if repo.get_module(module_id) is None:
@ -274,6 +341,23 @@ def wizard_step3(request: Request,
default_dest_conn_id = conn.get("default_dest_connection_id")
default_dest_schema = conn.get("default_dest_schema") or ""
# Proactive warning — if the default dest table already exists, surface
# its columns now so the user can align picks before submit.
dest_warn: dict | None = None
if not fetch_error and default_dest_conn_id and default_dest_schema:
dest_conn_row = repo.get_connection(default_dest_conn_id)
if dest_conn_row is not None:
try:
existing = _existing_dest_columns(
dest_conn_row, default_dest_schema, default_module_name)
except jrunner.JrunnerError:
existing = None
if existing is not None:
dest_warn = {
"qualified": f"{default_dest_schema}.{default_module_name}",
"columns": sorted(existing),
}
return _templates.TemplateResponse(
request,
"wizard_step3.html",
@ -283,7 +367,8 @@ def wizard_step3(request: Request,
table_description=table_description,
fetch_error=fetch_error, default_module_name=default_module_name,
default_dest_conn_id=default_dest_conn_id,
default_dest_schema=default_dest_schema),
default_dest_schema=default_dest_schema,
dest_warn=dest_warn),
)
@ -367,16 +452,63 @@ async def wizard_create(request: Request):
create_table_sql = dest_drv.build_create_table_sql(qualified_dest, chosen)
except NotImplementedError as e:
raise HTTPException(400, str(e))
effective_staging = staging_table or f"pipekit_staging.{module_name}"
staging_schema, _, _ = effective_staging.partition(".")
# If the dest table already exists, don't clobber it. Verify the picks
# match its shape and skip CREATE/COMMENT.
try:
existing_cols = _existing_dest_columns(
dest_conn, dest_schema, dest_table_bare)
except jrunner.JrunnerError as e:
raise HTTPException(500, f"could not introspect dest: {e}")
dest_exists = existing_cols is not None
if dest_exists:
missing = [c["dest_name"] for c in chosen
if c["dest_name"].lower() not in existing_cols]
if missing:
back_qs = urlencode(
[("source_connection_id", source_connection_id),
("table", table),
("table_schema", qvals.get("schema") or qvals.get("library") or ""),
*qvals.items()])
return _templates.TemplateResponse(
request,
"wizard_error.html",
_ctx(
title="Dest table column mismatch",
qualified_dest=qualified_dest,
missing=missing,
existing=sorted(existing_cols),
back_qs=back_qs,
),
status_code=409,
)
try:
jrunner.run_dest_sql(
dest_conn,
f"CREATE SCHEMA IF NOT EXISTS {dest_drv.quote_identifier(dest_schema)};",
)
if not dest_exists:
jrunner.run_dest_sql(dest_conn, create_table_sql)
comment_sql = _build_comment_sql(dest_drv, qualified_dest,
dest_description, chosen)
if comment_sql:
jrunner.run_dest_sql(dest_conn, comment_sql)
# Pre-align staging to dest so first run doesn't surprise us.
if staging_schema and staging_schema != effective_staging:
jrunner.run_dest_sql(
dest_conn,
f"CREATE SCHEMA IF NOT EXISTS {dest_drv.quote_identifier(staging_schema)};",
)
jrunner.run_dest_sql(
dest_conn, f"DROP TABLE IF EXISTS {effective_staging};")
jrunner.run_dest_sql(
dest_conn,
f"CREATE TABLE {effective_staging} "
f"(LIKE {qualified_dest} INCLUDING ALL);",
)
except jrunner.JrunnerError as e:
raise HTTPException(500, f"dest provisioning failed: {e}")
@ -400,6 +532,21 @@ def _sql_str(v: str) -> str:
return "'" + v.replace("'", "''") + "'"
def _existing_dest_columns(dest_conn: dict, schema: str,
table: str) -> set[str] | None:
"""Return lowercase column names of an existing PG dest table, or
None if it doesn't exist. PG-only; fine while pg is the sole dest."""
r = jrunner.run_dest_sql(
dest_conn,
f"SELECT column_name FROM information_schema.columns "
f"WHERE table_schema={_sql_str(schema)} "
f"AND table_name={_sql_str(table)}",
)
if not r.rows:
return None
return {row[0].strip().lower() for row in r.rows if row and row[0]}
def _build_comment_sql(dest_drv, qualified_dest: str,
table_description: str | None,
columns: list[dict]) -> str:

View File

@ -277,3 +277,4 @@ table.picker tbody tr:hover td { background: #1c2128; }
}
.flash.ok { border-color: #2f6b35; background: #16261a; color: #b6dcb8; }
.flash.err { border-color: #6b2f2f; background: #261616; color: #dcb6b6; }
.flash.warn { border-color: #6b5a2f; background: #261f16; color: #e1c98a; }

View File

@ -19,6 +19,7 @@
<input type="hidden" name="dry_run" value="1">
<button type="submit">Dry run</button>
</form>
<a class="btn" href="/modules/{{ module.id }}/edit">Edit</a>
<form class="inline" method="post" action="/modules/{{ module.id }}/delete"
onsubmit="return confirm('Delete module {{ module.name }}? This removes the module and its run history. The dest table is NOT dropped.')">
<button type="submit" class="ghost" style="color:var(--danger)">Delete</button>
@ -41,7 +42,8 @@
<div>
<div class="panel">
<header>Source query
<span class="subtitle">free text — edit opens in $EDITOR (TODO)</span>
<span class="subtitle">free text with <code>{watermark}</code> placeholders</span>
<span style="margin-left:auto"><a href="/modules/{{ module.id }}/edit">edit</a></span>
</header>
<div class="body"><pre class="sql">{{ module.source_query }}</pre></div>
</div>

View File

@ -0,0 +1,115 @@
{% extends "base.html" %}
{% set section = "modules" %}
{% block title %}Edit module &middot; {{ module.name }} — Pipekit{% endblock %}
{% block content %}
<div class="panel">
<header>
Edit module &middot; {{ module.name }}
<span class="subtitle">
module #{{ module.id }}
{% if module.running %}<span class="pill running">running</span>{% endif %}
</span>
<span style="margin-left:auto"><a href="{{ cancel_url }}">&larr; back to module</a></span>
</header>
<div class="body">
{% if module.running %}
<div class="flash warning" style="margin-bottom:0.8rem">
This module is currently running. Saving changes now is allowed but may affect the in-flight run.
</div>
{% endif %}
<form method="post" action="{{ form_action }}">
<label class="field">
<span>name</span>
<input type="text" name="name" required value="{{ module.name }}">
<span class="help">must be unique; also used as the default staging table suffix</span>
</label>
<div class="two-col" style="gap:1rem">
<label class="field">
<span>source connection</span>
<select name="source_connection_id" required>
{% for c in connections %}
<option value="{{ c.id }}"
{% if c.id == module.source_connection_id %}selected{% endif %}>
{{ c.name }}
</option>
{% endfor %}
</select>
<span class="help">where <code>source_query</code> runs</span>
</label>
<label class="field">
<span>dest connection</span>
<select name="dest_connection_id" required>
{% for c in connections %}
<option value="{{ c.id }}"
{% if c.id == module.dest_connection_id %}selected{% endif %}>
{{ c.name }}
</option>
{% endfor %}
</select>
<span class="help">where staging + merge run</span>
</label>
</div>
<div class="two-col" style="gap:1rem">
<label class="field">
<span>dest table</span>
<input type="text" name="dest_table" required value="{{ module.dest_table }}">
<span class="help"><code>schema.table</code> — editing here does NOT rename the actual table</span>
</label>
<label class="field">
<span>staging table</span>
<input type="text" name="staging_table" required value="{{ module.staging_table }}">
<span class="help">dropped on save; recreated from dest on next run</span>
</label>
</div>
<label class="field">
<span>source query</span>
<textarea name="source_query" rows="14" required class="mono">{{ module.source_query }}</textarea>
<span class="help">free text; <code>{name}</code> placeholders resolved from watermarks at run time</span>
</label>
<div class="two-col" style="gap:1rem">
<label class="field">
<span>merge strategy</span>
<select name="merge_strategy" required>
{% for s in ("full", "incremental", "append") %}
<option value="{{ s }}" {% if s == module.merge_strategy %}selected{% endif %}>{{ s }}</option>
{% endfor %}
</select>
</label>
<label class="field">
<span>merge key</span>
<input type="text" name="merge_key" value="{{ module.merge_key or '' }}"
placeholder="id or (col_a, col_b)">
<span class="help">required for <code>incremental</code>; ignored otherwise</span>
</label>
</div>
<label class="field">
<span>dest description</span>
<textarea name="dest_description" rows="2">{{ module.dest_description or '' }}</textarea>
<span class="help">COMMENT ON TABLE value; re-applied on save if changed</span>
</label>
<label class="field" style="flex-direction:row;align-items:center;gap:0.5rem">
<input type="checkbox" name="enabled" value="1"
{% if module.enabled %}checked{% endif %}>
<span>enabled</span>
<span class="help">disabled modules are skipped by group runs; ad-hoc runs still work</span>
</label>
<div class="actions" style="justify-content:flex-end;margin-top:0.8rem">
<a class="btn ghost" href="{{ cancel_url }}">cancel</a>
<button type="submit" class="primary">save changes</button>
</div>
</form>
</div>
</div>
{% endblock %}

View File

@ -0,0 +1,57 @@
{% extends "base.html" %}
{% set section = "modules" %}
{% block title %}New module — error{% endblock %}
{% block content %}
<div class="panel">
<header>
Wizard error
<span class="subtitle">{{ title }}</span>
<span style="margin-left:auto">
<a href="/wizard/columns?{{ back_qs }}">&larr; back to step 3</a>
</span>
</header>
<div class="body">
<div class="flash err">
Dest table <code>{{ qualified_dest }}</code> already exists, but your
picks don't match its schema. Drop the table, choose a different
dest table, or align your column picks (dest names) with the
existing columns below.
</div>
<div class="two-col">
<div class="panel">
<header>
Missing from existing table
<span class="subtitle">{{ missing|length }}</span>
</header>
<div class="body tight">
<table class="grid">
<tbody>
{% for m in missing %}
<tr><td class="mono">{{ m }}</td></tr>
{% endfor %}
</tbody>
</table>
</div>
</div>
<div class="panel">
<header>
Columns in existing dest
<span class="subtitle">{{ existing|length }}</span>
</header>
<div class="body tight">
<table class="grid">
<tbody>
{% for c in existing %}
<tr><td class="mono">{{ c }}</td></tr>
{% endfor %}
</tbody>
</table>
</div>
</div>
</div>
</div>
</div>
{% endblock %}

View File

@ -27,10 +27,45 @@
</label>
{% endfor %}
<label class="field">
<span>table (skip browse)</span>
<input type="text" name="table" id="jump-table"
placeholder="schema.table — or leave blank to browse"
autocomplete="off" spellcheck="false">
<span class="help">if you know the table, fill this in and click jump. <code>schema.table</code> auto-populates the qualifier above on tab-out.</span>
</label>
<div class="actions" style="margin-top:0.8rem">
<button type="submit" class="primary">browse &rarr;</button>
<button type="submit" id="jump-btn"
formaction="/wizard/columns" disabled>
jump to columns &rarr;
</button>
</div>
</form>
<script>
(function () {
var tbl = document.getElementById('jump-table');
var btn = document.getElementById('jump-btn');
var schemaInput = document.querySelector('input[name="schema"]')
|| document.querySelector('input[name="library"]');
function updateBtn() {
btn.disabled = !tbl.value.trim();
}
tbl.addEventListener('input', updateBtn);
tbl.addEventListener('blur', function () {
var v = tbl.value.trim();
var dot = v.indexOf('.');
if (dot > 0 && schemaInput) {
schemaInput.value = v.substring(0, dot);
tbl.value = v.substring(dot + 1);
updateBtn();
}
});
updateBtn();
})();
</script>
</div>
</div>

View File

@ -18,6 +18,16 @@
</div>
</div>
{% if dest_warn %}
<div class="flash warn">
Dest table <code>{{ dest_warn.qualified }}</code> already exists on the
default destination. If you proceed, pipekit will <strong>not</strong> drop
or recreate it — your picks below (dest names) must match the existing
columns, or the create will fail. Existing columns:
<span class="mono">{{ dest_warn.columns|join(', ') }}</span>
</div>
{% endif %}
{% if not fetch_error %}
<form method="post" action="/wizard/create">
<input type="hidden" name="source_connection_id" value="{{ connection.id }}">

29
systemd/pipekit.service Normal file
View File

@ -0,0 +1,29 @@
# Pipekit systemd unit template.
#
# Install:
# sudo cp pipekit.service /etc/systemd/system/
# sudo systemctl daemon-reload
# sudo systemctl enable --now pipekit
#
# Runs as root by default. For a dedicated service account, create the
# user and uncomment User=/Group= below:
# sudo useradd --system --home-dir /opt/pipekit --shell /usr/sbin/nologin pipekit
# sudo chown -R pipekit:pipekit /opt/pipekit/pipekit.db /etc/pipekit
[Unit]
Description=Pipekit sync engine
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
# User=pipekit
# Group=pipekit
WorkingDirectory=/opt/pipekit
EnvironmentFile=/etc/pipekit/secrets.env
ExecStart=/usr/local/bin/pipekit serve --host 0.0.0.0
Restart=on-failure
RestartSec=5s
[Install]
WantedBy=multi-user.target