From ea90b3f56a1e21c4a5464a1176d59126695a3f5d Mon Sep 17 00:00:00 2001 From: Paul Trowbridge Date: Wed, 17 Jun 2026 15:20:30 -0400 Subject: [PATCH] api: log request payload (secrets masked) on raised HTTPExceptions Add an ASGI middleware that buffers each request body onto the scope and replays it downstream, so the HTTPException handler can log the submitted payload alongside the error. Fields whose name looks secret (password/pwd/ secret/token) are masked. Makes failures like the wizard 500 debuggable against the actual call content. Covers raised HTTPExceptions; handlers that *return* an error response are not body-logged yet. Co-Authored-By: Claude Opus 4.8 --- pipekit/api/app.py | 80 ++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 74 insertions(+), 6 deletions(-) diff --git a/pipekit/api/app.py b/pipekit/api/app.py index 8ea5de6..e119b47 100644 --- a/pipekit/api/app.py +++ b/pipekit/api/app.py @@ -38,6 +38,70 @@ def _configure_logging() -> None: base.propagate = False +_SECRET_KEYS = ("password", "passwd", "pwd", "secret", "token") + + +def _redact_body(raw: bytes, content_type: str, limit: int = 2000) -> str: + """Decode a captured request body for logging, masking secret-looking + fields (passwords etc.) so they never reach the journal.""" + if not raw: + return "" + text = raw.decode("utf-8", "replace") + ct = (content_type or "").lower() + try: + if "application/json" in ct: + import json as _json + obj = _json.loads(text) + if isinstance(obj, dict): + obj = {k: ("***" if any(s in k.lower() for s in _SECRET_KEYS) else v) + for k, v in obj.items()} + text = _json.dumps(obj) + else: + from urllib.parse import parse_qsl, urlencode + pairs = parse_qsl(text, keep_blank_values=True) + if pairs: + text = urlencode([(k, "***" if any(s in k.lower() for s in _SECRET_KEYS) else v) + for k, v in pairs]) + except Exception: # noqa: BLE001 + pass + return text[:limit] + ("…[truncated]" if len(text) > limit else "") + + +class _RequestBodyCapture: + """Buffer each HTTP request body onto the scope (``pk_raw_body``) so the + exception handler can log the submitted payload on failure, then replay it + downstream so route handlers still read the body normally. Bodies here are + small form/JSON posts — no large uploads go through HTTP.""" + + def __init__(self, app): + self.app = app + + async def __call__(self, scope, receive, send): + if scope["type"] != "http": + await self.app(scope, receive, send) + return + body = bytearray() + while True: + message = await receive() + if message["type"] == "http.request": + body += message.get("body", b"") + if not message.get("more_body", False): + break + else: # http.disconnect + break + scope["pk_raw_body"] = bytes(body) + replayed = False + + async def replay(): + nonlocal replayed + if not replayed: + replayed = True + return {"type": "http.request", "body": bytes(body), "more_body": False} + return {"type": "http.disconnect"} + + await self.app(scope, replay, send) + + @asynccontextmanager async def _lifespan(app: FastAPI): from ..scheduler import start_scheduler @@ -48,20 +112,24 @@ async def _lifespan(app: FastAPI): def create_app() -> FastAPI: _configure_logging() app = FastAPI(title="Pipekit", version=__version__, lifespan=_lifespan) + app.add_middleware(_RequestBodyCapture) @app.exception_handler(StarletteHTTPException) async def _logged_http_exception_handler(request: Request, exc: StarletteHTTPException): # FastAPI turns HTTPException into a normal response and never logs it, # so failures surfaced this way (e.g. wizard dest-provisioning errors) - # were invisible in the journal. Log them, then defer to the default - # handler for the actual response. + # were invisible in the journal. Log them — with the submitted payload, + # secrets masked — then defer to the default handler for the response. + body = _redact_body(request.scope.get("pk_raw_body", b""), + request.headers.get("content-type", "")) if exc.status_code >= 500: - _log.error("%s %s -> %d: %s", request.method, request.url.path, - exc.status_code, exc.detail, exc_info=exc) + _log.error("%s %s -> %d: %s | body: %s", request.method, + request.url.path, exc.status_code, exc.detail, body, + exc_info=exc) elif exc.status_code >= 400: - _log.warning("%s %s -> %d: %s", request.method, request.url.path, - exc.status_code, exc.detail) + _log.warning("%s %s -> %d: %s | body: %s", request.method, + request.url.path, exc.status_code, exc.detail, body) return await http_exception_handler(request, exc) app.include_router(system.router)