Skip to content

Commit 2ee689f

Browse files
Langfuse: Add comprehensive error handling and tests (#551)
* Langfuse: Add comprehensive error handling and tests - Added try-catch blocks to prevent Langfuse failures from blocking main flow - Ensure tracer is always initialized (disabled if no credentials) - Added graceful degradation for all tracer methods - Created comprehensive test suite covering all failure scenarios - Tests verify OpenAI responses work regardless of Langfuse state Fixes issue where Langfuse connection failures could block API responses. * Langfuse: Refactor error handling to use wrapper function Address PR review feedback - replace multiple try/catch blocks with single _langfuse_call wrapper that auto-disables on first failure. * Formatted the code and added the fixture for test_langfuse_tracer.py * made the loggers consistent --------- Co-authored-by: Akhilesh Negi <akhileshnegi.an3@gmail.com>
1 parent d491488 commit 2ee689f

File tree

4 files changed

+738
-64
lines changed

4 files changed

+738
-64
lines changed

backend/app/core/langfuse/langfuse.py

Lines changed: 116 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,12 @@ def __init__(
1717
credentials: Optional[dict] = None,
1818
session_id: Optional[str] = None,
1919
response_id: Optional[str] = None,
20-
):
20+
) -> None:
2121
self.session_id = session_id or str(uuid.uuid4())
2222
self.langfuse: Optional[Langfuse] = None
2323
self.trace: Optional[StatefulTraceClient] = None
2424
self.generation: Optional[StatefulGenerationClient] = None
25+
self._failed = False
2526

