pipekit/pipekit/jrunner.py
Paul Trowbridge 01bcba78b4 Snap staging DDL on module create/edit/run; allowlist benign jrunner exception.
Staging table drift caused silent data loss when dest grew columns but
staging kept the old shape. Fix on three fronts:

- Runner now DROP+CREATEs staging each run instead of CREATE IF NOT
  EXISTS, so any drift self-heals.
- Wizard create drop+creates staging right after dest is provisioned,
  surfacing DDL errors at create time.
- Module edit drops the (old-name) staging table and re-applies
  COMMENT ON TABLE when dest_description changed.

jrunner's query mode uses executeQuery() which raises
"No results were returned by the query" after DDL/DML succeeds; the
stack-trace detector now allowlists that exception so normal
CREATE/TRUNCATE/INSERT runs aren't flagged as failures.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-22 20:10:36 -04:00

245 lines
7.6 KiB
Python

"""Thin wrapper around the `jrunner` Java CLI.
Pipekit uses jrunner for two things:
* **migration mode** — bulk streaming from source to dest (driven by the
engine). Implemented here via :func:`migrate`.
* **query mode** — single-result queries for watermark resolvers and for
wizard introspection. Implemented here via :func:`query`.
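Passwords are stored as env-var references (e.g. `"$DB2PW"`) per spec;
:func:`resolve_password` expands them at call time so secrets never land in
the database. Note that the expanded password is still handed to the jrunner
subprocess on its command line (`-scp` / `-dcp`), so it does appear in that
process's argv.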
"""
from __future__ import annotations
import csv
import io
import os
import re
import shutil
import subprocess
import tempfile
from dataclasses import dataclass
from pathlib import Path
from .config import get_config
@dataclass
class QueryResult:
    """Parsed outcome of a single jrunner query-mode invocation."""

    columns: list[str]     # header record of the CSV output
    rows: list[list[str]]  # data records that follow the header
    stdout: str            # raw captured standard output
    stderr: str            # raw captured standard error

    def first_value(self) -> str | None:
        """Return the first cell of the first row, or ``None`` if absent."""
        head = self.rows[0] if self.rows else None
        return head[0] if head else None
@dataclass
class MigrateResult:
    """Outcome of a single jrunner migration-mode invocation."""

    row_count: int | None  # count parsed from jrunner's output; None if not found
    stdout: str            # raw captured standard output
    stderr: str            # raw captured standard error
def resolve_password(raw: str | None) -> str:
    """Turn a stored password value into the actual password.

    A value starting with ``$`` names an environment variable and is replaced
    by that variable's value (``""`` when the variable is unset). Any other
    value is returned unchanged; ``None`` and ``""`` both yield ``""``.
    """
    if raw and raw.startswith("$"):
        # Everything after the "$" is the environment variable name.
        return os.environ.get(raw[1:], "")
    return raw or ""
# Run the JVM (and jt400 in particular) non-interactively. Otherwise jt400
# tries to open an AWT sign-on dialog when the password is empty or wrong,
# which dies with HeadlessException on a server.
_HEADLESS_JAVA_OPTS = (
    "-Djava.awt.headless=true "
    "-Dcom.ibm.as400.access.AS400.guiAvailable=false"
)


def _subprocess_env() -> dict:
    """Return a copy of the environment with the headless JVM flags added.

    Any ``JAVA_TOOL_OPTIONS`` already set is kept (whitespace-stripped) and
    the headless flags are appended after it. ``os.environ`` is not modified.
    """
    child_env = os.environ.copy()
    inherited = child_env.get("JAVA_TOOL_OPTIONS", "").strip()
    if inherited:
        merged = " ".join((inherited, _HEADLESS_JAVA_OPTS))
    else:
        merged = _HEADLESS_JAVA_OPTS
    child_env["JAVA_TOOL_OPTIONS"] = merged
    return child_env
def jrunner_path() -> Path:
    """Return the ``jrunner_path`` setting from the active configuration."""
    config = get_config()
    return config.jrunner_path
def version() -> tuple[bool, str]:
    """Return (ok, message) for use by pipekit doctor.

    ``ok`` is false when the executable cannot be located or when launching
    it raises; otherwise the message is the first line of ``--help`` output
    if it mentions jrunner, else a plain "found at ..." note.
    """
    path = jrunner_path()
    on_search_path = shutil.which(str(path))
    if not (on_search_path or path.exists()):
        return False, f"jrunner not found at {path} (see /opt/jrunner/deploy.sh)"
    try:
        proc = subprocess.run(
            [str(path), "--help"],
            capture_output=True,
            text=True,
            timeout=10,
        )
        # Prefer stdout; fall back to stderr when stdout is empty.
        output = proc.stdout or proc.stderr
        banner = output.splitlines()[0] if output else ""
        if "jrunner" in banner.lower():
            return True, banner.strip()
        return True, f"found at {path}"
    except Exception as e:
        # Doctor-style check: report any launch problem instead of raising.
        return False, f"{type(e).__name__}: {e}"
def query(
    jdbc_url: str,
    username: str | None,
    password: str | None,
    sql: str,
    *,
    timeout: int = 60,
    trim: bool = True,
) -> QueryResult:
    """Run `sql` in jrunner query mode and parse CSV output.

    Args:
        jdbc_url: JDBC URL, passed to jrunner as ``-scu``.
        username: Login name, passed as ``-scn`` (``None`` becomes ``""``).
        password: Literal password or ``$ENVVAR`` reference; expanded with
            :func:`resolve_password` and passed as ``-scp``.
        sql: Statement text. It is written to a temporary ``.sql`` file whose
            path is passed as ``-sq``; the file is removed afterwards.
        timeout: Seconds allowed for the jrunner subprocess.
        trim: When true, ``-t`` is added to the jrunner command line.

    Returns:
        A :class:`QueryResult` whose ``columns`` is the first CSV record of
        stdout and whose ``rows`` are the remaining non-empty records.

    Raises:
        JrunnerError: jrunner exited non-zero, or exited 0 but
            :func:`_detect_silent_failure` found a logged failure.
        subprocess.TimeoutExpired: jrunner ran longer than `timeout`.
    """
    path = jrunner_path()
    pw = resolve_password(password)
    # delete=False: the file has to outlive this handle so jrunner can open
    # it by name; it is removed explicitly in the ``finally`` below.
    sql_file = tempfile.NamedTemporaryFile("w", suffix=".sql", delete=False)
    try:
        # The write lives inside the try so that a failed write (disk full,
        # encoding error) cannot leave the temp file behind.
        with sql_file:
            sql_file.write(sql)
        argv = [str(path),
                "-scu", jdbc_url,
                "-scn", username or "",
                "-scp", pw,
                "-sq", sql_file.name,
                "-f", "csv"]
        if trim:
            argv.insert(1, "-t")
        r = subprocess.run(argv, capture_output=True, text=True,
                           timeout=timeout, env=_subprocess_env())
    finally:
        os.unlink(sql_file.name)
    if r.returncode != 0:
        raise JrunnerError(r.stderr.strip() or r.stdout.strip(),
                           stdout=r.stdout, stderr=r.stderr)
    # A zero exit code alone does not prove success; also scan the output
    # for a logged Java stack trace.
    silent = _detect_silent_failure(r.stdout, r.stderr)
    if silent:
        raise JrunnerError(silent, stdout=r.stdout, stderr=r.stderr)
    reader = csv.reader(io.StringIO(r.stdout))
    header = next(reader, [])
    rows = [row for row in reader if row]  # drop blank records
    return QueryResult(columns=header, rows=rows, stdout=r.stdout, stderr=r.stderr)
def migrate(
    source_conn: dict,
    dest_conn: dict,
    sql: str,
    dest_table: str,
    *,
    clear: bool = False,
    trim: bool = True,
    timeout: int = 3600,
) -> MigrateResult:
    """Stream `sql` results from source into `dest_table` via jrunner migration mode.

    Args:
        source_conn: Mapping with a required ``"jdbc_url"`` key and optional
            ``"username"`` / ``"password"`` keys (passed as ``-scu``,
            ``-scn``, ``-scp``).
        dest_conn: Same shape as `source_conn` (passed as ``-dcu``, ``-dcn``,
            ``-dcp``).
        sql: Source query text. It is written to a temporary ``.sql`` file
            whose path is passed as ``-sq``; the file is removed afterwards.
        dest_table: Destination table name, passed as ``-dt``.
        clear: When true, ``-c`` is added to the jrunner command line.
        trim: When true, ``-t`` is added to the jrunner command line.
        timeout: Seconds allowed for the jrunner subprocess.

    Returns:
        A :class:`MigrateResult`; ``row_count`` is whatever
        :func:`_parse_row_count` extracts from stdout + stderr, or ``None``.

    Raises:
        JrunnerError: jrunner exited non-zero, or exited 0 but
            :func:`_detect_silent_failure` found a logged failure.
        KeyError: a connection mapping has no ``"jdbc_url"``.
        subprocess.TimeoutExpired: jrunner ran longer than `timeout`.
    """
    path = jrunner_path()
    # delete=False: the file has to outlive this handle so jrunner can open
    # it by name; it is removed explicitly in the ``finally`` below.
    sql_file = tempfile.NamedTemporaryFile("w", suffix=".sql", delete=False)
    try:
        # The write lives inside the try so that a failed write (disk full,
        # encoding error) cannot leave the temp file behind.
        with sql_file:
            sql_file.write(sql)
        argv = [str(path),
                "-scu", source_conn["jdbc_url"],
                "-scn", source_conn.get("username") or "",
                "-scp", resolve_password(source_conn.get("password")),
                "-dcu", dest_conn["jdbc_url"],
                "-dcn", dest_conn.get("username") or "",
                "-dcp", resolve_password(dest_conn.get("password")),
                "-sq", sql_file.name,
                "-dt", dest_table]
        if trim:
            argv.append("-t")
        if clear:
            argv.append("-c")
        r = subprocess.run(argv, capture_output=True, text=True,
                           timeout=timeout, env=_subprocess_env())
    finally:
        os.unlink(sql_file.name)
    if r.returncode != 0:
        raise JrunnerError(r.stderr.strip() or r.stdout.strip(),
                           stdout=r.stdout, stderr=r.stderr)
    # A zero exit code alone does not prove success; also scan the output
    # for a logged Java stack trace.
    silent = _detect_silent_failure(r.stdout, r.stderr)
    if silent:
        raise JrunnerError(silent, stdout=r.stdout, stderr=r.stderr)
    return MigrateResult(
        row_count=_parse_row_count(r.stdout + "\n" + r.stderr),
        stdout=r.stdout, stderr=r.stderr,
    )
def run_dest_sql(conn: dict, sql: str, *, timeout: int = 600) -> QueryResult:
    """Execute arbitrary SQL (DDL/DML/SELECT) against one connection.

    Used for merge SQL, TRUNCATE staging, hooks, etc. This simply forwards
    to :func:`query` with ``trim=False``; `conn` must contain ``"jdbc_url"``
    and may contain ``"username"`` and ``"password"``.
    """
    url = conn["jdbc_url"]
    user = conn.get("username")
    secret = conn.get("password")
    return query(url, user, secret, sql, timeout=timeout, trim=False)
# Candidate phrasings of a row count in jrunner output, tried in this order.
_ROW_COUNT_PATTERNS = (
    re.compile(r"(\d+)\s+rows?\s+(?:inserted|transferred|migrated|written)", re.I),
    re.compile(r"inserted\s+(\d+)\s+rows?", re.I),
    re.compile(r"rows?:\s*(\d+)", re.I),
)


def _parse_row_count(text: str) -> int | None:
    """Extract a row count from jrunner output, or ``None`` if none is found.

    Patterns are tried in declaration order, so an earlier pattern wins even
    when a later one matches closer to the start of `text`.
    """
    hits = (pattern.search(text) for pattern in _ROW_COUNT_PATTERNS)
    for hit in hits:
        if hit is None:
            continue
        try:
            count = int(hit.group(1))
        except ValueError:
            # Unconvertible digits: fall through to the next pattern.
            continue
        return count
    return None
# jrunner catches SQLException, prints the stack trace, then exits 0 at
# nearly every failure site (see jrunner.java). Detect those by scanning
# for a Java stack-trace signature so callers don't treat silent failures
# as success.
_STACK_FRAME_RE = re.compile(r"^\s*at [\w.$<>]+\([^)\n]*\.java:\d+\)", re.M)
# A whole line of the form "[dotted.package.]SomeException[: message]" (or
# "...Error"). The package prefix is a single optional group rather than a
# repeated one: it accepts exactly the same lines but avoids the nested
# quantifier that backtracks exponentially on long dotted lines.
_EXCEPTION_HEADER_RE = re.compile(
    r"^(?:[\w.$]+\.)?[\w$]+(?:Exception|Error)(?::[^\n]*)?$", re.M)
# jrunner runs query-mode SQL with `executeQuery`, which requires the
# statement to produce a ResultSet. DDL/DML (CREATE, TRUNCATE, INSERT)
# still executes, but PG then throws "No results were returned by the
# query." The statement succeeded — ignore the trace.
_BENIGN_EXCEPTION_SUBSTRINGS = (
    "No results were returned by the query",
)


def _detect_silent_failure(stdout: str, stderr: str) -> str | None:
    """Return a short error summary if jrunner exited 0 but logged a failure.

    The output is only considered suspicious when it contains at least one
    Java stack frame. In that case every exception header line is checked
    against the benign allowlist, so a benign exception that happens to be
    logged first cannot hide a real one logged after it.

    Returns:
        ``None`` when there is no stack trace or every exception header is
        benign; the first non-benign header otherwise; a generic message when
        a stack trace is present but no header line could be found.
    """
    combined = (stderr or "") + "\n" + (stdout or "")
    if not _STACK_FRAME_RE.search(combined):
        return None
    headers = [m.group(0).strip() for m in _EXCEPTION_HEADER_RE.finditer(combined)]
    if not headers:
        return "jrunner logged a Java stack trace but exited 0"
    for header in headers:
        if not any(s in header for s in _BENIGN_EXCEPTION_SUBSTRINGS):
            return header
    return None
class JrunnerError(RuntimeError):
    """Raised when a jrunner invocation fails.

    Carries the subprocess's captured ``stdout`` and ``stderr`` alongside the
    message so callers can log or inspect the full output.
    """

    def __init__(self, message: str, *, stdout: str = "", stderr: str = ""):
        super().__init__(message)
        self.stdout, self.stderr = stdout, stderr