Compare commits

..

No commits in common. "f18ea55a12f67cfe3ab0089458eff1480bc2630a" and "d952b48a4e093391c9935fdbe30a5231d8490216" have entirely different histories.

17 changed files with 16 additions and 773 deletions

3
.gitignore vendored
View File

@ -10,6 +10,3 @@ pipekit.db-shm
# Local Claude Code settings. # Local Claude Code settings.
.claude/settings.local.json .claude/settings.local.json
# Python venv created by deploy.sh.
.venv/

View File

@ -1,9 +1,4 @@
#!/usr/bin/env bash #!/usr/bin/env bash
# Thin launcher: prefer the repo venv if deploy.sh created one, # Thin launcher: run `pipekit` from anywhere.
# else fall back to system python3.
set -euo pipefail set -euo pipefail
REPO="$(cd "$(dirname "$(readlink -f "$0")")/.." && pwd)"
if [ -x "$REPO/.venv/bin/python3" ]; then
exec "$REPO/.venv/bin/python3" -m pipekit "$@"
fi
exec python3 -m pipekit "$@" exec python3 -m pipekit "$@"

View File

@ -1,8 +1,7 @@
database: /opt/pipekit/pipekit.db database: /opt/pipekit/pipekit.db
jrunner_path: /usr/local/bin/jrunner jrunner_path: /usr/local/bin/jrunner
driver_dir: /opt/pipekit/drivers/ driver_dir: /opt/pipekit/drivers/
api_host: 0.0.0.0 api_port: 8100
api_port: 8200
# smtp: # smtp:
# host: smtp.example.com # host: smtp.example.com
# port: 587 # port: 587

View File

@ -1,97 +0,0 @@
#!/usr/bin/env bash
# Pipekit deployment — idempotent. Re-run any time.
#
# Steps:
# 1. Check prerequisites (python3, jrunner on PATH)
# 2. Create Python venv at $REPO/.venv and install requirements
# 3. Install launcher at /usr/local/bin/pipekit (wraps the venv python)
# 4. Ensure /etc/pipekit/secrets.env exists (mode 0600, placeholder body)
# 5. Run `pipekit init` to create/upgrade the SQLite schema
# 6. Register driver rows for every JDBC jar shipped with jrunner
#
# After running:
# - Set DB passwords with: sudo pipekit secrets set <KEY>
# - See systemd/pipekit.service for a unit file template
set -euo pipefail
REPO_DIR="${PIPEKIT_REPO:-$(cd "$(dirname "$0")" && pwd)}"
VENV_DIR="$REPO_DIR/.venv"
LAUNCHER="/usr/local/bin/pipekit"
CONFIG_DIR="/etc/pipekit"
SECRETS_FILE="$CONFIG_DIR/secrets.env"
if [ "$EUID" -ne 0 ]; then
exec sudo -H -E "$0" "$@"
fi
echo "== pipekit deploy =="
echo "repo: $REPO_DIR"
echo "venv: $VENV_DIR"
echo "secrets: $SECRETS_FILE"
echo ""
command -v python3 >/dev/null || { echo "ERROR: python3 not on PATH"; exit 1; }
command -v jrunner >/dev/null || { echo "ERROR: jrunner not on PATH — install /opt/jrunner first"; exit 1; }
if [ ! -d "$VENV_DIR" ]; then
echo "Creating venv at $VENV_DIR"
python3 -m venv "$VENV_DIR"
fi
"$VENV_DIR/bin/pip" install --quiet --upgrade pip
"$VENV_DIR/bin/pip" install --quiet -r "$REPO_DIR/requirements.txt"
echo "Python deps installed."
# The in-repo bin/pipekit auto-detects .venv at runtime; no rewrite needed.
chmod +x "$REPO_DIR/bin/pipekit"
ln -sf "$REPO_DIR/bin/pipekit" "$LAUNCHER"
echo "Launcher: $LAUNCHER -> $REPO_DIR/bin/pipekit"
install -d -m 0755 "$CONFIG_DIR"
if [ ! -f "$SECRETS_FILE" ]; then
install -m 0600 /dev/null "$SECRETS_FILE"
cat > "$SECRETS_FILE" <<'EOF'
# pipekit secrets — sourced by the service process (EnvironmentFile=)
# or by the shell before `pipekit serve`. One KEY=VALUE per line.
# Connection rows reference these as $KEY (e.g. password: "$DB2PW").
#
# This file must stay mode 0600 and out of version control.
# Use `sudo pipekit secrets set <KEY>` to add entries safely.
EOF
chmod 0600 "$SECRETS_FILE"
echo "Created $SECRETS_FILE"
else
echo "Keeping existing $SECRETS_FILE"
fi
"$LAUNCHER" init
# Register drivers for each JDBC jar jrunner ships with.
JR_LIB="$(dirname "$(readlink -f "$(command -v jrunner)")")/../lib"
register_jar() {
local kind="$1" pattern="$2"
local jar
jar="$(find "$JR_LIB" -maxdepth 1 -name "$pattern" 2>/dev/null | head -1)"
if [ -n "$jar" ]; then
"$LAUNCHER" drivers register "$kind" --jar "$jar"
else
echo " (no $pattern in $JR_LIB — skipping $kind)"
fi
}
register_jar db2 "jt400-*.jar"
register_jar pg "postgresql-*.jar"
register_jar mssql "mssql-jdbc-*.jar"
echo ""
echo "pipekit deployed."
echo ""
echo "Next steps:"
echo " 1. Set passwords: sudo pipekit secrets set DB2PW"
echo " sudo pipekit secrets set PGPW"
echo " 2. Start the server manually:"
echo " set -a; source $SECRETS_FILE; set +a"
echo " pipekit serve"
echo " 3. Or install the systemd unit:"
echo " sudo cp $REPO_DIR/systemd/pipekit.service /etc/systemd/system/"
echo " sudo systemctl daemon-reload"
echo " sudo systemctl enable --now pipekit"