2627
has_credentials = (
2728
credentials
@@ -31,38 +32,57 @@ def __init__(
3132
)
3233

3334
if has_credentials:
34-
self.langfuse = Langfuse(
35-
public_key=credentials["public_key"],
36-
secret_key=credentials["secret_key"],
37-
host=credentials["host"],
38-
enabled=True, # This ensures the client is active
39-
)
35+
try:
36+
self.langfuse = Langfuse(
37+
public_key=credentials["public_key"],
38+
secret_key=credentials["secret_key"],
39+
host=credentials["host"],
40+
enabled=True, # This ensures the client is active
41+
)
42+
except Exception as e:
43+
logger.warning(f"[LangfuseTracer] Failed to initialize: {e}")
44+
self._failed = True
45+
return
4046

4147
if response_id:
42-
traces = self.langfuse.fetch_traces(tags=response_id).data
43-
if traces:
44-
self.session_id = traces[0].session_id
48+
try:
49+
traces = self.langfuse.fetch_traces(tags=response_id).data
50+
if traces:
51+
self.session_id = traces[0].session_id
52+
except Exception as e:
53+
logger.debug(f"[LangfuseTracer] Session resume failed: {e}")
4554

4655
logger.info(
47-
f"[LangfuseTracer] Langfuse tracing enabled | session_id={self.session_id}"
56+
f"[LangfuseTracer] Tracing enabled | session_id={self.session_id}"
4857
)
4958
else:
50-
self.langfuse = Langfuse(enabled=False)
59+
logger.warning("[LangfuseTracer] Tracing disabled - missing credentials")
60+
61+
def _langfuse_call(self, fn: Callable, *args: Any, **kwargs: Any) -> Any:
62+
if self._failed:
63+
return None
64+
try:
65+
return fn(*args, **kwargs)
66+
except Exception as e:
5167
logger.warning(
52-
"[LangfuseTracer] Langfuse tracing disabled due to missing credentials"
68+
f"[LangfuseTracer] {getattr(fn, '__name__', 'operation')} failed: {e}"
5369
)
70+
self._failed = True
71+
return None
5472

5573
def start_trace(
5674
self,
5775
name: str,
5876
input: Dict[str, Any],
5977
metadata: Optional[Dict[str, Any]] = None,
6078
tags: list[str] | None = None,
61-
):
79+
) -> None:
80+
if self._failed or not self.langfuse:
81+
return
6282
metadata = metadata or {}
6383
metadata["request_id"] = correlation_id.get() or "N/A"
64-
65-
self.trace = self.langfuse.trace(
84+
self.trace = self._langfuse_call(
85+
self.langfuse.trace,
6686
name=name,
6787
input=input,
6888
metadata=metadata,
@@ -75,10 +95,11 @@ def start_generation(
7595
name: str,
7696
input: Dict[str, Any],
7797
metadata: Optional[Dict[str, Any]] = None,
78-
):
79-
if not self.trace:
98+
) -> None:
99+
if self._failed or not self.trace:
80100
return
81-
self.generation = self.langfuse.generation(
101+
self.generation = self._langfuse_call(
102+
self.langfuse.generation,
82103
name=name,
83104
trace_id=self.trace.id,
84105
input=input,
@@ -90,31 +111,40 @@ def end_generation(
90111
output: Dict[str, Any],
91112
usage: Optional[Dict[str, Any]] = None,
92113
model: Optional[str] = None,
93-
):
94-
if self.generation:
95-
self.generation.end(output=output, usage=usage, model=model)
114+
) -> None:
115+
if self._failed or not self.generation:
116+
return
117+
self._langfuse_call(
118+
self.generation.end, output=output, usage=usage, model=model
119+
)
96120

97-
def update_trace(self, tags: list[str], output: Dict[str, Any]):
98-
if self.trace:
99-
self.trace.update(tags=tags, output=output)
121+
def update_trace(self, tags: list[str], output: Dict[str, Any]) -> None:
122+
if self._failed or not self.trace:
123+
return
124+
self._langfuse_call(self.trace.update, tags=tags, output=output)
100125

101-
def log_error(self, error_message: str, response_id: Optional[str] = None):
126+
def log_error(self, error_message: str, response_id: Optional[str] = None) -> None:
127+
if self._failed:
128+
return
102129
if self.generation:
103-
self.generation.end(output={"error": error_message})
130+
self._langfuse_call(self.generation.end, output={"error": error_message})
104131
if self.trace:
105-
self.trace.update(
132+
self._langfuse_call(
133+
self.trace.update,
106134
tags=[response_id] if response_id else [],
107135
output={"status": "failure", "error": error_message},
108136
)
109137

110-
def flush(self):
111-
self.langfuse.flush()
138+
def flush(self) -> None:
139+
if self._failed or not self.langfuse:
140+
return
141+
self._langfuse_call(self.langfuse.flush)
112142

113143

114144
def observe_llm_execution(
115145
session_id: str | None = None,
116146
credentials: dict | None = None,
117-
):
147+
) -> Callable:
118148
"""Decorator to add Langfuse observability to LLM provider execute methods.
119149
120150
Args:
@@ -135,7 +165,9 @@ def wrapper(
135165
):
136166
# Skip observability if no credentials provided
137167
if not credentials:
138-
logger.info("[Langfuse] No credentials - skipping observability")
168+
logger.info(
169+
"[observe_llm_execution] No credentials - skipping observability"
170+
)
139171
return func(completion_config, query, **kwargs)
140172

141173
try:
@@ -144,30 +176,56 @@ def wrapper(
144176
secret_key=credentials.get("secret_key"),
145177
host=credentials.get("host"),
146178
)
179+
logger.info(
180+
f"[observe_llm_execution] Tracing enabled | session_id={session_id or 'auto'}"
181+
)
147182
except Exception as e:
148-
logger.warning(f"[Langfuse] Failed to initialize client: {e}")
183+
logger.warning(
184+
f"[observe_llm_execution] Failed to initialize client: {e}"
185+
)
149186
return func(completion_config, query, **kwargs)
150187

151-
trace = langfuse.trace(
188+
failed = False
189+
190+
def langfuse_call(fn, *args, **kwargs):
191+
"""Execute Langfuse operation safely. First failure disables further calls."""
192+
nonlocal failed
193+
if failed:
194+
return None
195+
try:
196+
return fn(*args, **kwargs)
197+
except Exception as e:
198+
logger.warning(
199+
f"[observe_llm_execution] {getattr(fn, '__name__', 'operation')} failed: {e}"
200+
)
201+
failed = True
202+
return None
203+
204+
trace = langfuse_call(
205+
langfuse.trace,
152206
name="unified-llm-call",
153207
input=query.input,
154208
tags=[completion_config.provider],
155209
)
156210

157-
generation = trace.generation(
158-
name=f"{completion_config.provider}-completion",
159-
input=query.input,
160-
model=completion_config.params.get("model"),
161-
)
211+
generation = None
212+
if trace:
213+
generation = langfuse_call(
214+
trace.generation,
215+
name=f"{completion_config.provider}-completion",
216+
input=query.input,
217+
model=completion_config.params.get("model"),
218+
)
162219

163-
try:
164-
# Execute the actual LLM call
165-
response: LLMCallResponse | None
166-
error: str | None
167-
response, error = func(completion_config, query, **kwargs)
220+
# Execute the actual LLM call
221+
response: LLMCallResponse | None
222+
error: str | None
223+
response, error = func(completion_config, query, **kwargs)
168224

169-
if response:
170-
generation.end(
225+
if response:
226+
if generation:
227+
langfuse_call(
228+
generation.end,
171229
output={
172230
"status": "success",
173231
"output": response.response.output.text,
@@ -178,34 +236,28 @@ def wrapper(
178236
},
179237
model=response.response.model,
180238
)
181-
182-
trace.update(
239+
if trace:
240+
langfuse_call(
241+
trace.update,
183242
output={
184243
"status": "success",
185244
"output": response.response.output.text,
186245
},
187246
session_id=session_id or response.response.conversation_id,
188247
)
189-
else:
190-
error_msg = error or "Unknown error"
191-
generation.end(output={"error": error_msg})
192-
trace.update(
248+
else:
249+
error_msg = error or "Unknown error"
250+
if generation:
251+
langfuse_call(generation.end, output={"error": error_msg})
252+
if trace:
253+
langfuse_call(
254+
trace.update,
193255
output={"status": "failure", "error": error_msg},
194256
session_id=session_id,
195257
)
196258

197-
langfuse.flush()
198-
return response, error
199-
200-
except Exception as e:
201-
error_msg = str(e)
202-
generation.end(output={"error": error_msg})
203-
trace.update(
204-
output={"status": "failure", "error": error_msg},
205-
session_id=session_id,
206-
)
207-
langfuse.flush()
208-
raise
259+
langfuse_call(langfuse.flush)
260+
return response, error
209261

210262
return wrapper
211263

backend/app/tests/core/test_langfuse/__init__.py

Whitespace-only changes.
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
"""
2+
Local conftest for LangfuseTracer unit tests.
3+
4+
These tests don't require database access, so we override the session-scoped
5+
seed_baseline fixture to skip database seeding.
6+
"""
7+
8+
import pytest
9+
10+
11+
@pytest.fixture(scope="session", autouse=True)
12+
def seed_baseline():
13+
"""Override the global seed_baseline fixture to skip database seeding."""
14+
yield

0 commit comments

Comments
 (0)