7
7
GEN_AI_RESPONSE_MODEL_ATTRIBUTE ,
8
8
GEN_AI_RESPONSE_STREAMING_ATTRIBUTE ,
9
9
GEN_AI_RESPONSE_TEXT_ATTRIBUTE ,
10
+ GEN_AI_RESPONSE_TOOL_CALLS_ATTRIBUTE ,
10
11
} from '../ai/gen-ai-attributes' ;
11
12
import { setTokenUsageAttributes } from '../ai/utils' ;
12
13
import type { AnthropicAiStreamingEvent } from './types' ;
@@ -32,6 +33,17 @@ interface StreamingState {
32
33
cacheCreationInputTokens : number | undefined ;
33
34
/** Number of cache read input tokens used. */
34
35
cacheReadInputTokens : number | undefined ;
36
+ /** Accumulated tool calls (finalized) */
37
+ toolCalls : Array < Record < string , unknown > > ;
38
+ /** In-progress tool call blocks keyed by index */
39
+ activeToolBlocks : Record <
40
+ number ,
41
+ {
42
+ id ?: string ;
43
+ name ?: string ;
44
+ inputJsonParts : string [ ] ;
45
+ }
46
+ > ;
35
47
}
36
48
37
49
/**
@@ -43,12 +55,7 @@ interface StreamingState {
43
55
* @returns Whether an error occurred
44
56
*/
45
57
46
- function isErrorEvent (
47
- event : AnthropicAiStreamingEvent ,
48
- state : StreamingState ,
49
- recordOutputs : boolean ,
50
- span : Span ,
51
- ) : boolean {
58
+ function isErrorEvent ( event : AnthropicAiStreamingEvent , span : Span ) : boolean {
52
59
if ( 'type' in event && typeof event . type === 'string' ) {
53
60
// If the event is an error, set the span status and capture the error
54
61
// These error events are not rejected by the API by default, but are sent as metadata of the response
@@ -69,11 +76,6 @@ function isErrorEvent(
69
76
} ) ;
70
77
return true ;
71
78
}
72
-
73
- if ( recordOutputs && event . type === 'content_block_delta' ) {
74
- const text = event . delta ?. text ;
75
- if ( text ) state . responseTexts . push ( text ) ;
76
- }
77
79
}
78
80
return false ;
79
81
}
@@ -110,6 +112,66 @@ function handleMessageMetadata(event: AnthropicAiStreamingEvent, state: Streamin
110
112
}
111
113
}
112
114
115
+ /**
116
+ * Handle start of a content block (e.g., tool_use)
117
+ */
118
+ function handleContentBlockStart ( event : AnthropicAiStreamingEvent , state : StreamingState ) : void {
119
+ if ( event . type !== 'content_block_start' || typeof event . index !== 'number' || ! event . content_block ) return ;
120
+ if ( event . content_block . type === 'tool_use' ) {
121
+ state . activeToolBlocks [ event . index ] = {
122
+ id : event . content_block . id ,
123
+ name : event . content_block . name ,
124
+ inputJsonParts : [ ] ,
125
+ } ;
126
+ }
127
+ }
128
+
129
+ /**
130
+ * Handle deltas of a content block, including input_json_delta for tool_use
131
+ */
132
+ function handleContentBlockDelta (
133
+ event : AnthropicAiStreamingEvent ,
134
+ state : StreamingState ,
135
+ recordOutputs : boolean ,
136
+ ) : void {
137
+ if ( event . type !== 'content_block_delta' || typeof event . index !== 'number' || ! event . delta ) return ;
138
+
139
+ if ( 'partial_json' in event . delta && typeof event . delta . partial_json === 'string' ) {
140
+ const active = state . activeToolBlocks [ event . index ] ;
141
+ if ( active ) {
142
+ active . inputJsonParts . push ( event . delta . partial_json ) ;
143
+ }
144
+ }
145
+
146
+ if ( recordOutputs && event . delta . text ) {
147
+ state . responseTexts . push ( event . delta . text ) ;
148
+ }
149
+ }
150
+
151
+ /**
152
+ * Handle stop of a content block; finalize tool_use entries
153
+ */
154
+ function handleContentBlockStop ( event : AnthropicAiStreamingEvent , state : StreamingState ) : void {
155
+ if ( event . type !== 'content_block_stop' || typeof event . index !== 'number' ) return ;
156
+
157
+ const active = state . activeToolBlocks [ event . index ] ;
158
+ if ( ! active ) return ;
159
+
160
+ const raw = active . inputJsonParts . join ( '' ) ;
161
+ const parsedInput = raw ? JSON . parse ( raw ) : { } ;
162
+
163
+ state . toolCalls . push ( {
164
+ type : 'tool_use' ,
165
+ id : active . id ,
166
+ name : active . name ,
167
+ input : parsedInput ,
168
+ } ) ;
169
+
170
+ // Avoid deleting a dynamic key; rebuild the map without this index
171
+ const remainingEntries = Object . entries ( state . activeToolBlocks ) . filter ( ( [ key ] ) => key !== String ( event . index ) ) ;
172
+ state . activeToolBlocks = Object . fromEntries ( remainingEntries ) ;
173
+ }
174
+
113
175
/**
114
176
* Processes an event
115
177
* @param event - The event to process
@@ -128,10 +190,19 @@ function processEvent(
128
190
return ;
129
191
}
130
192
131
- const isError = isErrorEvent ( event , state , recordOutputs , span ) ;
193
+ const isError = isErrorEvent ( event , span ) ;
132
194
if ( isError ) return ;
133
195
134
196
handleMessageMetadata ( event , state ) ;
197
+
198
+ // Tool call events are sent via 3 separate events:
199
+ // - content_block_start (start of the tool call)
200
+ // - content_block_delta (delta aka input of the tool call)
201
+ // - content_block_stop (end of the tool call)
202
+ // We need to handle them all to capture the full tool call.
203
+ handleContentBlockStart ( event , state ) ;
204
+ handleContentBlockDelta ( event , state , recordOutputs ) ;
205
+ handleContentBlockStop ( event , state ) ;
135
206
}
136
207
137
208
/**
@@ -153,6 +224,8 @@ export async function* instrumentStream(
153
224
completionTokens : undefined ,
154
225
cacheCreationInputTokens : undefined ,
155
226
cacheReadInputTokens : undefined ,
227
+ toolCalls : [ ] ,
228
+ activeToolBlocks : { } ,
156
229
} ;
157
230
158
231
try {
@@ -197,6 +270,13 @@ export async function* instrumentStream(
197
270
} ) ;
198
271
}
199
272
273
+ // Set tool calls if any were captured
274
+ if ( recordOutputs && state . toolCalls . length > 0 ) {
275
+ span . setAttributes ( {
276
+ [ GEN_AI_RESPONSE_TOOL_CALLS_ATTRIBUTE ] : JSON . stringify ( state . toolCalls ) ,
277
+ } ) ;
278
+ }
279
+
200
280
span . end ( ) ;
201
281
}
202
282
}
0 commit comments