-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapi.py
More file actions
290 lines (246 loc) · 8.4 KB
/
api.py
File metadata and controls
290 lines (246 loc) · 8.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
"""
Flask REST API for Dynamic CSV Agent
This module provides the HTTP endpoints (health check, row processing, and
agent status) that the frontend uses to communicate with the dynamic CSV
processing agent.
"""
import asyncio
import hmac
import logging
import os
import time

from flask import Flask, request, jsonify
from flask_cors import CORS

from dynamic_agent import process_row, get_agent_status as _get_agent_status
from config import get_config
# Initialize Flask app
app = Flask(__name__)

# Security: Limit request size to 16MB
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024

# Enable CORS for frontend communication.
# Restrict origins in production via the ALLOWED_ORIGINS environment variable
# (comma-separated list). Each entry is stripped of surrounding whitespace so
# that values such as "https://a.example, https://b.example" match correctly,
# and empty entries (e.g. from a trailing comma) are dropped.
allowed_origins = [
    origin.strip()
    for origin in os.environ.get('ALLOWED_ORIGINS', 'http://localhost:3000').split(',')
    if origin.strip()
]

CORS(app, resources={
    # API routes: only the configured origins, with the API key header allowed
    r"/api/*": {
        "origins": allowed_origins,
        "methods": ["GET", "POST", "OPTIONS"],
        "allow_headers": ["Content-Type", "X-API-Key"],
    },
    # Health check: readable from any origin
    r"/health": {
        "origins": "*",
        "methods": ["GET"],
    },
})
@app.before_request
def check_api_key():
    """
    Validate the X-API-Key header for /api/* endpoints.

    Registered as a before-request hook. Requests whose path does not start
    with /api/ and OPTIONS requests (CORS preflight) are never checked. If the
    API_SECRET_KEY environment variable is unset or empty, validation is
    skipped entirely (development mode).

    Returns:
        None to let the request proceed, or a (JSON response, 401) tuple when
        the provided key is missing or does not match the expected key.
    """
    # Skip for non-API routes and OPTIONS requests (CORS preflight)
    if not request.path.startswith('/api/') or request.method == 'OPTIONS':
        return None

    expected_key = os.environ.get('API_SECRET_KEY')

    # If no API key is configured, allow all requests (dev mode)
    if not expected_key:
        return None

    # A missing header is treated as an empty key, which can never match the
    # (non-empty) expected key.
    provided_key = request.headers.get('X-API-Key') or ''

    # Compare in constant time so response timing does not leak how much of
    # the key was correct. Both sides are encoded to bytes because
    # hmac.compare_digest rejects str values containing non-ASCII characters.
    keys_match = hmac.compare_digest(
        provided_key.encode('utf-8'),
        expected_key.encode('utf-8'),
    )
    if not keys_match:
        return jsonify({
            'status': 'error',
            'error': 'Unauthorized',
            'details': 'Invalid or missing API key'
        }), 401

    return None
# Configuration is loaded before logging is configured because the log level
# below depends on it
config = get_config()

# Configure logging: DEBUG level when debug_mode is on, INFO otherwise
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.DEBUG if config.debug_mode else logging.INFO,
)
logger = logging.getLogger(__name__)

# Write the configuration summary to the log at startup
config.log_config_summary(logger)
@app.route('/health', methods=['GET'])
def health_check():
    """
    Health check endpoint for monitoring and load balancers.

    Returns a JSON body with fixed service identification fields plus the
    'litellm_model_initialized' and 'model' values reported by the agent
    status helper.
    """
    agent_info = _get_agent_status()
    payload = {
        'status': 'healthy',
        'service': 'knowledge-robot',
        'version': '1.1.0',
        'agent_ready': agent_info['litellm_model_initialized'],
        'model': agent_info['model'],
    }
    return jsonify(payload)
def _error_response(message, status_code, details=None):
    """
    Build a JSON error response in this API's standard error format.

    Args:
        message: Short error message placed under the 'error' key.
        status_code: HTTP status code returned with the response.
        details: Optional extra information placed under the 'details' key;
            the key is omitted when this is None.

    Returns:
        A (JSON response, status code) tuple suitable for returning from a view.
    """
    payload = {'status': 'error', 'error': message}
    if details is not None:
        payload['details'] = details
    return jsonify(payload), status_code


