Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions kiro/routes_anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
)
from kiro.http_client import KiroHttpClient
from kiro.utils import generate_conversation_id
from kiro.tokenizer import count_tools_tokens
from kiro.tokenizer import count_tokens

# Import debug_logger
try:
Expand Down Expand Up @@ -298,10 +298,11 @@ async def messages(
shared_client = request.app.state.http_client
http_client = KiroHttpClient(auth_manager, shared_client=shared_client)

# Prepare data for token counting
# Convert Pydantic models to dicts for tokenizer
messages_for_tokenizer = [msg.model_dump() for msg in request_data.messages]
tools_for_tokenizer = [tool.model_dump() for tool in request_data.tools] if request_data.tools else None
# Count prompt tokens from the full Kiro payload (system prompt + messages + tools)
kiro_payload_prompt_tokens = count_tokens(
kiro_request_body.decode('utf-8', errors='ignore'),
apply_claude_correction=False
)

try:
# Make request to Kiro API (for both streaming and non-streaming modes)
Expand Down Expand Up @@ -368,7 +369,7 @@ async def stream_wrapper():
request_data.model,
model_cache,
auth_manager,
request_messages=messages_for_tokenizer
prompt_tokens=kiro_payload_prompt_tokens
):
yield chunk
except GeneratorExit:
Expand Down Expand Up @@ -415,7 +416,7 @@ async def stream_wrapper():
request_data.model,
model_cache,
auth_manager,
request_messages=messages_for_tokenizer
prompt_tokens=kiro_payload_prompt_tokens
)

await http_client.close()
Expand Down
17 changes: 9 additions & 8 deletions kiro/routes_openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
from kiro.streaming_openai import stream_kiro_to_openai, collect_stream_response, stream_with_first_token_retry
from kiro.http_client import KiroHttpClient
from kiro.utils import generate_conversation_id
from kiro.tokenizer import count_tokens

# Import debug_logger
try:
Expand Down Expand Up @@ -324,10 +325,12 @@ async def chat_completions(request: Request, request_data: ChatCompletionRequest
}
)

# Prepare data for fallback token counting
# Convert Pydantic models to dicts for tokenizer
messages_for_tokenizer = [msg.model_dump() for msg in request_data.messages]
tools_for_tokenizer = [tool.model_dump() for tool in request_data.tools] if request_data.tools else None
# Count prompt tokens from the full Kiro payload (system prompt + messages + tools)
# This matches what actually gets sent to the API, giving accurate token counts
kiro_payload_prompt_tokens = count_tokens(
kiro_request_body.decode('utf-8', errors='ignore'),
apply_claude_correction=False
)

if request_data.stream:
# Streaming mode
Expand All @@ -341,8 +344,7 @@ async def stream_wrapper():
request_data.model,
model_cache,
auth_manager,
request_messages=messages_for_tokenizer,
request_tools=tools_for_tokenizer
prompt_tokens=kiro_payload_prompt_tokens
):
yield chunk
except GeneratorExit:
Expand Down Expand Up @@ -387,8 +389,7 @@ async def stream_wrapper():
request_data.model,
model_cache,
auth_manager,
request_messages=messages_for_tokenizer,
request_tools=tools_for_tokenizer
prompt_tokens=kiro_payload_prompt_tokens
)

await http_client.close()
Expand Down
75 changes: 29 additions & 46 deletions kiro/streaming_anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,9 @@
collect_stream_to_result,
FirstTokenTimeoutError,
KiroEvent,
calculate_tokens_from_context_usage,
stream_with_first_token_retry,
)
from kiro.tokenizer import count_tokens, count_message_tokens, count_tools_tokens
from kiro.tokenizer import count_tokens
from kiro.parsers import parse_bracket_tool_calls, deduplicate_tool_calls
from kiro.config import FIRST_TOKEN_TIMEOUT, FIRST_TOKEN_MAX_RETRIES, FAKE_REASONING_HANDLING

Expand Down Expand Up @@ -104,40 +103,36 @@ async def stream_kiro_to_anthropic(
model_cache: "ModelInfoCache",
auth_manager: "KiroAuthManager",
first_token_timeout: float = FIRST_TOKEN_TIMEOUT,
request_messages: Optional[list] = None,
prompt_tokens: int = 0,
conversation_id: Optional[str] = None
) -> AsyncGenerator[str, None]:
"""
Generator for converting Kiro stream to Anthropic SSE format.

Parses Kiro AWS SSE stream and converts events to Anthropic format.
Supports thinking content blocks when FAKE_REASONING_HANDLING=as_reasoning_content.

Args:
response: HTTP response with data stream
model: Model name to include in response
model_cache: Model cache for getting token limits
auth_manager: Authentication manager
first_token_timeout: First token wait timeout (seconds)
request_messages: Original request messages (for token counting)
prompt_tokens: Pre-counted prompt tokens (from full Kiro payload)
conversation_id: Stable conversation ID for truncation recovery (optional)

Yields:
Strings in Anthropic SSE format

Raises:
FirstTokenTimeoutError: If first token not received within timeout
"""
message_id = generate_message_id()
input_tokens = 0
input_tokens = prompt_tokens
output_tokens = 0
full_content = ""
full_thinking_content = ""

