Skip to content

Commit 93da0ce

Browse files
committed
Trace reasoning messages in Langfuse
1 parent 86dd619 commit 93da0ce

2 files changed

Lines changed: 184 additions & 8 deletions

File tree

src/index.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ type SessionNextEvent =
5858
properties: {
5959
sessionID: string;
6060
timestamp: number;
61+
assistantMessageID: string;
6162
reasoningID: string;
6263
text: string;
6364
};
@@ -165,6 +166,7 @@ const eventHook = (event: OpencodeEvent, shutdown?: () => Promise<void>) =>
165166

166167
if (event.type === "message.part.updated") {
167168
langfuse.rememberAssistantPart(event.properties.part);
169+
langfuse.traceReasoningPart(event.properties.part);
168170
}
169171

170172
if (event.type === "session.next.step.started") {
@@ -200,15 +202,13 @@ const eventHook = (event: OpencodeEvent, shutdown?: () => Promise<void>) =>
200202
}
201203

202204
if (event.type === "session.next.reasoning.ended") {
203-
langfuse.traceEvent({
204-
id: event.id,
205+
langfuse.traceReasoning({
206+
reasoningID: event.properties.reasoningID,
205207
sessionID: event.properties.sessionID,
206-
name: "opencode.generation.reasoning",
207208
timestamp: event.properties.timestamp,
208-
output: { text: event.properties.text },
209-
metadata: {
210-
reasoningID: event.properties.reasoningID,
211-
},
209+
text: event.properties.text,
210+
messageID: event.properties.assistantMessageID,
211+
source: "session.next.reasoning.ended",
212212
});
213213
}
214214

src/langfuse.ts

Lines changed: 177 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ export class LangfuseClient {
3434
this.traceState.assistantParts.clear();
3535
this.traceState.abortedSessions.clear();
3636
this.traceState.tracedEventIds.clear();
37+
this.traceState.tracedReasoningIds.clear();
38+
this.traceState.pendingReasoningPartsByMessageId.clear();
39+
this.traceState.generationSpansByMessageId.clear();
3740
this.traceState.generationParentSpans.clear();
3841
this.traceState.turnObservationsByMessageId.clear();
3942
this.traceState.latestTurnObservationsBySession.clear();
@@ -103,14 +106,15 @@ export class LangfuseClient {
103106
input?: unknown;
104107
output?: unknown;
105108
metadata?: unknown;
109+
parentSpan?: ApiSpan;
106110
}) {
107111
if (this.traceState.tracedEventIds.has(input.id)) {
108112
return;
109113
}
110114

111115
this.traceState.tracedEventIds.add(input.id);
112116

113-
this.withObservationParent(input.sessionID, () => {
117+
const startEvent = () => {
114118
const span = this.traceState.tracer.startSpan(input.name, {
115119
attributes: {
116120
"langfuse.observation.type": "event",
@@ -127,6 +131,98 @@ export class LangfuseClient {
127131
});
128132

129133
span.end(new Date(input.timestamp));
134+
};
135+
136+
if (input.parentSpan) {
137+
context.with(
138+
trace.setSpan(context.active(), input.parentSpan),
139+
startEvent,
140+
);
141+
return;
142+
}
143+
144+
this.withObservationParent(input.sessionID, startEvent);
145+
}
146+
147+
traceReasoning(input: {
148+
reasoningID: string;
149+
sessionID: string;
150+
timestamp: number;
151+
text: string;
152+
messageID?: string;
153+
source: string;
154+
parentSpan?: ApiSpan;
155+
}) {
156+
if (!input.text.trim()) {
157+
return;
158+
}
159+
160+
const reasoningTraceKey = `${input.sessionID}:${input.reasoningID}`;
161+
162+
if (this.traceState.tracedReasoningIds.has(reasoningTraceKey)) {
163+
return;
164+
}
165+
166+
this.traceState.tracedReasoningIds.add(reasoningTraceKey);
167+
168+
const parentSpan =
169+
input.parentSpan ??
170+
(input.messageID
171+
? this.traceState.generationSpansByMessageId.get(input.messageID)
172+
: undefined);
173+
174+
const generationParentSpan =
175+
parentSpan ??
176+
this.traceState.activeGenerationSteps.get(input.sessionID)?.span ??
177+
this.traceState.generationParentSpans.get(input.sessionID);
178+
179+
this.traceEvent({
180+
id: `reasoning:${reasoningTraceKey}`,
181+
sessionID: input.sessionID,
182+
name: "opencode.generation.reasoning",
183+
timestamp: input.timestamp,
184+
output: { text: input.text },
185+
metadata: {
186+
reasoningID: input.reasoningID,
187+
messageID: input.messageID,
188+
source: input.source,
189+
},
190+
parentSpan: generationParentSpan,
191+
});
192+
}
193+
194+
traceReasoningPart(part: MessagePart) {
195+
const completed = getCompletedReasoningTimestamp(part);
196+
197+
if (!isCompletedReasoningPart(part) || completed === undefined) {
198+
return;
199+
}
200+
201+
const generationSpan =
202+
this.traceState.generationSpansByMessageId.get(part.messageID) ??
203+
this.traceState.activeGenerationSteps.get(part.sessionID)?.span ??
204+
this.traceState.generationParentSpans.get(part.sessionID);
205+
206+
if (!generationSpan) {
207+
const pending =
208+
this.traceState.pendingReasoningPartsByMessageId.get(part.messageID) ??
209+
new Map<string, CompletedReasoningPart>();
210+
pending.set(part.id, part);
211+
this.traceState.pendingReasoningPartsByMessageId.set(
212+
part.messageID,
213+
pending,
214+
);
215+
return;
216+
}
217+
218+
this.traceReasoning({
219+
reasoningID: part.id,
220+
sessionID: part.sessionID,
221+
timestamp: completed,
222+
text: part.text,
223+
messageID: part.messageID,
224+
source: "message.part.updated",
225+
parentSpan: generationSpan,
130226
});
131227
}
132228

@@ -415,6 +511,12 @@ export class LangfuseClient {
415511
}),
416512
);
417513

514+
this.traceState.generationSpansByMessageId.set(
515+
input.messageID,
516+
step.span,
517+
);
518+
this.flushPendingReasoning(input.messageID, step.span);
519+
418520
step.span.end(new Date(input.completed));
419521
this.traceState.activeGenerationSteps.delete(input.sessionID);
420522

@@ -458,10 +560,37 @@ export class LangfuseClient {
458560
});
459561

460562
this.traceState.generationParentSpans.set(input.sessionID, span);
563+
this.traceState.generationSpansByMessageId.set(input.messageID, span);
564+
this.flushPendingReasoning(input.messageID, span);
461565
span.end(new Date(input.completed));
462566
});
463567
}
464568