@app.route('/api/process-row', methods=['POST'])
def process_csv_row():
    """
    Process a single CSV row with the dynamic agent.

    Request Body (JSON):
    {
        "row_data": {
            "customer_name": "John Smith",
            "product": "Laptop X1",
            "review": "Great product! Very fast.",
            "rating": 5
        },
        "prompt": "Analyze {customer_name}'s review of {product}: '{review}'. Consider the rating of {rating}.",
        "output_schema": [
            {
                "name": "sentiment",
                "type": "text",
                "description": "Overall sentiment (positive, negative, neutral)"
            },
            {
                "name": "key_points",
                "type": "text",
                "description": "Main points from the review"
            },
            {
                "name": "matches_rating",
                "type": "boolean",
                "description": "Does the sentiment match the rating?"
            }
        ],
        "scrape_backend": "local",  // Optional: "local" or "firecrawl" (default: "local")
        "browser_visible": false,   // Optional: forwarded to the agent (default: false)
        "enable_search": false      // Optional: Enable firecrawl_search tool (default: false)
    }

    Response (JSON):
    {
        "output": {
            "sentiment": "positive",
            "key_points": "Fast performance, great quality",
            "matches_rating": true,
            "_processed_at": "2025-10-04T12:34:56.789Z"
        },
        "metadata": {
            "processing_time_ms": 5432,
            "row_data_received": true,
            "schema_fields_count": 3
        }
    }

    Error Response:
    {
        "status": "error",
        "error": "Error message here",
        "details": "Additional error details"
    }

    Status codes:
        200 on success.
        400 when the body is missing, is not valid JSON, is not a JSON object,
            a required field is missing or has the wrong type,
            scrape_backend is invalid, or processing raises ValueError.
        500 when processing raises any other exception.
    """
    # perf_counter is monotonic, so the measured duration is not affected by
    # wall-clock adjustments during the request.
    start_time = time.perf_counter()

    try:
        # silent=True makes a missing, malformed, or non-JSON body come back as
        # None instead of raising, so the client receives a 400 here rather
        # than a 500 from the generic handler below.
        data = request.get_json(silent=True)

        if not data:
            logger.error("No JSON data provided in request")
            return _error_response('No JSON data provided', 400)

        # Valid JSON that is not an object (e.g. a list) has no .get()
        if not isinstance(data, dict):
            logger.error("Request body must be a JSON object")
            return _error_response('Request body must be a JSON object', 400)

        # Extract request parameters
        row_data = data.get('row_data', {})
        prompt = data.get('prompt', '')
        output_schema = data.get('output_schema', [])
        scrape_backend = data.get('scrape_backend', 'local')
        browser_visible = bool(data.get('browser_visible', False))
        enable_search = bool(data.get('enable_search', False))

        if scrape_backend not in ('firecrawl', 'local'):
            return _error_response(
                f'Invalid scrape_backend: {scrape_backend!r}. Must be "local" or "firecrawl".',
                400,
            )

        # Validate required fields: each must be present, non-empty, and of the
        # JSON type documented in the request format above.
        required_fields = (
            ('row_data', row_data, dict, 'a JSON object'),
            ('prompt', prompt, str, 'a string'),
            ('output_schema', output_schema, list, 'an array'),
        )
        for field_name, value, expected_type, type_label in required_fields:
            if not value:
                logger.error("%s is required", field_name)
                return _error_response(f'{field_name} is required', 400)
            if not isinstance(value, expected_type):
                logger.error("%s must be %s", field_name, type_label)
                return _error_response(f'{field_name} must be {type_label}', 400)

        logger.info(
            "Processing CSV row: cols=%d, output_fields=%d, backend=%s, visible=%s, search=%s",
            len(row_data), len(output_schema), scrape_backend, browser_visible, enable_search,
        )

        # Run the async agent call from this synchronous view. asyncio.run
        # creates a fresh event loop, cancels any leftover tasks, closes the
        # loop, and does not leave a closed loop installed as the thread's
        # current loop.
        result = asyncio.run(
            process_row(
                row_data, prompt, output_schema,
                scrape_backend=scrape_backend,
                browser_visible=browser_visible,
                enable_search=enable_search,
            )
        )

        # Calculate processing time in whole milliseconds
        processing_time = int((time.perf_counter() - start_time) * 1000)
        logger.info("Successfully processed CSV row in %dms", processing_time)

        return jsonify({
            'output': result,
            'metadata': {
                'processing_time_ms': processing_time,
                'row_data_received': True,
                'schema_fields_count': len(output_schema)
            }
        })

    except ValueError as e:
        # ValueError raised during processing is reported as a client-side
        # validation problem
        logger.error(f"Validation error: {e}", exc_info=True)
        return _error_response('Validation error', 400, details=str(e))

    except Exception as e:
        # Top-level boundary: log the traceback and report a generic failure
        logger.error(f"Error processing request: {e}", exc_info=True)
        return _error_response('Internal server error', 500, details=str(e))
@app.route('/api/agent-status', methods=['GET'])
def agent_status_endpoint():
    """
    Return the agent's current status as JSON.

    The body is whatever the agent status helper reports. If fetching or
    serializing it raises, the error is logged with its traceback and a JSON
    error body carrying the exception text is returned with HTTP 500.
    """
    try:
        agent_info = _get_agent_status()
        return jsonify(agent_info)
    except Exception as exc:
        logger.error(f"Error getting agent status: {exc}", exc_info=True)
        failure = {
            'status': 'error',
            'error': str(exc),
        }
        return jsonify(failure), 500
@app.errorhandler(404)
def not_found(error):
    """Return the JSON error body for unknown routes with HTTP 404."""
    body = {
        'status': 'error',
        'error': 'Endpoint not found',
    }
    return jsonify(body), 404
@app.errorhandler(500)
def internal_error(error):
    """Log the failure and return a generic JSON error body with HTTP 500."""
    logger.error(f"Internal server error: {error}", exc_info=True)
    body = {
        'status': 'error',
        'error': 'Internal server error',
    }
    return jsonify(body), 500
# Development server entry point (not used in production with gunicorn)
if __name__ == '__main__':
    logger.info("Starting Knowledge Robot API in development mode...")
    config.log_config_summary(logger)

    # Bind to all interfaces on port 8080; the debug flag follows the config
    dev_server_options = {
        'host': '0.0.0.0',
        'port': 8080,
        'debug': config.debug_mode,
    }
    app.run(**dev_server_options)