# Count input tokens from request messages
if request_messages:
input_tokens = count_message_tokens(request_messages, apply_claude_correction=False)

# Track content blocks - thinking block is index 0, text block is index 1 (when thinking enabled)
current_block_index = 0
thinking_block_started = False
Expand Down Expand Up @@ -456,13 +451,9 @@ async def stream_kiro_to_anthropic(

# Calculate output tokens
output_tokens = count_tokens(full_content + full_thinking_content)

# Calculate total tokens from context usage if available
if context_usage_percentage is not None:
prompt_tokens, total_tokens, _, _ = calculate_tokens_from_context_usage(
context_usage_percentage, output_tokens, model_cache, model
)
input_tokens = prompt_tokens

# input_tokens already set from pre-counted prompt_tokens (full Kiro payload).
# Don't override with contextUsagePercentage — it's unreliable.

# Determine stop reason
stop_reason = "tool_use" if tool_blocks else "end_turn"
Expand Down Expand Up @@ -545,29 +536,27 @@ async def collect_anthropic_response(
model: str,
model_cache: "ModelInfoCache",
auth_manager: "KiroAuthManager",
request_messages: Optional[list] = None
prompt_tokens: int = 0
) -> dict:
"""
Collect full response from Kiro stream in Anthropic format.

Used for non-streaming mode.

Args:
response: HTTP response with stream
model: Model name
model_cache: Model cache
auth_manager: Authentication manager
request_messages: Original request messages (for token counting)
prompt_tokens: Pre-counted prompt tokens (from full Kiro payload)

Returns:
Dictionary with full response in Anthropic Messages format
"""
message_id = generate_message_id()

# Count input tokens
input_tokens = 0
if request_messages:
input_tokens = count_message_tokens(request_messages, apply_claude_correction=False)

# Use pre-counted prompt tokens
input_tokens = prompt_tokens

# Collect stream result
result = await collect_stream_to_result(response)
Expand Down Expand Up @@ -617,12 +606,8 @@ async def collect_anthropic_response(
# Calculate output tokens
output_tokens = count_tokens(result.content + result.thinking_content)

# Calculate from context usage if available
if result.context_usage_percentage is not None:
prompt_tokens, _, _, _ = calculate_tokens_from_context_usage(
result.context_usage_percentage, output_tokens, model_cache, model
)
input_tokens = prompt_tokens
# input_tokens already set from pre-counted prompt_tokens (full Kiro payload).
# Don't override with contextUsagePercentage — it's unreliable.

# Determine stop reason
stop_reason = "tool_use" if result.tool_calls else "end_turn"
Expand Down Expand Up @@ -655,31 +640,29 @@ async def stream_with_first_token_retry_anthropic(
auth_manager: "KiroAuthManager",
max_retries: int = FIRST_TOKEN_MAX_RETRIES,
first_token_timeout: float = FIRST_TOKEN_TIMEOUT,
request_messages: Optional[list] = None,
request_tools: Optional[list] = None
prompt_tokens: int = 0
) -> AsyncGenerator[str, None]:
"""
Streaming with automatic retry on first token timeout for Anthropic API.

If model doesn't respond within first_token_timeout seconds,
request is cancelled and a new one is made. Maximum max_retries attempts.

This is seamless for user - they just see a delay,
but eventually get a response (or error after all attempts).

Args:
make_request: Function to create new HTTP request
model: Model name
model_cache: Model cache
auth_manager: Authentication manager
max_retries: Maximum number of attempts
first_token_timeout: First token wait timeout (seconds)
request_messages: Original request messages (for fallback token counting)
request_tools: Original request tools (for fallback token counting)

prompt_tokens: Pre-counted prompt tokens (from full Kiro payload)

Yields:
Strings in Anthropic SSE format

Raises:
Exception with Anthropic error format after exhausting all attempts
"""
Expand Down Expand Up @@ -711,7 +694,7 @@ async def stream_processor(response: httpx.Response) -> AsyncGenerator[str, None
model_cache,
auth_manager,
first_token_timeout=first_token_timeout,
request_messages=request_messages
prompt_tokens=prompt_tokens
):
yield chunk

Expand Down
Loading