diff --git a/demos/filter_chains/peyeeye/Dockerfile b/demos/filter_chains/peyeeye/Dockerfile new file mode 100644 index 000000000..2ba02db7c --- /dev/null +++ b/demos/filter_chains/peyeeye/Dockerfile @@ -0,0 +1,22 @@ +FROM python:3.12-slim + +ENV PYTHONUNBUFFERED=1 \ + PYTHONDONTWRITEBYTECODE=1 \ + PIP_NO_CACHE_DIR=1 + +WORKDIR /app + +RUN pip install --no-cache-dir uv + +COPY pyproject.toml ./ +RUN uv pip install --system --no-cache-dir \ + "fastapi>=0.104.1" \ + "uvicorn>=0.24.0" \ + "httpx>=0.27.0" \ + "pydantic>=2.0.0" + +COPY peyeeye.py ./ + +EXPOSE 10502 + +CMD ["uvicorn", "peyeeye:app", "--host", "0.0.0.0", "--port", "10502"] diff --git a/demos/filter_chains/peyeeye/README.md b/demos/filter_chains/peyeeye/README.md new file mode 100644 index 000000000..3aedf1198 --- /dev/null +++ b/demos/filter_chains/peyeeye/README.md @@ -0,0 +1,103 @@ +# Peyeeye PII Filter Chain Demo + +Drop-in PII redaction + rehydration for Plano via the [Peyeeye](https://peyeeye.ai) API. + +The model never sees raw PII — incoming text is sent to `/v1/redact` and replaced with stable placeholders like `[EMAIL_0]`, `[PERSON_1]`, etc. After the model responds, the placeholders in its output are swapped back to the originals via `/v1/rehydrate`. + +## Architecture + +``` +Client --> Plano (model listener :12000) + | + +-- input_filters: peyeeye_redact + | -> POST https://api.peyeeye.ai/v1/redact + | -> body messages contain [EMAIL_0], [SSN_0], ... + | + +-- model_provider: openai/gpt-4o-mini (or claude, etc.) + | -> the LLM only ever sees redacted text + | + +-- output_filters: peyeeye_rehydrate + -> POST https://api.peyeeye.ai/v1/rehydrate + -> placeholders restored to originals +``` + +## Quick start + +```bash +export PEYEEYE_API_KEY=pk_live_... # https://peyeeye.ai +export OPENAI_API_KEY=sk-... 
+ +bash run_demo.sh + +# in another terminal +bash test.sh + +# stop +bash run_demo.sh down +``` + +## Try it + +```bash +curl http://localhost:12000/v1/chat/completions \ + -H "Content-Type: application/json" \ + -d '{ + "model": "gpt-4o-mini", + "messages": [{"role": "user", "content": "Email me at jane@example.com about SSN 123-45-6789"}], + "stream": false + }' +``` + +The response body comes back with the original email and SSN restored, while the request that hit OpenAI carried `[EMAIL_0]` and `[SSN_0]`. + +## Configuration + +All configuration is via env vars on the filter service: + +| Var | Default | Notes | +|---|---|---| +| `PEYEEYE_API_KEY` | _required_ | get one at peyeeye.ai | +| `PEYEEYE_API_BASE` | `https://api.peyeeye.ai` | override for self-hosted | +| `PEYEEYE_LOCALE` | `auto` | BCP-47 | +| `PEYEEYE_ENTITIES` | _all_ | comma-separated list, e.g. `EMAIL,SSN,CREDIT_CARD` | +| `PEYEEYE_SESSION_MODE` | `stateful` | `stateful` (default) or `stateless` | + +In `stateless` mode, Peyeeye returns a sealed `skey_...` blob instead of holding the mapping server-side; this filter caches the blob on the request id and uses it for rehydration. No per-request state is retained on Peyeeye's servers. + +## Filter contract + +**Input filter (`/redact/{path:path}`)** receives the full raw request body. It walks `messages[].content` (string or multimodal `text` parts) for chat-style endpoints and `input` for the OpenAI Responses API, sends a single batched call to Peyeeye, and writes the redacted text back into the body in place. + +**Output filter (`/rehydrate/{path:path}`)** receives the raw LLM response bytes, looks up the cached session id by the request id (`x-request-id`), and rehydrates `choices[].message.content`, Anthropic-style `content[].text`, or Responses-API `output[].content[].text`. 
+ +## Behavioral invariants + +- **Fail-closed.** If `/v1/redact` returns an unexpected shape, or the count of returned texts doesn't match the count sent, the filter raises a 502 — no unredacted text is ever forwarded to the model. +- **Length-guard.** `len(redacted) == len(sent)` is asserted before zipping the values back into the request. +- **Typed errors.** `PEyeEyeMissingSecrets` covers auth (401, missing key), `PEyeEyeAPIError` covers everything else (timeouts, 4xx, 5xx, parse). Both surface as HTTP errors to Plano. +- **Best-effort cleanup.** Stateful sessions are deleted server-side after rehydration via `DELETE /v1/sessions/{ses_...}`. + +## Streaming + +Streaming SSE responses are passed through unchanged in this demo — token-by-token rehydration would require buffering or a session-aware streaming endpoint. For now, set `stream: false` on requests routed through this filter chain. + +## Tests + +```bash +uv sync --group dev +uv run pytest -v +``` + +The suite mocks the Peyeeye API (`respx`) and exercises: + +- redact + rehydrate round trip on chat completions +- redact + rehydrate on `/v1/messages` (Anthropic) and `/v1/responses` (OpenAI) +- the length-guard (redact returns wrong count -> 502) +- the unexpected-shape guard (redact returns non-string/list -> 502) +- the no-PII passthrough (no redactable text -> body unchanged, no session cached) +- the no-cached-session passthrough on rehydrate +- multimodal `[{"type":"text","text":...}]` content lists + +## Disclosure + +I'm the maintainer of peyeeye.ai. Happy to adjust API surface, naming, or test coverage to match Plano's conventions. diff --git a/demos/filter_chains/peyeeye/config.yaml b/demos/filter_chains/peyeeye/config.yaml new file mode 100644 index 000000000..83213e29e --- /dev/null +++ b/demos/filter_chains/peyeeye/config.yaml @@ -0,0 +1,42 @@ +version: v0.3.0 + +# Peyeeye PII redaction & rehydration filter chain. +# +# Prereqs: +# * export PEYEEYE_API_KEY=... 
(required) +# * export OPENAI_API_KEY=... (or any provider you wire up below) +# +# The peyeeye filter service runs on :10502 and exposes: +# POST /redact/{path} — input filter +# POST /rehydrate/{path} — output filter +# +# Plano appends the upstream path automatically (e.g. /v1/chat/completions), +# so /redact/v1/chat/completions hits the redactor, and the model only ever +# sees ``[EMAIL_0]``-style placeholders. + +filters: + - id: peyeeye_redact + url: http://localhost:10502/redact + type: http + - id: peyeeye_rehydrate + url: http://localhost:10502/rehydrate + type: http + +model_providers: + - model: openai/gpt-4o-mini + access_key: $OPENAI_API_KEY + default: true + - model: anthropic/claude-sonnet-4-20250514 + access_key: $ANTHROPIC_API_KEY + +listeners: + - type: model + name: llm_gateway + port: 12000 + input_filters: + - peyeeye_redact + output_filters: + - peyeeye_rehydrate + +tracing: + random_sampling: 100 diff --git a/demos/filter_chains/peyeeye/peyeeye.py b/demos/filter_chains/peyeeye/peyeeye.py new file mode 100644 index 000000000..4e0bc7103 --- /dev/null +++ b/demos/filter_chains/peyeeye/peyeeye.py @@ -0,0 +1,485 @@ +"""Peyeeye PII redaction & rehydration filter for Plano filter chains. + +Two endpoints, mirroring the pii_anonymizer demo: + + POST /redact/{path:path} — input filter; redacts PII before the LLM call. + POST /rehydrate/{path:path} — output filter; restores PII in the LLM response. + +The filter delegates detection and rehydration to the Peyeeye API +(``https://api.peyeeye.ai`` by default). Two session modes are supported: + + * ``stateful`` (default) — Peyeeye holds the token -> value mapping under a + ``ses_...`` id; rehydrate references the id. + * ``stateless`` — Peyeeye returns a sealed ``skey_...`` blob; nothing is + retained server-side. + +Behavioral invariants (mirrored from the LiteLLM peyeeye guardrail): + + * Pre-call: redact every text-bearing chunk in the request. 
If the count of + returned texts doesn't match the count sent, raise -- never forward + partially-redacted source. + * Post-call: pull the cached session id, rehydrate the response, and best- + effort delete the stateful session. + * Fail-closed: any unexpected response shape from /v1/redact raises a typed + error rather than silently passing PII through. + +Configuration knobs (env vars): + + * ``PEYEEYE_API_KEY`` — required. + * ``PEYEEYE_API_BASE`` — defaults to ``https://api.peyeeye.ai``. + * ``PEYEEYE_LOCALE`` — BCP-47, default ``auto``. + * ``PEYEEYE_ENTITIES`` — comma-separated entity ids to restrict detection. + * ``PEYEEYE_SESSION_MODE`` — ``stateful`` (default) or ``stateless``. +""" + +from __future__ import annotations + +import asyncio +import json +import logging +import os +import threading +import time +from typing import Any, Dict, List, Optional, Tuple + +import httpx +from fastapi import FastAPI, HTTPException, Request +from fastapi.responses import JSONResponse, Response + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - [PEYEEYE] - %(levelname)s - %(message)s", +) +logger = logging.getLogger(__name__) + + +DEFAULT_API_BASE = "https://api.peyeeye.ai" +SESSION_TTL_SECONDS = 3600 +HTTP_TIMEOUT_SECONDS = 15.0 + + +# ----------------------------------------------------------------------- errors + + +class PEyeEyeAPIError(Exception): + """Raised when the Peyeeye API returns an error or unexpected payload.""" + + +class PEyeEyeMissingSecrets(Exception): + """Raised when no Peyeeye API key is configured.""" + + +# ------------------------------------------------------------------- mini cache + + +class _SessionCache: + """Tiny in-memory ``request_id -> session_id`` store with TTL.""" + + def __init__(self, ttl_seconds: int = SESSION_TTL_SECONDS) -> None: + self._ttl = ttl_seconds + self._lock = threading.Lock() + self._store: Dict[str, Tuple[str, float]] = {} + + def _expire(self) -> None: + now = time.time() + expired = [k for k, 
(_, ts) in self._store.items() if now - ts > self._ttl] + for k in expired: + del self._store[k] + + def set(self, key: str, value: str) -> None: + with self._lock: + self._expire() + self._store[key] = (value, time.time()) + + def get(self, key: str) -> Optional[str]: + with self._lock: + entry = self._store.get(key) + return entry[0] if entry else None + + def pop(self, key: str) -> Optional[str]: + with self._lock: + entry = self._store.pop(key, None) + return entry[0] if entry else None + + +# ------------------------------------------------------------------ peyeeye client + + +class PEyeEyeClient: + """Async HTTP client for the Peyeeye redact/rehydrate API.""" + + def __init__( + self, + api_key: Optional[str] = None, + api_base: Optional[str] = None, + locale: Optional[str] = None, + entities: Optional[List[str]] = None, + session_mode: Optional[str] = None, + ) -> None: + key = api_key or os.environ.get("PEYEEYE_API_KEY") + if not key: + raise PEyeEyeMissingSecrets( + "Peyeeye API key missing — set the PEYEEYE_API_KEY env var." 
+ ) + self.api_key = key + self.api_base = ( + api_base or os.environ.get("PEYEEYE_API_BASE") or DEFAULT_API_BASE + ).rstrip("/") + self.locale = locale or os.environ.get("PEYEEYE_LOCALE") or "auto" + env_entities = os.environ.get("PEYEEYE_ENTITIES") + if entities is None and env_entities: + entities = [e.strip() for e in env_entities.split(",") if e.strip()] + self.entities = entities or None + mode = session_mode or os.environ.get("PEYEEYE_SESSION_MODE") or "stateful" + if mode not in ("stateful", "stateless"): + raise ValueError( + f"PEYEEYE_SESSION_MODE must be 'stateful' or 'stateless', got {mode!r}" + ) + self.session_mode = mode + self._client: Optional[httpx.AsyncClient] = None + + async def _async_client(self) -> httpx.AsyncClient: + if self._client is None: + self._client = httpx.AsyncClient(timeout=HTTP_TIMEOUT_SECONDS) + return self._client + + def _headers(self) -> Dict[str, str]: + return { + "Authorization": f"Bearer {self.api_key}", + "Content-Type": "application/json", + } + + async def redact_batch(self, texts: List[str]) -> Tuple[List[str], Optional[str]]: + """Redact a batch of texts. Returns ``(redacted, session_id_or_skey)``. + + Raises ``PEyeEyeAPIError`` on any non-2xx, timeout, or unexpected shape. + """ + body: Dict[str, Any] = {"text": texts, "locale": self.locale} + if self.entities: + body["entities"] = list(self.entities) + if self.session_mode == "stateless": + body["session"] = "stateless" + + payload = await self._post("/v1/redact", body) + out = payload.get("text") + if isinstance(out, str): + redacted = [out] + elif isinstance(out, list): + redacted = [str(x) for x in out] + else: + raise PEyeEyeAPIError( + "Peyeeye /v1/redact returned an unexpected response shape; " + "refusing to forward unredacted text." 
+ ) + + if self.session_mode == "stateless": + session_id = payload.get("rehydration_key") + else: + session_id = payload.get("session_id") or payload.get("session") + + return redacted, session_id + + async def rehydrate(self, text: str, session_id: str) -> str: + if not text: + return text + try: + payload = await self._post( + "/v1/rehydrate", {"text": text, "session": session_id} + ) + except PEyeEyeAPIError as e: + # Rehydration failures must not corrupt or drop the LLM response; + # log and fall back to the (already-redacted) text. + logger.warning("rehydrate failed: %s", e) + return text + return payload.get("text", text) + + async def delete_session(self, session_id: str) -> None: + if not session_id.startswith("ses_"): + return + client = await self._async_client() + try: + await client.delete( + f"{self.api_base}/v1/sessions/{session_id}", + headers=self._headers(), + timeout=10.0, + ) + except Exception as e: # pragma: no cover - best effort + logger.debug("session cleanup failed: %s", e) + + async def _post(self, path: str, body: Dict[str, Any]) -> Dict[str, Any]: + client = await self._async_client() + url = f"{self.api_base}{path}" + try: + resp = await client.post(url, headers=self._headers(), json=body) + except httpx.TimeoutException as e: + raise PEyeEyeAPIError(f"Peyeeye {path} timed out") from e + except httpx.HTTPError as e: + raise PEyeEyeAPIError(f"Peyeeye {path} request failed: {e}") from e + if resp.status_code == 401: + raise PEyeEyeMissingSecrets("Invalid Peyeeye API key") from None + if resp.status_code >= 400: + raise PEyeEyeAPIError( + f"Peyeeye {path} returned HTTP {resp.status_code}: {resp.text[:200]}" + ) + try: + return resp.json() + except json.JSONDecodeError as e: + raise PEyeEyeAPIError(f"Peyeeye {path} returned non-JSON body") from e + + +# ------------------------------------------------------------- request walkers + + +def iter_request_texts(body: Dict[str, Any], endpoint: str) -> List[Tuple[str, ...]]: + """Yield 
``("path", ...)`` tuples identifying every user-text chunk. + + Mirrors the litellm pre-call hook: walks ``messages[].content`` (string or + multimodal text-part list) for chat-style endpoints, and ``input`` for the + OpenAI Responses API. + + Returns a list of (path-tuple, text) pairs. ``path-tuple`` is consumed by + ``set_request_text`` to write the redacted value back. + """ + parts: List[Tuple[Tuple[Any, ...], str]] = [] + + if endpoint == "/v1/responses": + input_val = body.get("input") + if isinstance(input_val, str) and input_val: + parts.append((("input",), input_val)) + elif isinstance(input_val, list): + for i, item in enumerate(input_val): + if not isinstance(item, dict): + continue + if item.get("role") != "user": + continue + content = item.get("content") + if isinstance(content, str) and content: + parts.append((("input", i, "content"), content)) + elif isinstance(content, list): + for j, sub in enumerate(content): + if isinstance(sub, dict) and sub.get("type") == "text": + text = sub.get("text", "") + if text: + parts.append((("input", i, "content", j, "text"), text)) + return parts + + # /v1/chat/completions and /v1/messages both use messages[] + messages = body.get("messages") or [] + for i, msg in enumerate(messages): + if not isinstance(msg, dict): + continue + if msg.get("role") != "user": + continue + content = msg.get("content") + if isinstance(content, str) and content: + parts.append((("messages", i, "content"), content)) + elif isinstance(content, list): + for j, sub in enumerate(content): + if isinstance(sub, dict) and sub.get("type") == "text": + text = sub.get("text", "") + if text: + parts.append((("messages", i, "content", j, "text"), text)) + return parts + + +def set_request_text(body: Dict[str, Any], path: Tuple[Any, ...], value: str) -> None: + """Write ``value`` into ``body`` at the given path.""" + cur: Any = body + for key in path[:-1]: + cur = cur[key] + cur[path[-1]] = value + + +def iter_response_texts( + body: Dict[str, 
Any], endpoint: str +) -> List[Tuple[Tuple[Any, ...], str]]: + """Yield ``(path, text)`` for every text chunk in an LLM response body.""" + parts: List[Tuple[Tuple[Any, ...], str]] = [] + if endpoint == "/v1/messages": + content = body.get("content") + if isinstance(content, list): + for j, sub in enumerate(content): + if isinstance(sub, dict) and sub.get("type") == "text": + text = sub.get("text", "") + if text: + parts.append((("content", j, "text"), text)) + return parts + + # /v1/chat/completions, /v1/responses (synchronous), and similar + choices = body.get("choices") + if isinstance(choices, list): + for i, choice in enumerate(choices): + if not isinstance(choice, dict): + continue + message = choice.get("message") + if isinstance(message, dict): + content = message.get("content") + if isinstance(content, str) and content: + parts.append((("choices", i, "message", "content"), content)) + # OpenAI Responses API: top-level "output" array + output = body.get("output") + if isinstance(output, list): + for i, item in enumerate(output): + if not isinstance(item, dict): + continue + content = item.get("content") + if isinstance(content, list): + for j, sub in enumerate(content): + if isinstance(sub, dict) and sub.get("type") in ( + "text", + "output_text", + ): + text = sub.get("text", "") + if text: + parts.append((("output", i, "content", j, "text"), text)) + return parts + + +# -------------------------------------------------------------------- FastAPI + + +def create_app(client: Optional[PEyeEyeClient] = None) -> FastAPI: + """Build the FastAPI app. 
``client`` is overridable for tests.""" + app = FastAPI(title="Peyeeye PII filter", version="1.0.0") + cache = _SessionCache() + + def _client() -> PEyeEyeClient: + nonlocal client + if client is None: + client = PEyeEyeClient() + return client + + @app.post("/redact/{path:path}") + async def redact(path: str, request: Request) -> Response: + endpoint = f"/{path}" + request_id = request.headers.get("x-request-id", "unknown") + try: + body = await request.json() + except json.JSONDecodeError: + raise HTTPException(status_code=400, detail="invalid JSON body") + + text_parts = iter_request_texts(body, endpoint) + if not text_parts: + logger.info("request_id=%s no text to redact", request_id) + return JSONResponse(content=body) + + texts = [t for _, t in text_parts] + try: + redacted, session_id = await _client().redact_batch(texts) + except PEyeEyeMissingSecrets as e: + logger.error("request_id=%s missing secrets: %s", request_id, e) + raise HTTPException(status_code=500, detail=str(e)) + except PEyeEyeAPIError as e: + # Fail-closed: do not pass unredacted text through. + logger.error("request_id=%s redact failed: %s", request_id, e) + raise HTTPException(status_code=502, detail=str(e)) + + # Length-guard: must match exactly. The API contract is one-to-one. 
+ if len(redacted) != len(text_parts): + logger.error( + "request_id=%s length mismatch: sent=%d got=%d", + request_id, + len(text_parts), + len(redacted), + ) + raise HTTPException( + status_code=502, + detail=( + f"Peyeeye /v1/redact returned {len(redacted)} texts for " + f"{len(text_parts)} inputs; refusing to forward partially-" + "redacted data" + ), + ) + + for (path_tuple, _), value in zip(text_parts, redacted): + set_request_text(body, path_tuple, value) + + if session_id: + cache.set(request_id, session_id) + logger.info( + "request_id=%s redacted %d chunk(s); cached session", + request_id, + len(text_parts), + ) + else: + logger.info( + "request_id=%s redacted %d chunk(s); no session id returned", + request_id, + len(text_parts), + ) + + return JSONResponse(content=body) + + @app.post("/rehydrate/{path:path}") + async def rehydrate(path: str, request: Request) -> Response: + endpoint = f"/{path}" + request_id = request.headers.get("x-request-id", "unknown") + raw = await request.body() + + session_id = cache.pop(request_id) + if not session_id: + logger.info( + "request_id=%s no session cached, passing response through", + request_id, + ) + return Response(content=raw, media_type="application/json") + + # Streaming SSE: not supported for stateful rehydration in this demo. + # Plano sends raw chunks, but rehydration needs the full token to look + # up the original; we pass through and rely on a non-streaming flow. + body_str = raw.decode("utf-8", errors="replace") + if body_str.lstrip().startswith("data:") or "data: " in body_str[:32]: + logger.warning( + "request_id=%s SSE not supported; passing through " + "(use non-streaming for now)", + request_id, + ) + # Don't lose the session id on the way back if SSE. 
+ cache.set(request_id, session_id) + return Response(content=raw, media_type="text/event-stream") + + try: + body = json.loads(body_str) + except json.JSONDecodeError: + logger.warning( + "request_id=%s response is not JSON; passing through", request_id + ) + return Response(content=raw, media_type="application/json") + + text_parts = iter_response_texts(body, endpoint) + if not text_parts: + logger.info("request_id=%s no response text to rehydrate", request_id) + await _maybe_delete_session(_client(), session_id) + return JSONResponse(content=body) + + # Run rehydrate calls concurrently — each returns the original text, + # falling back to the redacted value on rehydrate error. + tasks = [_client().rehydrate(text, session_id) for _, text in text_parts] + restored = await asyncio.gather(*tasks) + for (path_tuple, _), value in zip(text_parts, restored): + set_request_text(body, path_tuple, value) + + await _maybe_delete_session(_client(), session_id) + logger.info("request_id=%s rehydrated %d chunk(s)", request_id, len(text_parts)) + return JSONResponse(content=body) + + @app.get("/health") + async def health() -> Dict[str, str]: + return {"status": "healthy"} + + return app + + +async def _maybe_delete_session(client: PEyeEyeClient, session_id: str) -> None: + """Best-effort delete of a stateful session id.""" + if client.session_mode == "stateful": + try: + await client.delete_session(session_id) + except Exception: # pragma: no cover - best effort + pass + + +# Default ASGI app — used by ``uvicorn peyeeye:app``. +# Lazily resolves the client so tests can construct ``create_app(client=...)`` +# without requiring PEYEEYE_API_KEY in the environment. 
+app = create_app() diff --git a/demos/filter_chains/peyeeye/pyproject.toml b/demos/filter_chains/peyeeye/pyproject.toml new file mode 100644 index 000000000..605991f5f --- /dev/null +++ b/demos/filter_chains/peyeeye/pyproject.toml @@ -0,0 +1,29 @@ +[project] +name = "peyeeye_filter" +version = "0.1.0" +description = "Peyeeye PII redaction & rehydration filter for Plano filter chains" +readme = "README.md" +requires-python = ">=3.10,<3.14" +dependencies = [ + "fastapi>=0.104.1", + "uvicorn>=0.24.0", + "httpx>=0.27.0", + "pydantic>=2.0.0", +] + +[dependency-groups] +dev = [ + "pytest>=8.0.0", + "pytest-asyncio>=0.23.0", + "respx>=0.21.0", +] + +[tool.pytest.ini_options] +asyncio_mode = "auto" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["."] diff --git a/demos/filter_chains/peyeeye/run_demo.sh b/demos/filter_chains/peyeeye/run_demo.sh new file mode 100644 index 000000000..752efbcf0 --- /dev/null +++ b/demos/filter_chains/peyeeye/run_demo.sh @@ -0,0 +1,44 @@ +#!/bin/bash +set -e + +start_demo() { + if [ -f ".env" ]; then + echo ".env file already exists. Skipping creation." + else + if [ -z "${PEYEEYE_API_KEY:-}" ]; then + echo "Error: PEYEEYE_API_KEY environment variable is not set for the demo." + exit 1 + fi + if [ -z "${OPENAI_API_KEY:-}" ]; then + echo "Error: OPENAI_API_KEY environment variable is not set for the demo." + exit 1 + fi + + echo "Creating .env file..." + { + echo "PEYEEYE_API_KEY=$PEYEEYE_API_KEY" + echo "OPENAI_API_KEY=$OPENAI_API_KEY" + } > .env + echo ".env file created." + fi + + echo "Starting Plano with config.yaml..." + planoai up config.yaml + + echo "Starting Peyeeye filter service..." + bash start_agents.sh & +} + +stop_demo() { + echo "Stopping Peyeeye filter service..." + pkill -f start_agents.sh 2>/dev/null || true + + echo "Stopping Plano..." 
+ planoai down +} + +if [ "$1" == "down" ]; then + stop_demo +else + start_demo +fi diff --git a/demos/filter_chains/peyeeye/start_agents.sh b/demos/filter_chains/peyeeye/start_agents.sh new file mode 100644 index 000000000..c549fdc45 --- /dev/null +++ b/demos/filter_chains/peyeeye/start_agents.sh @@ -0,0 +1,29 @@ +#!/bin/bash +set -e + +PIDS=() + +log() { echo "$(date '+%F %T') - $*"; } + +cleanup() { + log "Stopping agents..." + for PID in "${PIDS[@]}"; do + kill $PID 2>/dev/null && log "Stopped process $PID" + done + exit 0 +} + +trap cleanup EXIT INT TERM + +if [ -z "${PEYEEYE_API_KEY:-}" ]; then + log "ERROR: PEYEEYE_API_KEY is not set." + exit 1 +fi + +log "Starting Peyeeye filter service on port 10502..." +uv run uvicorn peyeeye:app --host 0.0.0.0 --port 10502 & +PIDS+=($!) + +for PID in "${PIDS[@]}"; do + wait "$PID" +done diff --git a/demos/filter_chains/peyeeye/test.sh b/demos/filter_chains/peyeeye/test.sh new file mode 100644 index 000000000..5326b2f3d --- /dev/null +++ b/demos/filter_chains/peyeeye/test.sh @@ -0,0 +1,71 @@ +#!/usr/bin/env bash +set -euo pipefail + +BASE_URL="http://localhost:12000" +PASS=0 +FAIL=0 + +echo "Waiting for Plano to be ready..." +for i in $(seq 1 30); do + if curl -sf "$BASE_URL/v1/models" > /dev/null 2>&1; then + echo "Plano is ready." + break + fi + if [ "$i" -eq 30 ]; then + echo "ERROR: Plano did not become ready in time." 
+ exit 1 + fi + sleep 2 +done + +run_test() { + local name="$1" + local path="$2" + local expected_code="$3" + local body="$4" + + http_code=$(curl -s -o /tmp/peyeeye_test_body -w "%{http_code}" \ + -X POST "$BASE_URL$path" \ + -H "Content-Type: application/json" \ + -d "$body") + + if [ "$http_code" -eq "$expected_code" ]; then + echo " PASS $name (HTTP $http_code)" + PASS=$((PASS + 1)) + else + echo " FAIL $name -- expected $expected_code, got $http_code" + echo " Body: $(cat /tmp/peyeeye_test_body)" + FAIL=$((FAIL + 1)) + fi +} + +echo "" +echo "=== /v1/chat/completions ===" + +run_test "Non-streaming with PII" /v1/chat/completions 200 '{ + "model": "gpt-4o-mini", + "messages": [{"role": "user", "content": "Email me at jane@example.com"}], + "stream": false +}' + +run_test "No PII" /v1/chat/completions 200 '{ + "model": "gpt-4o-mini", + "messages": [{"role": "user", "content": "What is 2+2?"}], + "stream": false +}' + +echo "" +echo "=== /v1/messages (Anthropic) ===" + +run_test "Non-streaming with PII (SSN)" /v1/messages 200 '{ + "model": "claude-sonnet-4-20250514", + "max_tokens": 256, + "messages": [{"role": "user", "content": "My SSN is 123-45-6789"}] +}' + +echo "" +echo "Results: $PASS passed, $FAIL failed" + +if [ "$FAIL" -gt 0 ]; then + exit 1 +fi diff --git a/demos/filter_chains/peyeeye/test_peyeeye.py b/demos/filter_chains/peyeeye/test_peyeeye.py new file mode 100644 index 000000000..87992589b --- /dev/null +++ b/demos/filter_chains/peyeeye/test_peyeeye.py @@ -0,0 +1,471 @@ +"""Tests for the Peyeeye Plano filter. + +The Peyeeye API (`https://api.peyeeye.ai`) is fully mocked with `respx`. Every +new branch in `peyeeye.py` should have at least one test below. 
+""" + +from __future__ import annotations + +import json +from typing import Any, Dict + +import httpx +import pytest +import respx +from fastapi.testclient import TestClient + +from peyeeye import ( + PEyeEyeClient, + PEyeEyeMissingSecrets, + create_app, + iter_request_texts, + iter_response_texts, + set_request_text, +) + +# ------------------------------------------------------------------- fixtures + + +@pytest.fixture +def api_base() -> str: + return "https://api.peyeeye.ai" + + +@pytest.fixture +def client(monkeypatch, api_base) -> PEyeEyeClient: + monkeypatch.setenv("PEYEEYE_API_KEY", "pk_test_123") + monkeypatch.delenv("PEYEEYE_API_BASE", raising=False) + monkeypatch.delenv("PEYEEYE_LOCALE", raising=False) + monkeypatch.delenv("PEYEEYE_ENTITIES", raising=False) + monkeypatch.delenv("PEYEEYE_SESSION_MODE", raising=False) + return PEyeEyeClient() + + +@pytest.fixture +def app_client(client) -> TestClient: + return TestClient(create_app(client=client)) + + +# --------------------------------------------------------------- client tests + + +def test_client_missing_api_key(monkeypatch): + monkeypatch.delenv("PEYEEYE_API_KEY", raising=False) + with pytest.raises(PEyeEyeMissingSecrets): + PEyeEyeClient() + + +def test_client_invalid_session_mode(monkeypatch): + monkeypatch.setenv("PEYEEYE_API_KEY", "pk_test_123") + with pytest.raises(ValueError): + PEyeEyeClient(session_mode="bogus") + + +def test_client_picks_up_env_entities(monkeypatch): + monkeypatch.setenv("PEYEEYE_API_KEY", "pk_test_123") + monkeypatch.setenv("PEYEEYE_ENTITIES", "EMAIL, SSN ,CREDIT_CARD") + c = PEyeEyeClient() + assert c.entities == ["EMAIL", "SSN", "CREDIT_CARD"] + + +# -------------------------------------------------------------- iter helpers + + +def test_iter_request_texts_chat_string(): + body = {"messages": [{"role": "user", "content": "hello jane@example.com"}]} + parts = iter_request_texts(body, "/v1/chat/completions") + assert parts == [(("messages", 0, "content"), "hello 
jane@example.com")] + + +def test_iter_request_texts_chat_multimodal(): + body = { + "messages": [ + { + "role": "user", + "content": [ + {"type": "text", "text": "hi"}, + {"type": "image_url", "image_url": {"url": "..."}}, + {"type": "text", "text": "ssn 111-22-3333"}, + ], + } + ] + } + parts = iter_request_texts(body, "/v1/chat/completions") + assert parts == [ + (("messages", 0, "content", 0, "text"), "hi"), + (("messages", 0, "content", 2, "text"), "ssn 111-22-3333"), + ] + + +def test_iter_request_texts_skips_non_user_roles(): + body = { + "messages": [ + {"role": "system", "content": "you are helpful"}, + {"role": "user", "content": "hi"}, + {"role": "assistant", "content": "hello"}, + ] + } + parts = iter_request_texts(body, "/v1/chat/completions") + assert len(parts) == 1 + assert parts[0][1] == "hi" + + +def test_iter_request_texts_responses_string(): + body = {"input": "hello jane@example.com"} + parts = iter_request_texts(body, "/v1/responses") + assert parts == [(("input",), "hello jane@example.com")] + + +def test_iter_request_texts_responses_list(): + body = { + "input": [ + {"role": "system", "content": "ignored"}, + {"role": "user", "content": "hi"}, + { + "role": "user", + "content": [{"type": "text", "text": "ssn 111-22-3333"}], + }, + ] + } + parts = iter_request_texts(body, "/v1/responses") + assert parts == [ + (("input", 1, "content"), "hi"), + (("input", 2, "content", 0, "text"), "ssn 111-22-3333"), + ] + + +def test_set_request_text_round_trip(): + body: Dict[str, Any] = {"messages": [{"role": "user", "content": "raw"}]} + set_request_text(body, ("messages", 0, "content"), "redacted") + assert body["messages"][0]["content"] == "redacted" + + +def test_iter_response_texts_chat(): + body = {"choices": [{"message": {"content": "Email [EMAIL_0] back."}}]} + parts = iter_response_texts(body, "/v1/chat/completions") + assert parts == [(("choices", 0, "message", "content"), "Email [EMAIL_0] back.")] + + +def test_iter_response_texts_anthropic(): + 
body = {"content": [{"type": "text", "text": "Reach [EMAIL_0]"}]} + parts = iter_response_texts(body, "/v1/messages") + assert parts == [(("content", 0, "text"), "Reach [EMAIL_0]")] + + +# ------------------------------------------------------------ /redact endpoint + + +@respx.mock +def test_redact_chat_happy_path(app_client, api_base): + respx.post(f"{api_base}/v1/redact").mock( + return_value=httpx.Response( + 200, + json={ + "text": ["hello [EMAIL_0]"], + "session_id": "ses_abc", + }, + ) + ) + + resp = app_client.post( + "/redact/v1/chat/completions", + json={ + "model": "gpt-4o-mini", + "messages": [{"role": "user", "content": "hello jane@example.com"}], + }, + headers={"x-request-id": "req-1"}, + ) + assert resp.status_code == 200 + body = resp.json() + assert body["messages"][0]["content"] == "hello [EMAIL_0]" + + +@respx.mock +def test_redact_no_pii_no_session_cached(app_client, api_base): + # Even with no text-bearing chunks, the body is returned untouched. + resp = app_client.post( + "/redact/v1/chat/completions", + json={"model": "gpt-4o-mini", "messages": []}, + headers={"x-request-id": "req-empty"}, + ) + assert resp.status_code == 200 + # No call should have been made to peyeeye. 
+ assert not respx.routes + + +@respx.mock +def test_redact_length_guard_fails_closed(app_client, api_base): + """If the API returns a different number of texts, refuse to forward.""" + respx.post(f"{api_base}/v1/redact").mock( + return_value=httpx.Response( + 200, + json={"text": ["only one"], "session_id": "ses_x"}, + ) + ) + resp = app_client.post( + "/redact/v1/chat/completions", + json={ + "model": "gpt-4o-mini", + "messages": [ + {"role": "user", "content": "first jane@example.com"}, + {"role": "user", "content": "second 555-123-4567"}, + ], + }, + headers={"x-request-id": "req-bad-len"}, + ) + assert resp.status_code == 502 + assert "refusing to forward" in resp.json()["detail"] + + +@respx.mock +def test_redact_unexpected_shape_fails_closed(app_client, api_base): + respx.post(f"{api_base}/v1/redact").mock( + return_value=httpx.Response(200, json={"text": 42, "session_id": "ses_x"}) + ) + resp = app_client.post( + "/redact/v1/chat/completions", + json={ + "model": "gpt-4o-mini", + "messages": [{"role": "user", "content": "x"}], + }, + headers={"x-request-id": "req-bad-shape"}, + ) + assert resp.status_code == 502 + assert "unexpected response shape" in resp.json()["detail"] + + +@respx.mock +def test_redact_5xx_fails_closed(app_client, api_base): + respx.post(f"{api_base}/v1/redact").mock( + return_value=httpx.Response(500, text="boom") + ) + resp = app_client.post( + "/redact/v1/chat/completions", + json={ + "model": "gpt-4o-mini", + "messages": [{"role": "user", "content": "x"}], + }, + headers={"x-request-id": "req-5xx"}, + ) + assert resp.status_code == 502 + assert "HTTP 500" in resp.json()["detail"] + + +@respx.mock +def test_redact_401_fails_closed_with_500(app_client, api_base): + """Auth errors surface as PEyeEyeMissingSecrets -> HTTP 500.""" + respx.post(f"{api_base}/v1/redact").mock( + return_value=httpx.Response(401, json={"error": "bad key"}) + ) + resp = app_client.post( + "/redact/v1/chat/completions", + json={ + "model": "gpt-4o-mini", + 
"messages": [{"role": "user", "content": "x"}], + }, + headers={"x-request-id": "req-401"}, + ) + assert resp.status_code == 500 + assert "Invalid Peyeeye API key" in resp.json()["detail"] + + +@respx.mock +def test_redact_invalid_json_body(app_client): + resp = app_client.post( + "/redact/v1/chat/completions", + content=b"not json", + headers={ + "Content-Type": "application/json", + "x-request-id": "req-bad-json", + }, + ) + assert resp.status_code == 400 + + +# --------------------------------------------------------- /rehydrate endpoint + + +@respx.mock +def test_rehydrate_round_trip(app_client, api_base): + """Redact then rehydrate share the request id; placeholders restored.""" + respx.post(f"{api_base}/v1/redact").mock( + return_value=httpx.Response( + 200, + json={ + "text": ["hello [EMAIL_0]"], + "session_id": "ses_round", + }, + ) + ) + respx.post(f"{api_base}/v1/rehydrate").mock( + return_value=httpx.Response( + 200, json={"text": "Reach jane@example.com today.", "replaced": 1} + ) + ) + respx.delete(f"{api_base}/v1/sessions/ses_round").mock( + return_value=httpx.Response(204) + ) + + redact_resp = app_client.post( + "/redact/v1/chat/completions", + json={ + "model": "gpt-4o-mini", + "messages": [{"role": "user", "content": "hello jane@example.com"}], + }, + headers={"x-request-id": "req-round"}, + ) + assert redact_resp.status_code == 200 + + rehydrate_resp = app_client.post( + "/rehydrate/v1/chat/completions", + json={"choices": [{"message": {"content": "Reach [EMAIL_0] today."}}]}, + headers={"x-request-id": "req-round"}, + ) + assert rehydrate_resp.status_code == 200 + body = rehydrate_resp.json() + assert body["choices"][0]["message"]["content"] == "Reach jane@example.com today." 
@respx.mock
def test_rehydrate_no_session_cached_passthrough(app_client):
    """With no cached session for the request id, the body passes through verbatim."""
    payload = {"choices": [{"message": {"content": "no session here"}}]}
    response = app_client.post(
        "/rehydrate/v1/chat/completions",
        json=payload,
        headers={"x-request-id": "req-uncached"},
    )
    assert response.status_code == 200
    assert response.json() == payload


@respx.mock
def test_rehydrate_anthropic_messages(app_client, api_base):
    """Anthropic-style `content[].text` blocks are rehydrated in place."""
    respx.post(f"{api_base}/v1/redact").mock(
        return_value=httpx.Response(
            200,
            json={"text": ["My ssn is [SSN_0]"], "session_id": "ses_anth"},
        )
    )
    respx.post(f"{api_base}/v1/rehydrate").mock(
        return_value=httpx.Response(
            200, json={"text": "My ssn is 111-22-3333", "replaced": 1}
        )
    )
    respx.delete(f"{api_base}/v1/sessions/ses_anth").mock(
        return_value=httpx.Response(204)
    )

    # Prime the session cache via the redact leg under the same request id.
    app_client.post(
        "/redact/v1/messages",
        json={
            "model": "claude-sonnet-4",
            "messages": [{"role": "user", "content": "My ssn is 111-22-3333"}],
        },
        headers={"x-request-id": "req-anth"},
    )
    response = app_client.post(
        "/rehydrate/v1/messages",
        json={"content": [{"type": "text", "text": "My ssn is [SSN_0]"}]},
        headers={"x-request-id": "req-anth"},
    )
    assert response.status_code == 200
    rehydrated = response.json()
    assert rehydrated["content"][0]["text"] == "My ssn is 111-22-3333"


@respx.mock
def test_rehydrate_sse_passthrough(app_client, api_base):
    """SSE bodies are passed through unchanged (and the session id is kept)."""
    respx.post(f"{api_base}/v1/redact").mock(
        return_value=httpx.Response(
            200, json={"text": ["[EMAIL_0]"], "session_id": "ses_sse"}
        )
    )
    app_client.post(
        "/redact/v1/chat/completions",
        json={
            "model": "gpt-4o-mini",
            "messages": [{"role": "user", "content": "jane@example.com"}],
        },
        headers={"x-request-id": "req-sse"},
    )

    stream_bytes = b'data: {"choices":[{"delta":{"content":"hi"}}]}\n\n'
    response = app_client.post(
        "/rehydrate/v1/chat/completions",
        content=stream_bytes,
        headers={
            "Content-Type": "text/event-stream",
            "x-request-id": "req-sse",
        },
    )
    assert response.status_code == 200
    # Bytes must come back untouched: streaming rehydration is not attempted.
    assert response.content == stream_bytes


@respx.mock
def test_redact_responses_input_string(app_client, api_base):
    """Responses-API string `input` is redacted in place on the redact leg."""
    respx.post(f"{api_base}/v1/redact").mock(
        return_value=httpx.Response(
            200,
            json={
                "text": ["My email is [EMAIL_0]"],
                "session_id": "ses_resp",
            },
        )
    )
    response = app_client.post(
        "/redact/v1/responses",
        json={"model": "gpt-4o-mini", "input": "My email is jane@example.com"},
        headers={"x-request-id": "req-resp"},
    )
    assert response.status_code == 200
    assert response.json()["input"] == "My email is [EMAIL_0]"


@respx.mock
def test_stateless_session_mode(monkeypatch, api_base):
    """Stateless mode caches the rehydration_key and skips DELETE."""
    monkeypatch.setenv("PEYEEYE_API_KEY", "pk_test_123")
    monkeypatch.setenv("PEYEEYE_SESSION_MODE", "stateless")
    # Build a fresh app so the client picks up the stateless env vars.
    client = PEyeEyeClient()
    test_client = TestClient(create_app(client=client))

    redact_route = respx.post(f"{api_base}/v1/redact").mock(
        return_value=httpx.Response(
            200,
            json={
                "text": ["hello [EMAIL_0]"],
                "rehydration_key": "skey_xyz",
            },
        )
    )
    rehydrate_route = respx.post(f"{api_base}/v1/rehydrate").mock(
        return_value=httpx.Response(
            200, json={"text": "hello jane@example.com", "replaced": 1}
        )
    )
    delete_route = respx.delete(f"{api_base}/v1/sessions/skey_xyz")

    test_client.post(
        "/redact/v1/chat/completions",
        json={
            "model": "gpt-4o-mini",
            "messages": [{"role": "user", "content": "hello jane@example.com"}],
        },
        headers={"x-request-id": "req-stateless"},
    )
    # Inspect the redact request body to confirm session=stateless was sent.
    assert redact_route.called
    outbound = json.loads(redact_route.calls.last.request.content)
    assert outbound["session"] == "stateless"

    response = test_client.post(
        "/rehydrate/v1/chat/completions",
        json={"choices": [{"message": {"content": "hello [EMAIL_0]"}}]},
        headers={"x-request-id": "req-stateless"},
    )
    assert response.status_code == 200
    restored = response.json()["choices"][0]["message"]["content"]
    assert restored == "hello jane@example.com"
    # Stateless: no DELETE call.
    assert not delete_route.called
    assert rehydrate_route.called