Skip to content

Commit 251b82d

Browse files
authored
feat: add userId storage (#92)
1 parent 7d9ae48 commit 251b82d

File tree

10 files changed

+253
-6
lines changed

10 files changed

+253
-6
lines changed

API_DOCUMENTATION.md

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,19 @@ No authentication is enforced by the server. Add your own gateway (API key, OAut
2323
- `Content-Type: application/json` — required for JSON POSTs.
2424
- `Accept: application/json` or `text/event-stream` depending on `stream` usage.
2525
- `x-mcp-mode: true` or `mcp: true` — optional. When present (any value), the request runs in **MCP mode**, returning raw documentation snippets instead of synthesized answers. See [MCP Mode](#mcp-mode).
26+
- `x-conversation-id` — optional. Links multiple requests to the same conversation for analytics.
27+
- `x-user-id` — optional. Anonymous user identifier for tracking usage across sessions. The server hashes this value before storage for privacy.
28+
- `x-api-key` — optional. When present, the server uses a hash of the API key as the user identifier (takes precedence over `x-user-id`).
29+
30+
#### User Identification
31+
32+
The server derives an anonymized `user_id` for each request using the following priority:
33+
34+
1. If `x-api-key` header is present → hash the API key
35+
2. Else if `x-user-id` header is present → hash the user ID
36+
3. Else → no user identification
37+
38+
This allows tracking user behavior across sessions without storing sensitive identifiers.
2639

2740
## Health Check
2841

@@ -366,6 +379,8 @@ Fetch paginated user queries. If no date range is provided, returns the most rec
366379
- `query_text` _(optional)_ — filter by text contained in the query (case-insensitive).
367380
- `limit` _(default `100`)_ — maximum rows returned.
368381
- `offset` _(default `0`)_ — pagination offset.
382+
- `conversation_id` _(optional)_ — filter by conversation id.
383+
- `user_id` _(optional)_ — filter by hashed user id. Useful for tracking individual user journeys and retention.
369384

370385
**Response** `200 OK`
371386

@@ -381,7 +396,9 @@ Fetch paginated user queries. If no date range is provided, returns the most rec
381396
{ "role": "user", "content": "What is Cairo?" },
382397
{ "role": "assistant", "content": "Cairo is a programming language..." }
383398
],
384-
"output": "To declare a storage variable in Cairo 1, you use the #[storage] attribute..."
399+
"output": "To declare a storage variable in Cairo 1, you use the #[storage] attribute...",
400+
"conversation_id": "123e4567-e89b-12d3-a456-426614174000",
401+
"user_id": "a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6"
385402
}
386403
],
387404
"total": 1,
@@ -390,6 +407,8 @@ Fetch paginated user queries. If no date range is provided, returns the most rec
390407
}
391408
```
392409

410+
The `user_id` field contains the hashed identifier derived from either the `x-api-key` or `x-user-id` header at request time. This enables user retention and usage analytics without storing raw identifiers.
411+
393412
## Versioning & Compatibility
394413

395414
- Current API version: `1.0.0` (see FastAPI metadata).

python/src/cairo_coder/db/models.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ class UserInteraction(BaseModel):
2121
agent_id: str
2222
mcp_mode: bool = False
2323
conversation_id: Optional[str] = None
24+
user_id: Optional[str] = None
2425
chat_history: Optional[list[dict[str, Any]]] = None
2526
query: str
2627
generated_answer: Optional[str] = None

python/src/cairo_coder/db/repository.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,18 +84,20 @@ async def create_user_interaction(interaction: UserInteraction) -> None:
8484
agent_id,
8585
mcp_mode,
8686
conversation_id,
87+
user_id,
8788
chat_history,
8889
query,
8990
generated_answer,
9091
retrieved_sources,
9192
llm_usage
9293
)
93-
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
94+
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
9495
""",
9596
interaction.id,
9697
interaction.agent_id,
9798
interaction.mcp_mode,
9899
interaction.conversation_id,
100+
interaction.user_id,
99101
_serialize_json_field(interaction.chat_history),
100102
interaction.query,
101103
interaction.generated_answer,
@@ -115,6 +117,7 @@ async def get_interactions(
115117
offset: int,
116118
query_text: str | None = None,
117119
conversation_id: str | None = None,
120+
user_id: str | None = None,
118121
) -> tuple[list[dict[str, Any]], int]:
119122
"""Fetch paginated interactions matching the supplied filters.
120123
@@ -146,6 +149,10 @@ async def get_interactions(
146149
params.append(conversation_id)
147150
filters.append(f"conversation_id = ${len(params)}")
148151

152+
if user_id:
153+
params.append(user_id)
154+
filters.append(f"user_id = ${len(params)}")
155+
149156
where_clause = "WHERE " + " AND ".join(filters) if filters else ""
150157

151158
count_query = f"""
@@ -159,7 +166,7 @@ async def get_interactions(
159166
limit_placeholder = len(params) - 1
160167
offset_placeholder = len(params)
161168
data_query = f"""
162-
SELECT id, created_at, agent_id, query, chat_history, generated_answer, conversation_id
169+
SELECT id, created_at, agent_id, query, chat_history, generated_answer, conversation_id, user_id
163170
FROM user_interactions
164171
{where_clause}
165172
ORDER BY created_at DESC
@@ -200,18 +207,20 @@ async def migrate_user_interaction(interaction: UserInteraction) -> tuple[bool,
200207
agent_id,
201208
mcp_mode,
202209
conversation_id,
210+
user_id,
203211
chat_history,
204212
query,
205213
generated_answer,
206214
retrieved_sources,
207215
llm_usage
208216
)
209-
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
217+
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
210218
ON CONFLICT (id) DO UPDATE SET
211219
created_at = EXCLUDED.created_at,
212220
agent_id = EXCLUDED.agent_id,
213221
mcp_mode = EXCLUDED.mcp_mode,
214222
conversation_id = EXCLUDED.conversation_id,
223+
user_id = EXCLUDED.user_id,
215224
chat_history = EXCLUDED.chat_history,
216225
query = EXCLUDED.query,
217226
generated_answer = EXCLUDED.generated_answer,
@@ -224,6 +233,7 @@ async def migrate_user_interaction(interaction: UserInteraction) -> tuple[bool,
224233
interaction.agent_id,
225234
interaction.mcp_mode,
226235
interaction.conversation_id,
236+
interaction.user_id,
227237
_serialize_json_field(interaction.chat_history),
228238
interaction.query,
229239
interaction.generated_answer,

python/src/cairo_coder/db/session.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ async def execute_schema_scripts() -> None:
7272
agent_id VARCHAR(50) NOT NULL,
7373
mcp_mode BOOLEAN NOT NULL DEFAULT FALSE,
7474
conversation_id VARCHAR(100),
75+
user_id VARCHAR(100),
7576
chat_history JSONB,
7677
query TEXT NOT NULL,
7778
generated_answer TEXT,
@@ -87,6 +88,13 @@ async def execute_schema_scripts() -> None:
8788
ADD COLUMN IF NOT EXISTS conversation_id VARCHAR(100);
8889
"""
8990
)
91+
# Migration: add user_id column if it doesn't exist (for existing tables)
92+
await connection.execute(
93+
"""
94+
ALTER TABLE user_interactions
95+
ADD COLUMN IF NOT EXISTS user_id VARCHAR(100);
96+
"""
97+
)
9098
await connection.execute(
9199
"""
92100
CREATE INDEX IF NOT EXISTS idx_interactions_created_at
@@ -95,6 +103,8 @@ async def execute_schema_scripts() -> None:
95103
ON user_interactions(agent_id);
96104
CREATE INDEX IF NOT EXISTS idx_interactions_conversation_id
97105
ON user_interactions(conversation_id);
106+
CREATE INDEX IF NOT EXISTS idx_interactions_user_id
107+
ON user_interactions(user_id);
98108
"""
99109
)
100110
logger.info("Database schema initialized.")

python/src/cairo_coder/server/app.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
"""
77

88
import argparse
9+
import hashlib
910
import json
1011
import os
1112
import time
@@ -40,6 +41,24 @@
4041
setup_logging(os.environ.get("LOG_LEVEL", "INFO"), os.environ.get("LOG_FORMAT", "console"))
4142
logger = structlog.get_logger(__name__)
4243

44+
45+
def hash_user_id(user_id: str | None) -> str | None:
46+
"""
47+
Hash a user ID for anonymization.
48+
49+
Uses SHA-256 to create a consistent, anonymized identifier from the raw user ID.
50+
This ensures privacy while maintaining the ability to track user behavior.
51+
52+
Args:
53+
user_id: Raw user ID from header (UUID or API key)
54+
55+
Returns:
56+
Hashed user ID (first 32 chars of SHA-256 hex digest) or None if no input
57+
"""
58+
if user_id is None:
59+
return None
60+
return hashlib.sha256(user_id.encode()).hexdigest()[:32]
61+
4362
# Global vector DB instance managed by FastAPI lifecycle
4463
_vector_db: SourceFilteredPgVectorRM | None = None
4564
_agent_factory: AgentFactory | None = None
@@ -152,6 +171,7 @@ async def log_interaction_task(
152171
response: ChatCompletionResponse,
153172
agent: RagPipeline,
154173
conversation_id: str | None = None,
174+
user_id: str | None = None,
155175
) -> None:
156176
"""Background task that persists a user interaction."""
157177
sources_data = [
@@ -169,6 +189,7 @@ async def log_interaction_task(
169189
agent_id=agent_id,
170190
mcp_mode=mcp_mode,
171191
conversation_id=conversation_id,
192+
user_id=user_id,
172193
chat_history=chat_history_dicts,
173194
query=query,
174195
generated_answer=response.choices[0].message.content if response.choices else None,
@@ -186,6 +207,7 @@ async def log_interaction_raw(
186207
generated_answer: str | None,
187208
agent: RagPipeline,
188209
conversation_id: str | None = None,
210+
user_id: str | None = None,
189211
) -> None:
190212
"""Persist a user interaction without constructing a full response object."""
191213
sources_data = [
@@ -202,6 +224,7 @@ async def log_interaction_raw(
202224
agent_id=agent_id,
203225
mcp_mode=mcp_mode,
204226
conversation_id=conversation_id,
227+
user_id=user_id,
205228
chat_history=chat_history_dicts,
206229
query=query,
207230
generated_answer=generated_answer,
@@ -429,6 +452,11 @@ async def _handle_chat_completion(
429452
"""Handle chat completion request."""
430453
# Extract conversation ID from header
431454
conversation_id = req.headers.get("x-conversation-id")
455+
# Extract user identifier for anonymized tracking
456+
# Priority: 1) x-api-key (for authenticated users), 2) x-user-id (for frontend users)
457+
api_key = req.headers.get("x-api-key")
458+
raw_user_id = req.headers.get("x-user-id")
459+
user_id = hash_user_id(api_key) if api_key else hash_user_id(raw_user_id)
432460

433461
# Convert messages to internal format
434462
messages = []
@@ -451,7 +479,7 @@ async def _handle_chat_completion(
451479
if request.stream:
452480
return StreamingResponse(
453481
self._stream_chat_completion(
454-
agent, query, messages[:-1], mcp_mode, effective_agent_id, conversation_id
482+
agent, query, messages[:-1], mcp_mode, effective_agent_id, conversation_id, user_id
455483
),
456484
media_type="text/event-stream",
457485
headers={
@@ -472,6 +500,7 @@ async def _handle_chat_completion(
472500
response=response,
473501
agent=agent,
474502
conversation_id=conversation_id,
503+
user_id=user_id,
475504
)
476505

477506
return response
@@ -484,6 +513,7 @@ async def _stream_chat_completion(
484513
mcp_mode: bool,
485514
agent_id: str,
486515
conversation_id: str | None = None,
516+
user_id: str | None = None,
487517
) -> AsyncGenerator[str, None]:
488518
"""Stream chat completion response - replicates TypeScript streaming."""
489519
response_id = str(uuid.uuid4())
@@ -597,6 +627,7 @@ async def _stream_chat_completion(
597627
generated_answer=final_response,
598628
agent=agent,
599629
conversation_id=conversation_id,
630+
user_id=user_id,
600631
)
601632
except Exception as log_error:
602633
logger.error("Failed to log streaming interaction", error=str(log_error), exc_info=True)

python/src/cairo_coder/server/insights_api.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ class QueryResponse(BaseModel):
2929
chat_history: list[dict[str, Any]]
3030
output: str | None
3131
conversation_id: str | None = None
32+
user_id: str | None = None
3233

3334

3435
class PaginatedQueryResponse(BaseModel):
@@ -47,6 +48,7 @@ async def get_raw_queries(
4748
agent_id: str | None = None,
4849
query_text: str | None = None,
4950
conversation_id: str | None = None,
51+
user_id: str | None = None,
5052
limit: int = 100,
5153
offset: int = 0,
5254
) -> PaginatedQueryResponse:
@@ -56,9 +58,10 @@ async def get_raw_queries(
5658
ordered by creation time (where N is the limit parameter).
5759
5860
Use conversation_id to filter queries belonging to a specific conversation.
61+
Use user_id to filter queries belonging to a specific user.
5962
"""
6063
items, total = await get_interactions(
61-
start_date, end_date, agent_id, limit, offset, query_text, conversation_id
64+
start_date, end_date, agent_id, limit, offset, query_text, conversation_id, user_id
6265
)
6366
# Map generated_answer to output for API response
6467
responses = [
@@ -70,6 +73,7 @@ async def get_raw_queries(
7073
chat_history=item["chat_history"] or [],
7174
output=item.get("generated_answer"),
7275
conversation_id=item.get("conversation_id"),
76+
user_id=item.get("user_id"),
7377
)
7478
for item in items
7579
]

python/tests/integration/conftest.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,9 +165,16 @@ async def test_db_pool(postgres_container):
165165
"""Asyncpg pool connected to the ephemeral Postgres.
166166
167167
Creates schema directly to avoid cross-loop pool reuse with the app.
168+
Clears global pools dict before creating a new pool to prevent connection exhaustion.
168169
"""
169170
import asyncpg # local import to avoid import at collection when skipped
170171

172+
from cairo_coder.db import session as db_session
173+
174+
# Clear any lingering pools from the global dict to prevent connection exhaustion
175+
# during long test runs
176+
await db_session.close_pool()
177+
171178
raw_dsn = postgres_container.get_connection_url()
172179
# Convert SQLAlchemy-style DSN to asyncpg-compatible DSN
173180
dsn = raw_dsn.replace("postgresql+psycopg2", "postgresql")
@@ -183,6 +190,7 @@ async def test_db_pool(postgres_container):
183190
agent_id VARCHAR(50) NOT NULL,
184191
mcp_mode BOOLEAN NOT NULL DEFAULT FALSE,
185192
conversation_id VARCHAR(100),
193+
user_id VARCHAR(100),
186194
chat_history JSONB,
187195
query TEXT NOT NULL,
188196
generated_answer TEXT,
@@ -199,6 +207,8 @@ async def test_db_pool(postgres_container):
199207
ON user_interactions(agent_id);
200208
CREATE INDEX IF NOT EXISTS idx_interactions_conversation_id
201209
ON user_interactions(conversation_id);
210+
CREATE INDEX IF NOT EXISTS idx_interactions_user_id
211+
ON user_interactions(user_id);
202212
"""
203213
)
204214

0 commit comments

Comments
 (0)