569+
private flushPendingReasoning(messageID: string, parentSpan: ApiSpan) {
570+
const pending =
571+
this.traceState.pendingReasoningPartsByMessageId.get(messageID) ??
572+
new Map<string, CompletedReasoningPart>();
573+
this.traceState.pendingReasoningPartsByMessageId.delete(messageID);
574+
575+
for (const part of pending.values()) {
576+
const completed = getCompletedReasoningTimestamp(part);
577+
578+
if (completed === undefined) {
579+
continue;
580+
}
581+
582+
this.traceReasoning({
583+
reasoningID: part.id,
584+
sessionID: part.sessionID,
585+
timestamp: completed,
586+
text: part.text,
587+
messageID: part.messageID,
588+
source: "message.part.updated",
589+
parentSpan,
590+
});
591+
}
592+
}
593+
465594
traceFailedGenerationStep(input: {
466595
id: string;
467596
sessionID: string;
@@ -736,6 +865,12 @@ export type LangfuseTraceState = {
736865
tracedMessageIds: Set<string>;
737866
tracedGenerationIds: Set<string>;
738867
tracedEventIds: Set<string>;
868+
tracedReasoningIds: Set<string>;
869+
pendingReasoningPartsByMessageId: Map<
870+
string,
871+
Map<string, CompletedReasoningPart>
872+
>;
873+
generationSpansByMessageId: Map<string, ApiSpan>;
739874
assistantParts: Map<string, Map<string, MessagePart>>;
740875
turnObservationsByMessageId: Map<string, TurnObservation>;
741876
latestTurnObservationsBySession: Map<string, TurnObservation>;
@@ -749,6 +884,41 @@ export type MessagePart = Extract<
749884
{ type: "message.part.updated" }
750885
>["properties"]["part"];
751886

887+
type CompletedReasoningPart = MessagePart & {
888+
id: string;
889+
sessionID: string;
890+
text: string;
891+
messageID: string;
892+
time: { completed?: number; end?: number };
893+
};
894+
895+
function isCompletedReasoningPart(
896+
part: MessagePart,
897+
): part is CompletedReasoningPart {
898+
return (
899+
part.type === "reasoning" &&
900+
typeof part.id === "string" &&
901+
typeof part.sessionID === "string" &&
902+
typeof part.messageID === "string" &&
903+
typeof part.text === "string" &&
904+
typeof getCompletedReasoningTimestamp(part) === "number"
905+
);
906+
}
907+
908+
function getCompletedReasoningTimestamp(part: MessagePart) {
909+
const time = (part as { time?: { completed?: unknown; end?: unknown } }).time;
910+
911+
if (typeof time?.completed === "number") {
912+
return time.completed;
913+
}
914+
915+
if (typeof time?.end === "number") {
916+
return time.end;
917+
}
918+
919+
return undefined;
920+
}
921+
752922
export type FormattedMessagePart =
753923
| { type: string; text: string }
754924
| { type: string; filename?: string; url?: string }
@@ -851,6 +1021,12 @@ export const createLangfuseClient = (input: {
8511021
tracedMessageIds: new Set<string>(),
8521022
tracedGenerationIds: new Set<string>(),
8531023
tracedEventIds: new Set<string>(),
1024+
tracedReasoningIds: new Set<string>(),
1025+
pendingReasoningPartsByMessageId: new Map<
1026+
string,
1027+
Map<string, CompletedReasoningPart>
1028+
>(),
1029+
generationSpansByMessageId: new Map<string, ApiSpan>(),
8541030
assistantParts: new Map<string, Map<string, MessagePart>>(),
8551031
turnObservationsByMessageId: new Map<string, TurnObservation>(),
8561032
latestTurnObservationsBySession: new Map<string, TurnObservation>(),

0 commit comments

Comments
 (0)