Compare commits
No commits in common. "f18ea55a12f67cfe3ab0089458eff1480bc2630a" and "d952b48a4e093391c9935fdbe30a5231d8490216" have entirely different histories.
f18ea55a12
...
d952b48a4e
3
.gitignore
vendored
3
.gitignore
vendored
@ -10,6 +10,3 @@ pipekit.db-shm
|
||||
|
||||
# Local Claude Code settings.
|
||||
.claude/settings.local.json
|
||||
|
||||
# Python venv created by deploy.sh.
|
||||
.venv/
|
||||
|
||||
@ -1,9 +1,4 @@
|
||||
#!/usr/bin/env bash
|
||||
# Thin launcher: prefer the repo venv if deploy.sh created one,
|
||||
# else fall back to system python3.
|
||||
# Thin launcher: run `pipekit` from anywhere.
|
||||
set -euo pipefail
|
||||
REPO="$(cd "$(dirname "$(readlink -f "$0")")/.." && pwd)"
|
||||
if [ -x "$REPO/.venv/bin/python3" ]; then
|
||||
exec "$REPO/.venv/bin/python3" -m pipekit "$@"
|
||||
fi
|
||||
exec python3 -m pipekit "$@"
|
||||
|
||||
@ -1,8 +1,7 @@
|
||||
database: /opt/pipekit/pipekit.db
|
||||
jrunner_path: /usr/local/bin/jrunner
|
||||
driver_dir: /opt/pipekit/drivers/
|
||||
api_host: 0.0.0.0
|
||||
api_port: 8200
|
||||
api_port: 8100
|
||||
# smtp:
|
||||
# host: smtp.example.com
|
||||
# port: 587
|
||||
|
||||
97
deploy.sh
97
deploy.sh
@ -1,97 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
# Pipekit deployment — idempotent. Re-run any time.
|
||||
#
|
||||
# Steps:
|
||||
# 1. Check prerequisites (python3, jrunner on PATH)
|
||||
# 2. Create Python venv at $REPO/.venv and install requirements
|
||||
# 3. Install launcher at /usr/local/bin/pipekit (wraps the venv python)
|
||||
# 4. Ensure /etc/pipekit/secrets.env exists (mode 0600, placeholder body)
|
||||
# 5. Run `pipekit init` to create/upgrade the SQLite schema
|
||||
# 6. Register driver rows for every JDBC jar shipped with jrunner
|
||||
#
|
||||
# After running:
|
||||
# - Set DB passwords with: sudo pipekit secrets set <KEY>
|
||||
# - See systemd/pipekit.service for a unit file template
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
REPO_DIR="${PIPEKIT_REPO:-$(cd "$(dirname "$0")" && pwd)}"
|
||||
VENV_DIR="$REPO_DIR/.venv"
|
||||
LAUNCHER="/usr/local/bin/pipekit"
|
||||
CONFIG_DIR="/etc/pipekit"
|
||||
SECRETS_FILE="$CONFIG_DIR/secrets.env"
|
||||
|
||||
if [ "$EUID" -ne 0 ]; then
|
||||
exec sudo -H -E "$0" "$@"
|
||||
fi
|
||||
|
||||
echo "== pipekit deploy =="
|
||||
echo "repo: $REPO_DIR"
|
||||
echo "venv: $VENV_DIR"
|
||||
echo "secrets: $SECRETS_FILE"
|
||||
echo ""
|
||||
|
||||
command -v python3 >/dev/null || { echo "ERROR: python3 not on PATH"; exit 1; }
|
||||
command -v jrunner >/dev/null || { echo "ERROR: jrunner not on PATH — install /opt/jrunner first"; exit 1; }
|
||||
|
||||
if [ ! -d "$VENV_DIR" ]; then
|
||||
echo "Creating venv at $VENV_DIR"
|
||||
python3 -m venv "$VENV_DIR"
|
||||
fi
|
||||
"$VENV_DIR/bin/pip" install --quiet --upgrade pip
|
||||
"$VENV_DIR/bin/pip" install --quiet -r "$REPO_DIR/requirements.txt"
|
||||
echo "Python deps installed."
|
||||
|
||||
# The in-repo bin/pipekit auto-detects .venv at runtime; no rewrite needed.
|
||||
chmod +x "$REPO_DIR/bin/pipekit"
|
||||
ln -sf "$REPO_DIR/bin/pipekit" "$LAUNCHER"
|
||||
echo "Launcher: $LAUNCHER -> $REPO_DIR/bin/pipekit"
|
||||
|
||||
install -d -m 0755 "$CONFIG_DIR"
|
||||
if [ ! -f "$SECRETS_FILE" ]; then
|
||||
install -m 0600 /dev/null "$SECRETS_FILE"
|
||||
cat > "$SECRETS_FILE" <<'EOF'
|
||||
# pipekit secrets — sourced by the service process (EnvironmentFile=)
|
||||
# or by the shell before `pipekit serve`. One KEY=VALUE per line.
|
||||
# Connection rows reference these as $KEY (e.g. password: "$DB2PW").
|
||||
#
|
||||
# This file must stay mode 0600 and out of version control.
|
||||
# Use `sudo pipekit secrets set <KEY>` to add entries safely.
|
||||
EOF
|
||||
chmod 0600 "$SECRETS_FILE"
|
||||
echo "Created $SECRETS_FILE"
|
||||
else
|
||||
echo "Keeping existing $SECRETS_FILE"
|
||||
fi
|
||||
|
||||
"$LAUNCHER" init
|
||||
|
||||
# Register drivers for each JDBC jar jrunner ships with.
|
||||
JR_LIB="$(dirname "$(readlink -f "$(command -v jrunner)")")/../lib"
|
||||
register_jar() {
|
||||
local kind="$1" pattern="$2"
|
||||
local jar
|
||||
jar="$(find "$JR_LIB" -maxdepth 1 -name "$pattern" 2>/dev/null | head -1)"
|
||||
if [ -n "$jar" ]; then
|
||||
"$LAUNCHER" drivers register "$kind" --jar "$jar"
|
||||
else
|
||||
echo " (no $pattern in $JR_LIB — skipping $kind)"
|
||||
fi
|
||||
}
|
||||
register_jar db2 "jt400-*.jar"
|
||||
register_jar pg "postgresql-*.jar"
|
||||
register_jar mssql "mssql-jdbc-*.jar"
|
||||
|
||||
echo ""
|
||||
echo "pipekit deployed."
|
||||
echo ""
|
||||
echo "Next steps:"
|
||||
echo " 1. Set passwords: sudo pipekit secrets set DB2PW"
|
||||
echo " sudo pipekit secrets set PGPW"
|
||||
echo " 2. Start the server manually:"
|
||||
echo " set -a; source $SECRETS_FILE; set +a"
|
||||
echo " pipekit serve"
|
||||
echo " 3. Or install the systemd unit:"
|
||||
echo " sudo cp $REPO_DIR/systemd/pipekit.service /etc/systemd/system/"
|
||||
echo " sudo systemctl daemon-reload"
|
||||
echo " sudo systemctl enable --now pipekit"
|
||||
182
pipekit/cli.py
182
pipekit/cli.py
@ -45,48 +45,6 @@ def cmd_drivers_list(args) -> int:
|
||||
return 0
|
||||
|
||||
|
||||
_DEFAULT_JDBC_CLASSES = {
|
||||
"db2": "com.ibm.as400.access.AS400JDBCDriver",
|
||||
"pg": "org.postgresql.Driver",
|
||||
"mssql": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
|
||||
}
|
||||
|
||||
|
||||
def cmd_drivers_register(args) -> int:
|
||||
try:
|
||||
drivers.get_driver(args.kind)
|
||||
except ValueError as e:
|
||||
print(f"error: {e}")
|
||||
return 1
|
||||
|
||||
existing = [d for d in repo.list_drivers() if d["kind"] == args.kind]
|
||||
if existing and not args.force:
|
||||
print(f"driver kind {args.kind!r} already registered as "
|
||||
f"{existing[0]['name']!r} (id={existing[0]['id']}). "
|
||||
f"Use --force to add a second row.")
|
||||
return 0
|
||||
|
||||
class_name = args.class_name or _DEFAULT_JDBC_CLASSES.get(args.kind)
|
||||
if not class_name:
|
||||
print(f"error: no built-in JDBC class for kind {args.kind!r}; "
|
||||
f"pass --class explicitly")
|
||||
return 1
|
||||
|
||||
import os
|
||||
if not os.path.exists(args.jar):
|
||||
print(f"warning: jar {args.jar!r} does not exist "
|
||||
f"(registering anyway)")
|
||||
|
||||
name = args.name or f"{args.kind}-jdbc"
|
||||
row = repo.create_driver(
|
||||
name=name, kind=args.kind, jar_file=args.jar,
|
||||
class_name=class_name, url_template=args.url_template,
|
||||
)
|
||||
print(f"registered driver id={row['id']} name={row['name']!r} "
|
||||
f"kind={row['kind']!r}")
|
||||
return 0
|
||||
|
||||
|
||||
def cmd_drivers_show(args) -> int:
|
||||
try:
|
||||
d = drivers.get_driver(args.kind)
|
||||
@ -137,10 +95,8 @@ def cmd_serve(args) -> int:
|
||||
import uvicorn
|
||||
from .api import create_app
|
||||
|
||||
cfg = get_config()
|
||||
host = args.host or cfg.api_host
|
||||
port = args.port or cfg.api_port
|
||||
uvicorn.run(create_app(), host=host, port=port, reload=args.reload)
|
||||
port = args.port or get_config().api_port
|
||||
uvicorn.run(create_app(), host=args.host, port=port, reload=args.reload)
|
||||
return 0
|
||||
|
||||
|
||||
@ -157,101 +113,6 @@ def cmd_set_password(args) -> int:
|
||||
return 0
|
||||
|
||||
|
||||
_DEFAULT_SECRETS_FILE = "/etc/pipekit/secrets.env"
|
||||
|
||||
|
||||
def _secrets_path(args) -> str:
|
||||
import os
|
||||
return (getattr(args, "file", None)
|
||||
or os.environ.get("PIPEKIT_SECRETS")
|
||||
or _DEFAULT_SECRETS_FILE)
|
||||
|
||||
|
||||
def cmd_secrets_list(args) -> int:
|
||||
import os
|
||||
path = _secrets_path(args)
|
||||
if not os.path.exists(path):
|
||||
print(f"{path}: no such file")
|
||||
return 0
|
||||
keys: list[str] = []
|
||||
with open(path) as f:
|
||||
for line in f:
|
||||
s = line.strip()
|
||||
if not s or s.startswith("#") or "=" not in s:
|
||||
continue
|
||||
keys.append(s.split("=", 1)[0])
|
||||
print(f"{path} — {len(keys)} secret(s):")
|
||||
for k in keys:
|
||||
print(f" {k}")
|
||||
return 0
|
||||
|
||||
|
||||
def cmd_secrets_set(args) -> int:
|
||||
import getpass
|
||||
import os
|
||||
import stat
|
||||
path = _secrets_path(args)
|
||||
value = args.value if args.value is not None else getpass.getpass(
|
||||
f"value for {args.key}: ")
|
||||
if not value:
|
||||
print("error: empty value")
|
||||
return 1
|
||||
|
||||
lines: list[str] = []
|
||||
replaced = False
|
||||
if os.path.exists(path):
|
||||
with open(path) as f:
|
||||
for line in f:
|
||||
s = line.strip()
|
||||
if s and not s.startswith("#") and "=" in s \
|
||||
and s.split("=", 1)[0] == args.key:
|
||||
lines.append(f"{args.key}={value}\n")
|
||||
replaced = True
|
||||
else:
|
||||
lines.append(line)
|
||||
if not replaced:
|
||||
if lines and not lines[-1].endswith("\n"):
|
||||
lines.append("\n")
|
||||
lines.append(f"{args.key}={value}\n")
|
||||
|
||||
os.makedirs(os.path.dirname(path), exist_ok=True)
|
||||
tmp = path + ".tmp"
|
||||
with open(tmp, "w") as f:
|
||||
f.writelines(lines)
|
||||
os.chmod(tmp, stat.S_IRUSR | stat.S_IWUSR) # 0600
|
||||
os.replace(tmp, path)
|
||||
print(f"{'updated' if replaced else 'added'} {args.key} in {path}")
|
||||
return 0
|
||||
|
||||
|
||||
def cmd_secrets_unset(args) -> int:
|
||||
import os
|
||||
path = _secrets_path(args)
|
||||
if not os.path.exists(path):
|
||||
print(f"{path}: no such file")
|
||||
return 1
|
||||
kept: list[str] = []
|
||||
removed = False
|
||||
with open(path) as f:
|
||||
for line in f:
|
||||
s = line.strip()
|
||||
if s and not s.startswith("#") and "=" in s \
|
||||
and s.split("=", 1)[0] == args.key:
|
||||
removed = True
|
||||
continue
|
||||
kept.append(line)
|
||||
if not removed:
|
||||
print(f"{args.key} not set in {path}")
|
||||
return 0
|
||||
tmp = path + ".tmp"
|
||||
with open(tmp, "w") as f:
|
||||
f.writelines(kept)
|
||||
os.chmod(tmp, 0o600)
|
||||
os.replace(tmp, path)
|
||||
print(f"removed {args.key} from {path}")
|
||||
return 0
|
||||
|
||||
|
||||
def _report(checks) -> int:
|
||||
width = max(len(name) for name, _, _ in checks)
|
||||
failures = 0
|
||||
@ -289,21 +150,6 @@ def main(argv: list[str] | None = None) -> int:
|
||||
p_drv_show.add_argument("kind", help="one of the kinds from `pipekit drivers list`")
|
||||
p_drv_show.set_defaults(func=cmd_drivers_show)
|
||||
|
||||
p_drv_reg = drv_sub.add_parser(
|
||||
"register", help="add a driver row to the database")
|
||||
p_drv_reg.add_argument("kind", help="driver kind (db2, pg, mssql)")
|
||||
p_drv_reg.add_argument("--jar", required=True,
|
||||
help="absolute path to the JDBC jar")
|
||||
p_drv_reg.add_argument("--name",
|
||||
help="registry name (default: <kind>-jdbc)")
|
||||
p_drv_reg.add_argument("--class", dest="class_name",
|
||||
help="JDBC Driver class (default: built-in per kind)")
|
||||
p_drv_reg.add_argument("--url-template",
|
||||
help="optional JDBC URL template for the wizard")
|
||||
p_drv_reg.add_argument("--force", action="store_true",
|
||||
help="register even if a row for this kind exists")
|
||||
p_drv_reg.set_defaults(func=cmd_drivers_register)
|
||||
|
||||
p_run = sub.add_parser("run", help="run a module by name (synchronous)")
|
||||
p_run.add_argument("module", help="module name")
|
||||
p_run.add_argument("--dry-run", action="store_true",
|
||||
@ -311,8 +157,7 @@ def main(argv: list[str] | None = None) -> int:
|
||||
p_run.set_defaults(func=cmd_run)
|
||||
|
||||
p_serve = sub.add_parser("serve", help="start the HTTP API")
|
||||
p_serve.add_argument("--host", default=None,
|
||||
help="defaults to config.yaml api_host (127.0.0.1)")
|
||||
p_serve.add_argument("--host", default="127.0.0.1")
|
||||
p_serve.add_argument("--port", type=int, default=None,
|
||||
help="defaults to config.yaml api_port")
|
||||
p_serve.add_argument("--reload", action="store_true")
|
||||
@ -322,27 +167,6 @@ def main(argv: list[str] | None = None) -> int:
|
||||
p_pw.add_argument("username")
|
||||
p_pw.set_defaults(func=cmd_set_password)
|
||||
|
||||
p_sec = sub.add_parser(
|
||||
"secrets",
|
||||
help=f"manage the env-var file for DB passwords (default {_DEFAULT_SECRETS_FILE})")
|
||||
sec_sub = p_sec.add_subparsers(dest="sec_cmd", required=True)
|
||||
|
||||
p_sec_list = sec_sub.add_parser("list", help="list keys in the secrets file")
|
||||
p_sec_list.add_argument("--file", help=f"override path (default {_DEFAULT_SECRETS_FILE})")
|
||||
p_sec_list.set_defaults(func=cmd_secrets_list)
|
||||
|
||||
p_sec_set = sec_sub.add_parser("set", help="add or update a KEY (prompted if value omitted)")
|
||||
p_sec_set.add_argument("key")
|
||||
p_sec_set.add_argument("value", nargs="?", default=None,
|
||||
help="value; omit to be prompted (safer)")
|
||||
p_sec_set.add_argument("--file", help=f"override path (default {_DEFAULT_SECRETS_FILE})")
|
||||
p_sec_set.set_defaults(func=cmd_secrets_set)
|
||||
|
||||
p_sec_unset = sec_sub.add_parser("unset", help="remove a KEY from the secrets file")
|
||||
p_sec_unset.add_argument("key")
|
||||
p_sec_unset.add_argument("--file", help=f"override path (default {_DEFAULT_SECRETS_FILE})")
|
||||
p_sec_unset.set_defaults(func=cmd_secrets_unset)
|
||||
|
||||
args = p.parse_args(argv)
|
||||
return args.func(args)
|
||||
|
||||
|
||||
@ -28,10 +28,6 @@ class Config:
|
||||
def api_port(self) -> int:
|
||||
return int(self._data.get("api_port", 8100))
|
||||
|
||||
@property
|
||||
def api_host(self) -> str:
|
||||
return str(self._data.get("api_host", "127.0.0.1"))
|
||||
|
||||
def get(self, key: str, default=None):
|
||||
return self._data.get(key, default)
|
||||
|
||||
|
||||
@ -91,19 +91,16 @@ def run_module(module_id: int, *, group_run_id: int | None = None,
|
||||
status = "success"
|
||||
return RunOutcome(run_id, status, None, None, resolved_sql, merge_sql)
|
||||
|
||||
# 4. (re)create staging from dest. DROP+CREATE (not IF NOT EXISTS) so
|
||||
# any drift — dest columns added since staging was last made — is
|
||||
# self-healing. Staging is ephemeral per SPEC; nothing of value lives
|
||||
# between runs.
|
||||
# 4. ensure staging table exists on dest. Mirror the real dest schema
|
||||
# so jrunner's auto-DELETE and the subsequent merge INSERT both find
|
||||
# a table to work on. Idempotent — no-op after first run.
|
||||
staging_schema, _, _ = module["staging_table"].partition(".")
|
||||
if staging_schema and staging_schema != module["staging_table"]:
|
||||
jrunner.run_dest_sql(
|
||||
dest_conn, f"CREATE SCHEMA IF NOT EXISTS {staging_schema};")
|
||||
jrunner.run_dest_sql(
|
||||
dest_conn, f"DROP TABLE IF EXISTS {module['staging_table']};")
|
||||
jrunner.run_dest_sql(
|
||||
dest_conn,
|
||||
f"CREATE TABLE {module['staging_table']} "
|
||||
f"CREATE TABLE IF NOT EXISTS {module['staging_table']} "
|
||||
f"(LIKE {module['dest_table']} INCLUDING ALL);",
|
||||
)
|
||||
|
||||
@ -131,9 +128,6 @@ def run_module(module_id: int, *, group_run_id: int | None = None,
|
||||
|
||||
except Exception as e: # noqa: BLE001
|
||||
error = f"{type(e).__name__}: {e}\n{traceback.format_exc()}"
|
||||
if isinstance(e, jrunner.JrunnerError):
|
||||
repo.log_run_output(run_id, jrunner_stdout=e.stdout,
|
||||
jrunner_stderr=e.stderr)
|
||||
# Failure-path hooks, if any. Never let these mask the real error.
|
||||
try:
|
||||
hook_log = _run_hooks(module_id, fail_fast=False,
|
||||
|
||||
@ -125,9 +125,6 @@ def query(
|
||||
if r.returncode != 0:
|
||||
raise JrunnerError(r.stderr.strip() or r.stdout.strip(),
|
||||
stdout=r.stdout, stderr=r.stderr)
|
||||
silent = _detect_silent_failure(r.stdout, r.stderr)
|
||||
if silent:
|
||||
raise JrunnerError(silent, stdout=r.stdout, stderr=r.stderr)
|
||||
|
||||
reader = csv.reader(io.StringIO(r.stdout))
|
||||
header = next(reader, [])
|
||||
@ -172,9 +169,6 @@ def migrate(
|
||||
if r.returncode != 0:
|
||||
raise JrunnerError(r.stderr.strip() or r.stdout.strip(),
|
||||
stdout=r.stdout, stderr=r.stderr)
|
||||
silent = _detect_silent_failure(r.stdout, r.stderr)
|
||||
if silent:
|
||||
raise JrunnerError(silent, stdout=r.stdout, stderr=r.stderr)
|
||||
|
||||
return MigrateResult(
|
||||
row_count=_parse_row_count(r.stdout + "\n" + r.stderr),
|
||||
@ -208,35 +202,6 @@ def _parse_row_count(text: str) -> int | None:
|
||||
return None
|
||||
|
||||
|
||||
# jrunner catches SQLException, prints the stack trace, then exits 0 at
|
||||
# nearly every failure site (see jrunner.java). Detect those by scanning
|
||||
# for a Java stack-trace signature so callers don't treat silent failures
|
||||
# as success.
|
||||
_STACK_FRAME_RE = re.compile(r"^\s*at [\w.$<>]+\([^)\n]*\.java:\d+\)", re.M)
|
||||
_EXCEPTION_HEADER_RE = re.compile(
|
||||
r"^(?:[\w.$]+\.)*[\w$]+(?:Exception|Error)(?::[^\n]*)?$", re.M)
|
||||
|
||||
# jrunner runs query-mode SQL with `executeQuery`, which requires the
|
||||
# statement to produce a ResultSet. DDL/DML (CREATE, TRUNCATE, INSERT)
|
||||
# still executes, but PG then throws "No results were returned by the
|
||||
# query." The statement succeeded — ignore the trace.
|
||||
_BENIGN_EXCEPTION_SUBSTRINGS = (
|
||||
"No results were returned by the query",
|
||||
)
|
||||
|
||||
|
||||
def _detect_silent_failure(stdout: str, stderr: str) -> str | None:
|
||||
"""Return a short error summary if jrunner exited 0 but logged a failure."""
|
||||
combined = (stderr or "") + "\n" + (stdout or "")
|
||||
if not _STACK_FRAME_RE.search(combined):
|
||||
return None
|
||||
m = _EXCEPTION_HEADER_RE.search(combined)
|
||||
header = m.group(0).strip() if m else "jrunner logged a Java stack trace but exited 0"
|
||||
if any(s in header for s in _BENIGN_EXCEPTION_SUBSTRINGS):
|
||||
return None
|
||||
return header
|
||||
|
||||
|
||||
class JrunnerError(RuntimeError):
|
||||
def __init__(self, message: str, *, stdout: str = "", stderr: str = ""):
|
||||
super().__init__(message)
|
||||
|
||||
@ -177,40 +177,6 @@ def list_modules() -> list[dict]:
|
||||
return [dict(r) for r in c.execute("SELECT * FROM module ORDER BY name")]
|
||||
|
||||
|
||||
def update_module(module_id: int, *, name: str | None = None,
|
||||
source_connection_id: int | None = None,
|
||||
dest_connection_id: int | None = None,
|
||||
dest_table: str | None = None,
|
||||
staging_table: str | None = None,
|
||||
source_query: str | None = None,
|
||||
merge_strategy: str | None = None,
|
||||
merge_key: str | None = None,
|
||||
dest_description: str | None = None,
|
||||
enabled: int | None = None) -> dict | None:
|
||||
fields: list[str] = []
|
||||
values: list = []
|
||||
for col, val in (("name", name),
|
||||
("source_connection_id", source_connection_id),
|
||||
("dest_connection_id", dest_connection_id),
|
||||
("dest_table", dest_table),
|
||||
("staging_table", staging_table),
|
||||
("source_query", source_query),
|
||||
("merge_strategy", merge_strategy),
|
||||
("merge_key", merge_key),
|
||||
("dest_description", dest_description),
|
||||
("enabled", enabled)):
|
||||
if val is not None:
|
||||
fields.append(f"{col}=?")
|
||||
values.append(val)
|
||||
if not fields:
|
||||
return get_module(module_id)
|
||||
fields.append("updated_at=datetime('now')")
|
||||
values.append(module_id)
|
||||
with db.connect() as c:
|
||||
c.execute(f"UPDATE module SET {', '.join(fields)} WHERE id=?", values)
|
||||
return get_module(module_id)
|
||||
|
||||
|
||||
class ModuleRunning(RuntimeError):
|
||||
"""Raised by delete_module when the module is currently running."""
|
||||
|
||||
|
||||
@ -13,7 +13,6 @@ wizard, editors, and SSE-driven live run watch come next.
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from fastapi import APIRouter, FastAPI, HTTPException, Query, Request
|
||||
from fastapi.responses import HTMLResponse, RedirectResponse
|
||||
@ -132,72 +131,6 @@ def module_detail(request: Request, module_id: int):
|
||||
)
|
||||
|
||||
|
||||
@_router.get("/modules/{module_id}/edit", response_class=HTMLResponse)
|
||||
def module_edit(request: Request, module_id: int):
|
||||
module = repo.get_module(module_id)
|
||||
if module is None:
|
||||
raise HTTPException(404, f"module id={module_id} not found")
|
||||
return _templates.TemplateResponse(
|
||||
request,
|
||||
"module_form.html",
|
||||
_ctx(module=module, connections=repo.list_connections(),
|
||||
form_action=f"/modules/{module_id}",
|
||||
cancel_url=f"/modules/{module_id}"),
|
||||
)
|
||||
|
||||
|
||||
@_router.post("/modules/{module_id}")
|
||||
async def module_update(request: Request, module_id: int):
|
||||
module = repo.get_module(module_id)
|
||||
if module is None:
|
||||
raise HTTPException(404, f"module id={module_id} not found")
|
||||
form = await request.form()
|
||||
new_name = form["name"].strip()
|
||||
if new_name != module["name"]:
|
||||
existing = repo.get_module_by_name(new_name)
|
||||
if existing is not None:
|
||||
raise HTTPException(
|
||||
409, f"module name {new_name!r} already exists — pick another")
|
||||
|
||||
merge_strategy = form.get("merge_strategy", "full")
|
||||
if merge_strategy not in ("full", "incremental", "append"):
|
||||
raise HTTPException(400, f"invalid merge_strategy: {merge_strategy!r}")
|
||||
|
||||
new_staging = form["staging_table"].strip()
|
||||
new_description = (form.get("dest_description") or "").strip() or None
|
||||
|
||||
# DDL snap: drop the staging table under its old name (runner recreates
|
||||
# fresh from dest on next run). Re-apply dest COMMENT if it changed.
|
||||
dest_conn = repo.get_connection(module["dest_connection_id"])
|
||||
if dest_conn is not None:
|
||||
try:
|
||||
jrunner.run_dest_sql(
|
||||
dest_conn, f"DROP TABLE IF EXISTS {module['staging_table']};")
|
||||
if new_description != module["dest_description"]:
|
||||
jrunner.run_dest_sql(
|
||||
dest_conn,
|
||||
f"COMMENT ON TABLE {module['dest_table']} IS "
|
||||
f"{_sql_str(new_description or '')};",
|
||||
)
|
||||
except jrunner.JrunnerError as e:
|
||||
raise HTTPException(500, f"dest DDL snap failed: {e}")
|
||||
|
||||
repo.update_module(
|
||||
module_id,
|
||||
name=new_name,
|
||||
source_connection_id=int(form["source_connection_id"]),
|
||||
dest_connection_id=int(form["dest_connection_id"]),
|
||||
dest_table=form["dest_table"].strip(),
|
||||
staging_table=new_staging,
|
||||
source_query=form["source_query"],
|
||||
merge_strategy=merge_strategy,
|
||||
merge_key=(form.get("merge_key") or "").strip() or None,
|
||||
dest_description=new_description,
|
||||
enabled=1 if form.get("enabled") == "1" else 0,
|
||||
)
|
||||
return RedirectResponse(url=f"/modules/{module_id}", status_code=303)
|
||||
|
||||
|
||||
@_router.post("/modules/{module_id}/delete")
|
||||
def module_delete(module_id: int):
|
||||
if repo.get_module(module_id) is None:
|
||||
@ -341,23 +274,6 @@ def wizard_step3(request: Request,
|
||||
default_dest_conn_id = conn.get("default_dest_connection_id")
|
||||
default_dest_schema = conn.get("default_dest_schema") or ""
|
||||
|
||||
# Proactive warning — if the default dest table already exists, surface
|
||||
# its columns now so the user can align picks before submit.
|
||||
dest_warn: dict | None = None
|
||||
if not fetch_error and default_dest_conn_id and default_dest_schema:
|
||||
dest_conn_row = repo.get_connection(default_dest_conn_id)
|
||||
if dest_conn_row is not None:
|
||||
try:
|
||||
existing = _existing_dest_columns(
|
||||
dest_conn_row, default_dest_schema, default_module_name)
|
||||
except jrunner.JrunnerError:
|
||||
existing = None
|
||||
if existing is not None:
|
||||
dest_warn = {
|
||||
"qualified": f"{default_dest_schema}.{default_module_name}",
|
||||
"columns": sorted(existing),
|
||||
}
|
||||
|
||||
return _templates.TemplateResponse(
|
||||
request,
|
||||
"wizard_step3.html",
|
||||
@ -367,8 +283,7 @@ def wizard_step3(request: Request,
|
||||
table_description=table_description,
|
||||
fetch_error=fetch_error, default_module_name=default_module_name,
|
||||
default_dest_conn_id=default_dest_conn_id,
|
||||
default_dest_schema=default_dest_schema,
|
||||
dest_warn=dest_warn),
|
||||
default_dest_schema=default_dest_schema),
|
||||
)
|
||||
|
||||
|
||||
@ -452,63 +367,16 @@ async def wizard_create(request: Request):
|
||||
create_table_sql = dest_drv.build_create_table_sql(qualified_dest, chosen)
|
||||
except NotImplementedError as e:
|
||||
raise HTTPException(400, str(e))
|
||||
effective_staging = staging_table or f"pipekit_staging.{module_name}"
|
||||
staging_schema, _, _ = effective_staging.partition(".")
|
||||
|
||||
# If the dest table already exists, don't clobber it. Verify the picks
|
||||
# match its shape and skip CREATE/COMMENT.
|
||||
try:
|
||||
existing_cols = _existing_dest_columns(
|
||||
dest_conn, dest_schema, dest_table_bare)
|
||||
except jrunner.JrunnerError as e:
|
||||
raise HTTPException(500, f"could not introspect dest: {e}")
|
||||
dest_exists = existing_cols is not None
|
||||
if dest_exists:
|
||||
missing = [c["dest_name"] for c in chosen
|
||||
if c["dest_name"].lower() not in existing_cols]
|
||||
if missing:
|
||||
back_qs = urlencode(
|
||||
[("source_connection_id", source_connection_id),
|
||||
("table", table),
|
||||
("table_schema", qvals.get("schema") or qvals.get("library") or ""),
|
||||
*qvals.items()])
|
||||
return _templates.TemplateResponse(
|
||||
request,
|
||||
"wizard_error.html",
|
||||
_ctx(
|
||||
title="Dest table column mismatch",
|
||||
qualified_dest=qualified_dest,
|
||||
missing=missing,
|
||||
existing=sorted(existing_cols),
|
||||
back_qs=back_qs,
|
||||
),
|
||||
status_code=409,
|
||||
)
|
||||
|
||||
try:
|
||||
jrunner.run_dest_sql(
|
||||
dest_conn,
|
||||
f"CREATE SCHEMA IF NOT EXISTS {dest_drv.quote_identifier(dest_schema)};",
|
||||
)
|
||||
if not dest_exists:
|
||||
jrunner.run_dest_sql(dest_conn, create_table_sql)
|
||||
comment_sql = _build_comment_sql(dest_drv, qualified_dest,
|
||||
dest_description, chosen)
|
||||
if comment_sql:
|
||||
jrunner.run_dest_sql(dest_conn, comment_sql)
|
||||
# Pre-align staging to dest so first run doesn't surprise us.
|
||||
if staging_schema and staging_schema != effective_staging:
|
||||
jrunner.run_dest_sql(
|
||||
dest_conn,
|
||||
f"CREATE SCHEMA IF NOT EXISTS {dest_drv.quote_identifier(staging_schema)};",
|
||||
)
|
||||
jrunner.run_dest_sql(
|
||||
dest_conn, f"DROP TABLE IF EXISTS {effective_staging};")
|
||||
jrunner.run_dest_sql(
|
||||
dest_conn,
|
||||
f"CREATE TABLE {effective_staging} "
|
||||
f"(LIKE {qualified_dest} INCLUDING ALL);",
|
||||
)
|
||||
jrunner.run_dest_sql(dest_conn, create_table_sql)
|
||||
comment_sql = _build_comment_sql(dest_drv, qualified_dest,
|
||||
dest_description, chosen)
|
||||
if comment_sql:
|
||||
jrunner.run_dest_sql(dest_conn, comment_sql)
|
||||
except jrunner.JrunnerError as e:
|
||||
raise HTTPException(500, f"dest provisioning failed: {e}")
|
||||
|
||||
@ -532,21 +400,6 @@ def _sql_str(v: str) -> str:
|
||||
return "'" + v.replace("'", "''") + "'"
|
||||
|
||||
|
||||
def _existing_dest_columns(dest_conn: dict, schema: str,
|
||||
table: str) -> set[str] | None:
|
||||
"""Return lowercase column names of an existing PG dest table, or
|
||||
None if it doesn't exist. PG-only; fine while pg is the sole dest."""
|
||||
r = jrunner.run_dest_sql(
|
||||
dest_conn,
|
||||
f"SELECT column_name FROM information_schema.columns "
|
||||
f"WHERE table_schema={_sql_str(schema)} "
|
||||
f"AND table_name={_sql_str(table)}",
|
||||
)
|
||||
if not r.rows:
|
||||
return None
|
||||
return {row[0].strip().lower() for row in r.rows if row and row[0]}
|
||||
|
||||
|
||||
def _build_comment_sql(dest_drv, qualified_dest: str,
|
||||
table_description: str | None,
|
||||
columns: list[dict]) -> str:
|
||||
|
||||
@ -277,4 +277,3 @@ table.picker tbody tr:hover td { background: #1c2128; }
|
||||
}
|
||||
.flash.ok { border-color: #2f6b35; background: #16261a; color: #b6dcb8; }
|
||||
.flash.err { border-color: #6b2f2f; background: #261616; color: #dcb6b6; }
|
||||
.flash.warn { border-color: #6b5a2f; background: #261f16; color: #e1c98a; }
|
||||
|
||||
@ -19,7 +19,6 @@
|
||||
<input type="hidden" name="dry_run" value="1">
|
||||
<button type="submit">Dry run</button>
|
||||
</form>
|
||||
<a class="btn" href="/modules/{{ module.id }}/edit">Edit</a>
|
||||
<form class="inline" method="post" action="/modules/{{ module.id }}/delete"
|
||||
onsubmit="return confirm('Delete module {{ module.name }}? This removes the module and its run history. The dest table is NOT dropped.')">
|
||||
<button type="submit" class="ghost" style="color:var(--danger)">Delete</button>
|
||||
@ -42,8 +41,7 @@
|
||||
<div>
|
||||
<div class="panel">
|
||||
<header>Source query
|
||||
<span class="subtitle">free text with <code>{watermark}</code> placeholders</span>
|
||||
<span style="margin-left:auto"><a href="/modules/{{ module.id }}/edit">edit</a></span>
|
||||
<span class="subtitle">free text — edit opens in $EDITOR (TODO)</span>
|
||||
</header>
|
||||
<div class="body"><pre class="sql">{{ module.source_query }}</pre></div>
|
||||
</div>
|
||||
|
||||
@ -1,115 +0,0 @@
|
||||
{% extends "base.html" %}
|
||||
{% set section = "modules" %}
|
||||
{% block title %}Edit module · {{ module.name }} — Pipekit{% endblock %}
|
||||
|
||||
{% block content %}
|
||||
<div class="panel">
|
||||
<header>
|
||||
Edit module · {{ module.name }}
|
||||
<span class="subtitle">
|
||||
module #{{ module.id }}
|
||||
{% if module.running %}<span class="pill running">running</span>{% endif %}
|
||||
</span>
|
||||
<span style="margin-left:auto"><a href="{{ cancel_url }}">← back to module</a></span>
|
||||
</header>
|
||||
<div class="body">
|
||||
{% if module.running %}
|
||||
<div class="flash warning" style="margin-bottom:0.8rem">
|
||||
This module is currently running. Saving changes now is allowed but may affect the in-flight run.
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
<form method="post" action="{{ form_action }}">
|
||||
<label class="field">
|
||||
<span>name</span>
|
||||
<input type="text" name="name" required value="{{ module.name }}">
|
||||
<span class="help">must be unique; also used as the default staging table suffix</span>
|
||||
</label>
|
||||
|
||||
<div class="two-col" style="gap:1rem">
|
||||
<label class="field">
|
||||
<span>source connection</span>
|
||||
<select name="source_connection_id" required>
|
||||
{% for c in connections %}
|
||||
<option value="{{ c.id }}"
|
||||
{% if c.id == module.source_connection_id %}selected{% endif %}>
|
||||
{{ c.name }}
|
||||
</option>
|
||||
{% endfor %}
|
||||
</select>
|
||||
<span class="help">where <code>source_query</code> runs</span>
|
||||
</label>
|
||||
|
||||
<label class="field">
|
||||
<span>dest connection</span>
|
||||
<select name="dest_connection_id" required>
|
||||
{% for c in connections %}
|
||||
<option value="{{ c.id }}"
|
||||
{% if c.id == module.dest_connection_id %}selected{% endif %}>
|
||||
{{ c.name }}
|
||||
</option>
|
||||
{% endfor %}
|
||||
</select>
|
||||
<span class="help">where staging + merge run</span>
|
||||
</label>
|
||||
</div>
|
||||
|
||||
<div class="two-col" style="gap:1rem">
|
||||
<label class="field">
|
||||
<span>dest table</span>
|
||||
<input type="text" name="dest_table" required value="{{ module.dest_table }}">
|
||||
<span class="help"><code>schema.table</code> — editing here does NOT rename the actual table</span>
|
||||
</label>
|
||||
|
||||
<label class="field">
|
||||
<span>staging table</span>
|
||||
<input type="text" name="staging_table" required value="{{ module.staging_table }}">
|
||||
<span class="help">dropped on save; recreated from dest on next run</span>
|
||||
</label>
|
||||
</div>
|
||||
|
||||
<label class="field">
|
||||
<span>source query</span>
|
||||
<textarea name="source_query" rows="14" required class="mono">{{ module.source_query }}</textarea>
|
||||
<span class="help">free text; <code>{name}</code> placeholders resolved from watermarks at run time</span>
|
||||
</label>
|
||||
|
||||
<div class="two-col" style="gap:1rem">
|
||||
<label class="field">
|
||||
<span>merge strategy</span>
|
||||
<select name="merge_strategy" required>
|
||||
{% for s in ("full", "incremental", "append") %}
|
||||
<option value="{{ s }}" {% if s == module.merge_strategy %}selected{% endif %}>{{ s }}</option>
|
||||
{% endfor %}
|
||||
</select>
|
||||
</label>
|
||||
|
||||
<label class="field">
|
||||
<span>merge key</span>
|
||||
<input type="text" name="merge_key" value="{{ module.merge_key or '' }}"
|
||||
placeholder="id or (col_a, col_b)">
|
||||
<span class="help">required for <code>incremental</code>; ignored otherwise</span>
|
||||
</label>
|
||||
</div>
|
||||
|
||||
<label class="field">
|
||||
<span>dest description</span>
|
||||
<textarea name="dest_description" rows="2">{{ module.dest_description or '' }}</textarea>
|
||||
<span class="help">COMMENT ON TABLE value; re-applied on save if changed</span>
|
||||
</label>
|
||||
|
||||
<label class="field" style="flex-direction:row;align-items:center;gap:0.5rem">
|
||||
<input type="checkbox" name="enabled" value="1"
|
||||
{% if module.enabled %}checked{% endif %}>
|
||||
<span>enabled</span>
|
||||
<span class="help">disabled modules are skipped by group runs; ad-hoc runs still work</span>
|
||||
</label>
|
||||
|
||||
<div class="actions" style="justify-content:flex-end;margin-top:0.8rem">
|
||||
<a class="btn ghost" href="{{ cancel_url }}">cancel</a>
|
||||
<button type="submit" class="primary">save changes</button>
|
||||
</div>
|
||||
</form>
|
||||
</div>
|
||||
</div>
|
||||
{% endblock %}
|
||||
@ -1,57 +0,0 @@
|
||||
{% extends "base.html" %}
|
||||
{% set section = "modules" %}
|
||||
{% block title %}New module — error{% endblock %}
|
||||
|
||||
{% block content %}
|
||||
<div class="panel">
|
||||
<header>
|
||||
Wizard error
|
||||
<span class="subtitle">{{ title }}</span>
|
||||
<span style="margin-left:auto">
|
||||
<a href="/wizard/columns?{{ back_qs }}">← back to step 3</a>
|
||||
</span>
|
||||
</header>
|
||||
<div class="body">
|
||||
<div class="flash err">
|
||||
Dest table <code>{{ qualified_dest }}</code> already exists, but your
|
||||
picks don't match its schema. Drop the table, choose a different
|
||||
dest table, or align your column picks (dest names) with the
|
||||
existing columns below.
|
||||
</div>
|
||||
|
||||
<div class="two-col">
|
||||
<div class="panel">
|
||||
<header>
|
||||
Missing from existing table
|
||||
<span class="subtitle">{{ missing|length }}</span>
|
||||
</header>
|
||||
<div class="body tight">
|
||||
<table class="grid">
|
||||
<tbody>
|
||||
{% for m in missing %}
|
||||
<tr><td class="mono">{{ m }}</td></tr>
|
||||
{% endfor %}
|
||||
</tbody>
|
||||
</table>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div class="panel">
|
||||
<header>
|
||||
Columns in existing dest
|
||||
<span class="subtitle">{{ existing|length }}</span>
|
||||
</header>
|
||||
<div class="body tight">
|
||||
<table class="grid">
|
||||
<tbody>
|
||||
{% for c in existing %}
|
||||
<tr><td class="mono">{{ c }}</td></tr>
|
||||
{% endfor %}
|
||||
</tbody>
|
||||
</table>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
{% endblock %}
|
||||
@ -27,45 +27,10 @@
|
||||
</label>
|
||||
{% endfor %}
|
||||
|
||||
<label class="field">
|
||||
<span>table (skip browse)</span>
|
||||
<input type="text" name="table" id="jump-table"
|
||||
placeholder="schema.table — or leave blank to browse"
|
||||
autocomplete="off" spellcheck="false">
|
||||
<span class="help">if you know the table, fill this in and click jump. <code>schema.table</code> auto-populates the qualifier above on tab-out.</span>
|
||||
</label>
|
||||
|
||||
<div class="actions" style="margin-top:0.8rem">
|
||||
<button type="submit" class="primary">browse →</button>
|
||||
<button type="submit" id="jump-btn"
|
||||
formaction="/wizard/columns" disabled>
|
||||
jump to columns →
|
||||
</button>
|
||||
</div>
|
||||
</form>
|
||||
<script>
|
||||
(function () {
|
||||
var tbl = document.getElementById('jump-table');
|
||||
var btn = document.getElementById('jump-btn');
|
||||
var schemaInput = document.querySelector('input[name="schema"]')
|
||||
|| document.querySelector('input[name="library"]');
|
||||
|
||||
function updateBtn() {
|
||||
btn.disabled = !tbl.value.trim();
|
||||
}
|
||||
tbl.addEventListener('input', updateBtn);
|
||||
tbl.addEventListener('blur', function () {
|
||||
var v = tbl.value.trim();
|
||||
var dot = v.indexOf('.');
|
||||
if (dot > 0 && schemaInput) {
|
||||
schemaInput.value = v.substring(0, dot);
|
||||
tbl.value = v.substring(dot + 1);
|
||||
updateBtn();
|
||||
}
|
||||
});
|
||||
updateBtn();
|
||||
})();
|
||||
</script>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
|
||||
@ -18,16 +18,6 @@
|
||||
</div>
|
||||
</div>
|
||||
|
||||
{% if dest_warn %}
|
||||
<div class="flash warn">
|
||||
Dest table <code>{{ dest_warn.qualified }}</code> already exists on the
|
||||
default destination. If you proceed, pipekit will <strong>not</strong> drop
|
||||
or recreate it — your picks below (dest names) must match the existing
|
||||
columns, or the create will fail. Existing columns:
|
||||
<span class="mono">{{ dest_warn.columns|join(', ') }}</span>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
{% if not fetch_error %}
|
||||
<form method="post" action="/wizard/create">
|
||||
<input type="hidden" name="source_connection_id" value="{{ connection.id }}">
|
||||
|
||||
@ -1,29 +0,0 @@
|
||||
# Pipekit systemd unit template.
|
||||
#
|
||||
# Install:
|
||||
# sudo cp pipekit.service /etc/systemd/system/
|
||||
# sudo systemctl daemon-reload
|
||||
# sudo systemctl enable --now pipekit
|
||||
#
|
||||
# Runs as root by default. For a dedicated service account, create the
|
||||
# user and uncomment User=/Group= below:
|
||||
# sudo useradd --system --home-dir /opt/pipekit --shell /usr/sbin/nologin pipekit
|
||||
# sudo chown -R pipekit:pipekit /opt/pipekit/pipekit.db /etc/pipekit
|
||||
|
||||
[Unit]
|
||||
Description=Pipekit sync engine
|
||||
After=network-online.target
|
||||
Wants=network-online.target
|
||||
|
||||
[Service]
|
||||
Type=simple
|
||||
# User=pipekit
|
||||
# Group=pipekit
|
||||
WorkingDirectory=/opt/pipekit
|
||||
EnvironmentFile=/etc/pipekit/secrets.env
|
||||
ExecStart=/usr/local/bin/pipekit serve --host 0.0.0.0
|
||||
Restart=on-failure
|
||||
RestartSec=5s
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
||||
Loading…
Reference in New Issue
Block a user