diff --git a/pipekit/api/app.py b/pipekit/api/app.py index 8ea5de6..e119b47 100644 --- a/pipekit/api/app.py +++ b/pipekit/api/app.py @@ -38,6 +38,70 @@ def _configure_logging() -> None: base.propagate = False +_SECRET_KEYS = ("password", "passwd", "pwd", "secret", "token") + + +def _redact_body(raw: bytes, content_type: str, limit: int = 2000) -> str: + """Decode a captured request body for logging, masking secret-looking + fields (passwords etc.) so they never reach the journal.""" + if not raw: + return "" + text = raw.decode("utf-8", "replace") + ct = (content_type or "").lower() + try: + if "application/json" in ct: + import json as _json + obj = _json.loads(text) + if isinstance(obj, dict): + obj = {k: ("***" if any(s in k.lower() for s in _SECRET_KEYS) else v) + for k, v in obj.items()} + text = _json.dumps(obj) + else: + from urllib.parse import parse_qsl, urlencode + pairs = parse_qsl(text, keep_blank_values=True) + if pairs: + text = urlencode([(k, "***" if any(s in k.lower() for s in _SECRET_KEYS) else v) + for k, v in pairs]) + except Exception: # noqa: BLE001 + pass + return text[:limit] + ("…[truncated]" if len(text) > limit else "") + + +class _RequestBodyCapture: + """Buffer each HTTP request body onto the scope (``pk_raw_body``) so the + exception handler can log the submitted payload on failure, then replay it + downstream so route handlers still read the body normally. Bodies here are + small form/JSON posts — no large uploads go through HTTP.""" + + def __init__(self, app): + self.app = app + + async def __call__(self, scope, receive, send): + if scope["type"] != "http": + await self.app(scope, receive, send) + return + body = bytearray() + while True: + message = await receive() + if message["type"] == "http.request": + body += message.get("body", b"") + if not message.get("more_body", False): + break + else: # http.disconnect + break + scope["pk_raw_body"] = bytes(body) + replayed = False + + async def replay(): + nonlocal replayed + if not replayed: + replayed = True + return {"type": "http.request", "body": bytes(body), "more_body": False} + return {"type": "http.disconnect"} + + await self.app(scope, replay, send) + + @asynccontextmanager async def _lifespan(app: FastAPI): from ..scheduler import start_scheduler @@ -48,20 +112,24 @@ async def _lifespan(app: FastAPI): def create_app() -> FastAPI: _configure_logging() app = FastAPI(title="Pipekit", version=__version__, lifespan=_lifespan) + app.add_middleware(_RequestBodyCapture) @app.exception_handler(StarletteHTTPException) async def _logged_http_exception_handler(request: Request, exc: StarletteHTTPException): # FastAPI turns HTTPException into a normal response and never logs it, # so failures surfaced this way (e.g. wizard dest-provisioning errors) - # were invisible in the journal. Log them, then defer to the default - # handler for the actual response. + # were invisible in the journal. Log them — with the submitted payload, + # secrets masked — then defer to the default handler for the response. + body = _redact_body(request.scope.get("pk_raw_body", b""), + request.headers.get("content-type", "")) if exc.status_code >= 500: - _log.error("%s %s -> %d: %s", request.method, request.url.path, - exc.status_code, exc.detail, exc_info=exc) + _log.error("%s %s -> %d: %s | body: %s", request.method, + request.url.path, exc.status_code, exc.detail, body, + exc_info=exc) elif exc.status_code >= 400: - _log.warning("%s %s -> %d: %s", request.method, request.url.path, - exc.status_code, exc.detail) + _log.warning("%s %s -> %d: %s | body: %s", request.method, + request.url.path, exc.status_code, exc.detail, body) return await http_exception_handler(request, exc) app.include_router(system.router)