View File

@ -45,48 +45,6 @@ def cmd_drivers_list(args) -> int:
return 0 return 0
_DEFAULT_JDBC_CLASSES = {
"db2": "com.ibm.as400.access.AS400JDBCDriver",
"pg": "org.postgresql.Driver",
"mssql": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
}
def cmd_drivers_register(args) -> int:
try:
drivers.get_driver(args.kind)
except ValueError as e:
print(f"error: {e}")
return 1
existing = [d for d in repo.list_drivers() if d["kind"] == args.kind]
if existing and not args.force:
print(f"driver kind {args.kind!r} already registered as "
f"{existing[0]['name']!r} (id={existing[0]['id']}). "
f"Use --force to add a second row.")
return 0
class_name = args.class_name or _DEFAULT_JDBC_CLASSES.get(args.kind)
if not class_name:
print(f"error: no built-in JDBC class for kind {args.kind!r}; "
f"pass --class explicitly")
return 1
import os
if not os.path.exists(args.jar):
print(f"warning: jar {args.jar!r} does not exist "
f"(registering anyway)")
name = args.name or f"{args.kind}-jdbc"
row = repo.create_driver(
name=name, kind=args.kind, jar_file=args.jar,
class_name=class_name, url_template=args.url_template,
)
print(f"registered driver id={row['id']} name={row['name']!r} "
f"kind={row['kind']!r}")
return 0
def cmd_drivers_show(args) -> int: def cmd_drivers_show(args) -> int:
try: try:
d = drivers.get_driver(args.kind) d = drivers.get_driver(args.kind)
@ -137,10 +95,8 @@ def cmd_serve(args) -> int:
import uvicorn import uvicorn
from .api import create_app from .api import create_app
cfg = get_config() port = args.port or get_config().api_port
host = args.host or cfg.api_host uvicorn.run(create_app(), host=args.host, port=port, reload=args.reload)
port = args.port or cfg.api_port
uvicorn.run(create_app(), host=host, port=port, reload=args.reload)
return 0 return 0
@ -157,101 +113,6 @@ def cmd_set_password(args) -> int:
return 0 return 0
_DEFAULT_SECRETS_FILE = "/etc/pipekit/secrets.env"
def _secrets_path(args) -> str:
import os
return (getattr(args, "file", None)
or os.environ.get("PIPEKIT_SECRETS")
or _DEFAULT_SECRETS_FILE)
def cmd_secrets_list(args) -> int:
import os
path = _secrets_path(args)
if not os.path.exists(path):
print(f"{path}: no such file")
return 0
keys: list[str] = []
with open(path) as f:
for line in f:
s = line.strip()
if not s or s.startswith("#") or "=" not in s:
continue
keys.append(s.split("=", 1)[0])
print(f"{path}{len(keys)} secret(s):")
for k in keys:
print(f" {k}")
return 0
def cmd_secrets_set(args) -> int:
import getpass
import os
import stat
path = _secrets_path(args)
value = args.value if args.value is not None else getpass.getpass(
f"value for {args.key}: ")
if not value:
print("error: empty value")
return 1
lines: list[str] = []
replaced = False
if os.path.exists(path):
with open(path) as f:
for line in f:
s = line.strip()
if s and not s.startswith("#") and "=" in s \
and s.split("=", 1)[0] == args.key:
lines.append(f"{args.key}={value}\n")
replaced = True
else:
lines.append(line)
if not replaced:
if lines and not lines[-1].endswith("\n"):
lines.append("\n")
lines.append(f"{args.key}={value}\n")
os.makedirs(os.path.dirname(path), exist_ok=True)
tmp = path + ".tmp"
with open(tmp, "w") as f:
f.writelines(lines)
os.chmod(tmp, stat.S_IRUSR | stat.S_IWUSR) # 0600
os.replace(tmp, path)
print(f"{'updated' if replaced else 'added'} {args.key} in {path}")
return 0
def cmd_secrets_unset(args) -> int:
import os
path = _secrets_path(args)
if not os.path.exists(path):
print(f"{path}: no such file")
return 1
kept: list[str] = []
removed = False
with open(path) as f:
for line in f:
s = line.strip()
if s and not s.startswith("#") and "=" in s \
and s.split("=", 1)[0] == args.key:
removed = True
continue
kept.append(line)
if not removed:
print(f"{args.key} not set in {path}")
return 0
tmp = path + ".tmp"
with open(tmp, "w") as f:
f.writelines(kept)
os.chmod(tmp, 0o600)
os.replace(tmp, path)
print(f"removed {args.key} from {path}")
return 0
def _report(checks) -> int: def _report(checks) -> int:
width = max(len(name) for name, _, _ in checks) width = max(len(name) for name, _, _ in checks)
failures = 0 failures = 0
@ -289,21 +150,6 @@ def main(argv: list[str] | None = None) -> int:
p_drv_show.add_argument("kind", help="one of the kinds from `pipekit drivers list`") p_drv_show.add_argument("kind", help="one of the kinds from `pipekit drivers list`")
p_drv_show.set_defaults(func=cmd_drivers_show) p_drv_show.set_defaults(func=cmd_drivers_show)
p_drv_reg = drv_sub.add_parser(
"register", help="add a driver row to the database")
p_drv_reg.add_argument("kind", help="driver kind (db2, pg, mssql)")
p_drv_reg.add_argument("--jar", required=True,
help="absolute path to the JDBC jar")
p_drv_reg.add_argument("--name",
help="registry name (default: <kind>-jdbc)")
p_drv_reg.add_argument("--class", dest="class_name",
help="JDBC Driver class (default: built-in per kind)")
p_drv_reg.add_argument("--url-template",
help="optional JDBC URL template for the wizard")
p_drv_reg.add_argument("--force", action="store_true",
help="register even if a row for this kind exists")
p_drv_reg.set_defaults(func=cmd_drivers_register)
p_run = sub.add_parser("run", help="run a module by name (synchronous)") p_run = sub.add_parser("run", help="run a module by name (synchronous)")
p_run.add_argument("module", help="module name") p_run.add_argument("module", help="module name")
p_run.add_argument("--dry-run", action="store_true", p_run.add_argument("--dry-run", action="store_true",
@ -311,8 +157,7 @@ def main(argv: list[str] | None = None) -> int:
p_run.set_defaults(func=cmd_run) p_run.set_defaults(func=cmd_run)
p_serve = sub.add_parser("serve", help="start the HTTP API") p_serve = sub.add_parser("serve", help="start the HTTP API")
p_serve.add_argument("--host", default=None, p_serve.add_argument("--host", default="127.0.0.1")
help="defaults to config.yaml api_host (127.0.0.1)")
p_serve.add_argument("--port", type=int, default=None, p_serve.add_argument("--port", type=int, default=None,
help="defaults to config.yaml api_port") help="defaults to config.yaml api_port")
p_serve.add_argument("--reload", action="store_true") p_serve.add_argument("--reload", action="store_true")
@ -322,27 +167,6 @@ def main(argv: list[str] | None = None) -> int:
p_pw.add_argument("username") p_pw.add_argument("username")
p_pw.set_defaults(func=cmd_set_password) p_pw.set_defaults(func=cmd_set_password)
p_sec = sub.add_parser(
"secrets",
help=f"manage the env-var file for DB passwords (default {_DEFAULT_SECRETS_FILE})")
sec_sub = p_sec.add_subparsers(dest="sec_cmd", required=True)
p_sec_list = sec_sub.add_parser("list", help="list keys in the secrets file")
p_sec_list.add_argument("--file", help=f"override path (default {_DEFAULT_SECRETS_FILE})")
p_sec_list.set_defaults(func=cmd_secrets_list)
p_sec_set = sec_sub.add_parser("set", help="add or update a KEY (prompted if value omitted)")
p_sec_set.add_argument("key")
p_sec_set.add_argument("value", nargs="?", default=None,
help="value; omit to be prompted (safer)")
p_sec_set.add_argument("--file", help=f"override path (default {_DEFAULT_SECRETS_FILE})")
p_sec_set.set_defaults(func=cmd_secrets_set)
p_sec_unset = sec_sub.add_parser("unset", help="remove a KEY from the secrets file")
p_sec_unset.add_argument("key")
p_sec_unset.add_argument("--file", help=f"override path (default {_DEFAULT_SECRETS_FILE})")
p_sec_unset.set_defaults(func=cmd_secrets_unset)
args = p.parse_args(argv) args = p.parse_args(argv)
return args.func(args) return args.func(args)

View File

@ -28,10 +28,6 @@ class Config:
def api_port(self) -> int: def api_port(self) -> int:
return int(self._data.get("api_port", 8100)) return int(self._data.get("api_port", 8100))
@property
def api_host(self) -> str:
return str(self._data.get("api_host", "127.0.0.1"))
def get(self, key: str, default=None): def get(self, key: str, default=None):
return self._data.get(key, default) return self._data.get(key, default)

View File

@ -91,19 +91,16 @@ def run_module(module_id: int, *, group_run_id: int | None = None,
status = "success" status = "success"
return RunOutcome(run_id, status, None, None, resolved_sql, merge_sql) return RunOutcome(run_id, status, None, None, resolved_sql, merge_sql)
# 4. (re)create staging from dest. DROP+CREATE (not IF NOT EXISTS) so # 4. ensure staging table exists on dest. Mirror the real dest schema
# any drift — dest columns added since staging was last made — is # so jrunner's auto-DELETE and the subsequent merge INSERT both find
# self-healing. Staging is ephemeral per SPEC; nothing of value lives # a table to work on. Idempotent — no-op after first run.
# between runs.
staging_schema, _, _ = module["staging_table"].partition(".") staging_schema, _, _ = module["staging_table"].partition(".")
if staging_schema and staging_schema != module["staging_table"]: if staging_schema and staging_schema != module["staging_table"]:
jrunner.run_dest_sql( jrunner.run_dest_sql(
dest_conn, f"CREATE SCHEMA IF NOT EXISTS {staging_schema};") dest_conn, f"CREATE SCHEMA IF NOT EXISTS {staging_schema};")
jrunner.run_dest_sql(
dest_conn, f"DROP TABLE IF EXISTS {module['staging_table']};")
jrunner.run_dest_sql( jrunner.run_dest_sql(
dest_conn, dest_conn,
f"CREATE TABLE {module['staging_table']} " f"CREATE TABLE IF NOT EXISTS {module['staging_table']} "
f"(LIKE {module['dest_table']} INCLUDING ALL);", f"(LIKE {module['dest_table']} INCLUDING ALL);",
) )
@ -131,9 +128,6 @@ def run_module(module_id: int, *, group_run_id: int | None = None,
except Exception as e: # noqa: BLE001 except Exception as e: # noqa: BLE001
error = f"{type(e).__name__}: {e}\n{traceback.format_exc()}" error = f"{type(e).__name__}: {e}\n{traceback.format_exc()}"
if isinstance(e, jrunner.JrunnerError):
repo.log_run_output(run_id, jrunner_stdout=e.stdout,
jrunner_stderr=e.stderr)
# Failure-path hooks, if any. Never let these mask the real error. # Failure-path hooks, if any. Never let these mask the real error.
try: try:
hook_log = _run_hooks(module_id, fail_fast=False, hook_log = _run_hooks(module_id, fail_fast=False,

View File

@ -125,9 +125,6 @@ def query(
if r.returncode != 0: if r.returncode != 0:
raise JrunnerError(r.stderr.strip() or r.stdout.strip(), raise JrunnerError(r.stderr.strip() or r.stdout.strip(),
stdout=r.stdout, stderr=r.stderr) stdout=r.stdout, stderr=r.stderr)
silent = _detect_silent_failure(r.stdout, r.stderr)
if silent:
raise JrunnerError(silent, stdout=r.stdout, stderr=r.stderr)
reader = csv.reader(io.StringIO(r.stdout)) reader = csv.reader(io.StringIO(r.stdout))
header = next(reader, []) header = next(reader, [])
@ -172,9 +169,6 @@ def migrate(
if r.returncode != 0: if r.returncode != 0:
raise JrunnerError(r.stderr.strip() or r.stdout.strip(), raise JrunnerError(r.stderr.strip() or r.stdout.strip(),
stdout=r.stdout, stderr=r.stderr) stdout=r.stdout, stderr=r.stderr)
silent = _detect_silent_failure(r.stdout, r.stderr)
if silent:
raise JrunnerError(silent, stdout=r.stdout, stderr=r.stderr)
return MigrateResult( return MigrateResult(
row_count=_parse_row_count(r.stdout + "\n" + r.stderr), row_count=_parse_row_count(r.stdout + "\n" + r.stderr),
@ -208,35 +202,6 @@ def _parse_row_count(text: str) -> int | None:
return None return None
# jrunner catches SQLException, prints the stack trace, then exits 0 at
# nearly every failure site (see jrunner.java). Detect those by scanning
# for a Java stack-trace signature so callers don't treat silent failures
# as success.
_STACK_FRAME_RE = re.compile(r"^\s*at [\w.$<>]+\([^)\n]*\.java:\d+\)", re.M)
_EXCEPTION_HEADER_RE = re.compile(
r"^(?:[\w.$]+\.)*[\w$]+(?:Exception|Error)(?::[^\n]*)?$", re.M)
# jrunner runs query-mode SQL with `executeQuery`, which requires the
# statement to produce a ResultSet. DDL/DML (CREATE, TRUNCATE, INSERT)
# still executes, but PG then throws "No results were returned by the
# query." The statement succeeded — ignore the trace.
_BENIGN_EXCEPTION_SUBSTRINGS = (
"No results were returned by the query",
)
def _detect_silent_failure(stdout: str, stderr: str) -> str | None:
"""Return a short error summary if jrunner exited 0 but logged a failure."""
combined = (stderr or "") + "\n" + (stdout or "")
if not _STACK_FRAME_RE.search(combined):
return None
m = _EXCEPTION_HEADER_RE.search(combined)
header = m.group(0).strip() if m else "jrunner logged a Java stack trace but exited 0"
if any(s in header for s in _BENIGN_EXCEPTION_SUBSTRINGS):
return None
return header
class JrunnerError(RuntimeError): class JrunnerError(RuntimeError):
def __init__(self, message: str, *, stdout: str = "", stderr: str = ""): def __init__(self, message: str, *, stdout: str = "", stderr: str = ""):
super().__init__(message) super().__init__(message)

View File

@ -177,40 +177,6 @@ def list_modules() -> list[dict]:
return [dict(r) for r in c.execute("SELECT * FROM module ORDER BY name")] return [dict(r) for r in c.execute("SELECT * FROM module ORDER BY name")]
def update_module(module_id: int, *, name: str | None = None,
source_connection_id: int | None = None,
dest_connection_id: int | None = None,
dest_table: str | None = None,
staging_table: str | None = None,
source_query: str | None = None,
merge_strategy: str | None = None,
merge_key: str | None = None,
dest_description: str | None = None,
enabled: int | None = None) -> dict | None:
fields: list[str] = []
values: list = []
for col, val in (("name", name),
("source_connection_id", source_connection_id),
("dest_connection_id", dest_connection_id),
("dest_table", dest_table),
("staging_table", staging_table),
("source_query", source_query),
("merge_strategy", merge_strategy),
("merge_key", merge_key),
("dest_description", dest_description),
("enabled", enabled)):
if val is not None:
fields.append(f"{col}=?")
values.append(val)
if not fields:
return get_module(module_id)
fields.append("updated_at=datetime('now')")
values.append(module_id)
with db.connect() as c:
c.execute(f"UPDATE module SET {', '.join(fields)} WHERE id=?", values)
return get_module(module_id)
class ModuleRunning(RuntimeError): class ModuleRunning(RuntimeError):
"""Raised by delete_module when the module is currently running.""" """Raised by delete_module when the module is currently running."""

View File

@ -13,7 +13,6 @@ wizard, editors, and SSE-driven live run watch come next.
from __future__ import annotations from __future__ import annotations
from pathlib import Path from pathlib import Path
from urllib.parse import urlencode
from fastapi import APIRouter, FastAPI, HTTPException, Query, Request from fastapi import APIRouter, FastAPI, HTTPException, Query, Request
from fastapi.responses import HTMLResponse, RedirectResponse from fastapi.responses import HTMLResponse, RedirectResponse
@ -132,72 +131,6 @@ def module_detail(request: Request, module_id: int):
) )
@_router.get("/modules/{module_id}/edit", response_class=HTMLResponse)
def module_edit(request: Request, module_id: int):
module = repo.get_module(module_id)
if module is None:
raise HTTPException(404, f"module id={module_id} not found")
return _templates.TemplateResponse(
request,
"module_form.html",
_ctx(module=module, connections=repo.list_connections(),
form_action=f"/modules/{module_id}",
cancel_url=f"/modules/{module_id}"),
)
@_router.post("/modules/{module_id}")
async def module_update(request: Request, module_id: int):
module = repo.get_module(module_id)
if module is None:
raise HTTPException(404, f"module id={module_id} not found")
form = await request.form()
new_name = form["name"].strip()
if new_name != module["name"]:
existing = repo.get_module_by_name(new_name)
if existing is not None:
raise HTTPException(
409, f"module name {new_name!r} already exists — pick another")
merge_strategy = form.get("merge_strategy", "full")
if merge_strategy not in ("full", "incremental", "append"):
raise HTTPException(400, f"invalid merge_strategy: {merge_strategy!r}")
new_staging = form["staging_table"].strip()
new_description = (form.get("dest_description") or "").strip() or None
# DDL snap: drop the staging table under its old name (runner recreates
# fresh from dest on next run). Re-apply dest COMMENT if it changed.
dest_conn = repo.get_connection(module["dest_connection_id"])
if dest_conn is not None:
try:
jrunner.run_dest_sql(
dest_conn, f"DROP TABLE IF EXISTS {module['staging_table']};")
if new_description != module["dest_description"]:
jrunner.run_dest_sql(
dest_conn,
f"COMMENT ON TABLE {module['dest_table']} IS "
f"{_sql_str(new_description or '')};",
)
except jrunner.JrunnerError as e:
raise HTTPException(500, f"dest DDL snap failed: {e}")
repo.update_module(
module_id,
name=new_name,
source_connection_id=int(form["source_connection_id"]),
dest_connection_id=int(form["dest_connection_id"]),
dest_table=form["dest_table"].strip(),
staging_table=new_staging,
source_query=form["source_query"],
merge_strategy=merge_strategy,
merge_key=(form.get("merge_key") or "").strip() or None,
dest_description=new_description,
enabled=1 if form.get("enabled") == "1" else 0,
)
return RedirectResponse(url=f"/modules/{module_id}", status_code=303)
@_router.post("/modules/{module_id}/delete") @_router.post("/modules/{module_id}/delete")
def module_delete(module_id: int): def module_delete(module_id: int):
if repo.get_module(module_id) is None: if repo.get_module(module_id) is None:
@ -341,23 +274,6 @@ def wizard_step3(request: Request,
default_dest_conn_id = conn.get("default_dest_connection_id") default_dest_conn_id = conn.get("default_dest_connection_id")
default_dest_schema = conn.get("default_dest_schema") or "" default_dest_schema = conn.get("default_dest_schema") or ""
# Proactive warning — if the default dest table already exists, surface
# its columns now so the user can align picks before submit.
dest_warn: dict | None = None
if not fetch_error and default_dest_conn_id and default_dest_schema:
dest_conn_row = repo.get_connection(default_dest_conn_id)
if dest_conn_row is not None:
try:
existing = _existing_dest_columns(
dest_conn_row, default_dest_schema, default_module_name)
except jrunner.JrunnerError:
existing = None
if existing is not None:
dest_warn = {
"qualified": f"{default_dest_schema}.{default_module_name}",
"columns": sorted(existing),
}
return _templates.TemplateResponse( return _templates.TemplateResponse(
request, request,
"wizard_step3.html", "wizard_step3.html",
@ -367,8 +283,7 @@ def wizard_step3(request: Request,
table_description=table_description, table_description=table_description,
fetch_error=fetch_error, default_module_name=default_module_name, fetch_error=fetch_error, default_module_name=default_module_name,
default_dest_conn_id=default_dest_conn_id, default_dest_conn_id=default_dest_conn_id,
default_dest_schema=default_dest_schema, default_dest_schema=default_dest_schema),
dest_warn=dest_warn),
) )
@ -452,63 +367,16 @@ async def wizard_create(request: Request):
create_table_sql = dest_drv.build_create_table_sql(qualified_dest, chosen) create_table_sql = dest_drv.build_create_table_sql(qualified_dest, chosen)
except NotImplementedError as e: except NotImplementedError as e:
raise HTTPException(400, str(e)) raise HTTPException(400, str(e))
effective_staging = staging_table or f"pipekit_staging.{module_name}"
staging_schema, _, _ = effective_staging.partition(".")
# If the dest table already exists, don't clobber it. Verify the picks
# match its shape and skip CREATE/COMMENT.
try:
existing_cols = _existing_dest_columns(
dest_conn, dest_schema, dest_table_bare)
except jrunner.JrunnerError as e:
raise HTTPException(500, f"could not introspect dest: {e}")
dest_exists = existing_cols is not None
if dest_exists:
missing = [c["dest_name"] for c in chosen
if c["dest_name"].lower() not in existing_cols]
if missing:
back_qs = urlencode(
[("source_connection_id", source_connection_id),
("table", table),
("table_schema", qvals.get("schema") or qvals.get("library") or ""),
*qvals.items()])
return _templates.TemplateResponse(
request,
"wizard_error.html",
_ctx(
title="Dest table column mismatch",
qualified_dest=qualified_dest,
missing=missing,
existing=sorted(existing_cols),
back_qs=back_qs,
),
status_code=409,
)
try: try:
jrunner.run_dest_sql( jrunner.run_dest_sql(
dest_conn, dest_conn,
f"CREATE SCHEMA IF NOT EXISTS {dest_drv.quote_identifier(dest_schema)};", f"CREATE SCHEMA IF NOT EXISTS {dest_drv.quote_identifier(dest_schema)};",
) )
if not dest_exists:
jrunner.run_dest_sql(dest_conn, create_table_sql) jrunner.run_dest_sql(dest_conn, create_table_sql)
comment_sql = _build_comment_sql(dest_drv, qualified_dest, comment_sql = _build_comment_sql(dest_drv, qualified_dest,
dest_description, chosen) dest_description, chosen)
if comment_sql: if comment_sql:
jrunner.run_dest_sql(dest_conn, comment_sql) jrunner.run_dest_sql(dest_conn, comment_sql)
# Pre-align staging to dest so first run doesn't surprise us.
if staging_schema and staging_schema != effective_staging:
jrunner.run_dest_sql(
dest_conn,
f"CREATE SCHEMA IF NOT EXISTS {dest_drv.quote_identifier(staging_schema)};",
)
jrunner.run_dest_sql(
dest_conn, f"DROP TABLE IF EXISTS {effective_staging};")
jrunner.run_dest_sql(
dest_conn,
f"CREATE TABLE {effective_staging} "
f"(LIKE {qualified_dest} INCLUDING ALL);",
)
except jrunner.JrunnerError as e: except jrunner.JrunnerError as e:
raise HTTPException(500, f"dest provisioning failed: {e}") raise HTTPException(500, f"dest provisioning failed: {e}")
@ -532,21 +400,6 @@ def _sql_str(v: str) -> str:
return "'" + v.replace("'", "''") + "'" return "'" + v.replace("'", "''") + "'"
def _existing_dest_columns(dest_conn: dict, schema: str,
table: str) -> set[str] | None:
"""Return lowercase column names of an existing PG dest table, or
None if it doesn't exist. PG-only; fine while pg is the sole dest."""
r = jrunner.run_dest_sql(
dest_conn,
f"SELECT column_name FROM information_schema.columns "
f"WHERE table_schema={_sql_str(schema)} "
f"AND table_name={_sql_str(table)}",
)
if not r.rows:
return None
return {row[0].strip().lower() for row in r.rows if row and row[0]}
def _build_comment_sql(dest_drv, qualified_dest: str, def _build_comment_sql(dest_drv, qualified_dest: str,
table_description: str | None, table_description: str | None,
columns: list[dict]) -> str: columns: list[dict]) -> str:

View File

@ -277,4 +277,3 @@ table.picker tbody tr:hover td { background: #1c2128; }
} }
.flash.ok { border-color: #2f6b35; background: #16261a; color: #b6dcb8; } .flash.ok { border-color: #2f6b35; background: #16261a; color: #b6dcb8; }
.flash.err { border-color: #6b2f2f; background: #261616; color: #dcb6b6; } .flash.err { border-color: #6b2f2f; background: #261616; color: #dcb6b6; }
.flash.warn { border-color: #6b5a2f; background: #261f16; color: #e1c98a; }

View File

@ -19,7 +19,6 @@
<input type="hidden" name="dry_run" value="1"> <input type="hidden" name="dry_run" value="1">
<button type="submit">Dry run</button> <button type="submit">Dry run</button>
</form> </form>
<a class="btn" href="/modules/{{ module.id }}/edit">Edit</a>
<form class="inline" method="post" action="/modules/{{ module.id }}/delete" <form class="inline" method="post" action="/modules/{{ module.id }}/delete"
onsubmit="return confirm('Delete module {{ module.name }}? This removes the module and its run history. The dest table is NOT dropped.')"> onsubmit="return confirm('Delete module {{ module.name }}? This removes the module and its run history. The dest table is NOT dropped.')">
<button type="submit" class="ghost" style="color:var(--danger)">Delete</button> <button type="submit" class="ghost" style="color:var(--danger)">Delete</button>
@ -42,8 +41,7 @@
<div> <div>
<div class="panel"> <div class="panel">
<header>Source query <header>Source query
<span class="subtitle">free text with <code>{watermark}</code> placeholders</span> <span class="subtitle">free text — edit opens in $EDITOR (TODO)</span>
<span style="margin-left:auto"><a href="/modules/{{ module.id }}/edit">edit</a></span>
</header> </header>
<div class="body"><pre class="sql">{{ module.source_query }}</pre></div> <div class="body"><pre class="sql">{{ module.source_query }}</pre></div>
</div> </div>

View File

@ -1,115 +0,0 @@
{% extends "base.html" %}
{% set section = "modules" %}
{% block title %}Edit module &middot; {{ module.name }} — Pipekit{% endblock %}
{% block content %}
<div class="panel">
<header>
Edit module &middot; {{ module.name }}
<span class="subtitle">
module #{{ module.id }}
{% if module.running %}<span class="pill running">running</span>{% endif %}
</span>
<span style="margin-left:auto"><a href="{{ cancel_url }}">&larr; back to module</a></span>
</header>
<div class="body">
{% if module.running %}
<div class="flash warning" style="margin-bottom:0.8rem">
This module is currently running. Saving changes now is allowed but may affect the in-flight run.
</div>
{% endif %}
<form method="post" action="{{ form_action }}">
<label class="field">
<span>name</span>
<input type="text" name="name" required value="{{ module.name }}">
<span class="help">must be unique; also used as the default staging table suffix</span>
</label>
<div class="two-col" style="gap:1rem">
<label class="field">
<span>source connection</span>
<select name="source_connection_id" required>
{% for c in connections %}
<option value="{{ c.id }}"
{% if c.id == module.source_connection_id %}selected{% endif %}>
{{ c.name }}
</option>
{% endfor %}
</select>
<span class="help">where <code>source_query</code> runs</span>
</label>
<label class="field">
<span>dest connection</span>
<select name="dest_connection_id" required>
{% for c in connections %}
<option value="{{ c.id }}"
{% if c.id == module.dest_connection_id %}selected{% endif %}>
{{ c.name }}
</option>
{% endfor %}
</select>
<span class="help">where staging + merge run</span>
</label>
</div>
<div class="two-col" style="gap:1rem">
<label class="field">
<span>dest table</span>
<input type="text" name="dest_table" required value="{{ module.dest_table }}">
<span class="help"><code>schema.table</code> — editing here does NOT rename the actual table</span>
</label>
<label class="field">
<span>staging table</span>
<input type="text" name="staging_table" required value="{{ module.staging_table }}">
<span class="help">dropped on save; recreated from dest on next run</span>
</label>
</div>
<label class="field">
<span>source query</span>
<textarea name="source_query" rows="14" required class="mono">{{ module.source_query }}</textarea>
<span class="help">free text; <code>{name}</code> placeholders resolved from watermarks at run time</span>
</label>
<div class="two-col" style="gap:1rem">
<label class="field">
<span>merge strategy</span>
<select name="merge_strategy" required>
{% for s in ("full", "incremental", "append") %}
<option value="{{ s }}" {% if s == module.merge_strategy %}selected{% endif %}>{{ s }}</option>
{% endfor %}
</select>
</label>
<label class="field">
<span>merge key</span>
<input type="text" name="merge_key" value="{{ module.merge_key or '' }}"
placeholder="id or (col_a, col_b)">
<span class="help">required for <code>incremental</code>; ignored otherwise</span>
</label>
</div>
<label class="field">
<span>dest description</span>
<textarea name="dest_description" rows="2">{{ module.dest_description or '' }}</textarea>
<span class="help">COMMENT ON TABLE value; re-applied on save if changed</span>
</label>
<label class="field" style="flex-direction:row;align-items:center;gap:0.5rem">
<input type="checkbox" name="enabled" value="1"
{% if module.enabled %}checked{% endif %}>
<span>enabled</span>
<span class="help">disabled modules are skipped by group runs; ad-hoc runs still work</span>
</label>
<div class="actions" style="justify-content:flex-end;margin-top:0.8rem">
<a class="btn ghost" href="{{ cancel_url }}">cancel</a>
<button type="submit" class="primary">save changes</button>
</div>
</form>
</div>
</div>
{% endblock %}

View File

@ -1,57 +0,0 @@
{% extends "base.html" %}
{% set section = "modules" %}
{% block title %}New module — error{% endblock %}
{% block content %}
<div class="panel">
<header>
Wizard error
<span class="subtitle">{{ title }}</span>
<span style="margin-left:auto">
<a href="/wizard/columns?{{ back_qs }}">&larr; back to step 3</a>
</span>
</header>
<div class="body">
<div class="flash err">
Dest table <code>{{ qualified_dest }}</code> already exists, but your
picks don't match its schema. Drop the table, choose a different
dest table, or align your column picks (dest names) with the
existing columns below.
</div>
<div class="two-col">
<div class="panel">
<header>
Missing from existing table
<span class="subtitle">{{ missing|length }}</span>
</header>
<div class="body tight">
<table class="grid">
<tbody>
{% for m in missing %}
<tr><td class="mono">{{ m }}</td></tr>
{% endfor %}
</tbody>
</table>
</div>
</div>
<div class="panel">
<header>
Columns in existing dest
<span class="subtitle">{{ existing|length }}</span>
</header>
<div class="body tight">
<table class="grid">
<tbody>
{% for c in existing %}
<tr><td class="mono">{{ c }}</td></tr>
{% endfor %}
</tbody>
</table>
</div>
</div>
</div>
</div>
</div>
{% endblock %}

View File

@ -27,45 +27,10 @@
</label> </label>
{% endfor %} {% endfor %}
<label class="field">
<span>table (skip browse)</span>
<input type="text" name="table" id="jump-table"
placeholder="schema.table — or leave blank to browse"
autocomplete="off" spellcheck="false">
<span class="help">if you know the table, fill this in and click jump. <code>schema.table</code> auto-populates the qualifier above on tab-out.</span>
</label>
<div class="actions" style="margin-top:0.8rem"> <div class="actions" style="margin-top:0.8rem">
<button type="submit" class="primary">browse &rarr;</button> <button type="submit" class="primary">browse &rarr;</button>
<button type="submit" id="jump-btn"
formaction="/wizard/columns" disabled>
jump to columns &rarr;
</button>
</div> </div>
</form> </form>
<script>
(function () {
var tbl = document.getElementById('jump-table');
var btn = document.getElementById('jump-btn');
var schemaInput = document.querySelector('input[name="schema"]')
|| document.querySelector('input[name="library"]');
function updateBtn() {
btn.disabled = !tbl.value.trim();
}
tbl.addEventListener('input', updateBtn);
tbl.addEventListener('blur', function () {
var v = tbl.value.trim();
var dot = v.indexOf('.');
if (dot > 0 && schemaInput) {
schemaInput.value = v.substring(0, dot);
tbl.value = v.substring(dot + 1);
updateBtn();
}
});
updateBtn();
})();
</script>
</div> </div>
</div> </div>

View File

@ -18,16 +18,6 @@
</div> </div>
</div> </div>
{% if dest_warn %}
<div class="flash warn">
Dest table <code>{{ dest_warn.qualified }}</code> already exists on the
default destination. If you proceed, pipekit will <strong>not</strong> drop
or recreate it — your picks below (dest names) must match the existing
columns, or the create will fail. Existing columns:
<span class="mono">{{ dest_warn.columns|join(', ') }}</span>
</div>
{% endif %}
{% if not fetch_error %} {% if not fetch_error %}
<form method="post" action="/wizard/create"> <form method="post" action="/wizard/create">
<input type="hidden" name="source_connection_id" value="{{ connection.id }}"> <input type="hidden" name="source_connection_id" value="{{ connection.id }}">

View File

@ -1,29 +0,0 @@
# Pipekit systemd unit template.
#
# Install:
# sudo cp pipekit.service /etc/systemd/system/
# sudo systemctl daemon-reload
# sudo systemctl enable --now pipekit
#
# Runs as root by default. For a dedicated service account, create the
# user and uncomment User=/Group= below:
# sudo useradd --system --home-dir /opt/pipekit --shell /usr/sbin/nologin pipekit
# sudo chown -R pipekit:pipekit /opt/pipekit/pipekit.db /etc/pipekit
[Unit]
Description=Pipekit sync engine
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
# User=pipekit
# Group=pipekit
WorkingDirectory=/opt/pipekit
EnvironmentFile=/etc/pipekit/secrets.env
ExecStart=/usr/local/bin/pipekit serve --host 0.0.0.0
Restart=on-failure
RestartSec=5s
[Install]
WantedBy=multi-user.target