Skip to content

Commit 66f0d4c

Browse files
committed
Fix bookkeeping data order
1 parent 7f20a18 commit 66f0d4c

File tree

2 files changed

+16
-3
lines changed

2 files changed

+16
-3
lines changed

llmstack/apps/runner/app_coordinator.py

+3
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,9 @@ def tell_actor(self, actor_id: str, message: Message):
108108
**self._actor_configs_map[actor_id_prefix].kwargs,
109109
)
110110

111+
if message.type == MessageType.BEGIN or message.type == MessageType.CONTENT:
112+
self._bookkeeping_queue.put_nowait((actor_id, None))
113+
111114
self.actors[actor_id].tell(message)
112115

113116
def relay(self, message: Message):

llmstack/apps/runner/app_runner.py

+13-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
import logging
3+
import time
34
import uuid
45
from enum import Enum
56
from typing import Any, Callable, Dict, List, Optional, Union
@@ -384,9 +385,18 @@ def _get_bookkeeping_data(self):
384385
Return a dictionary of bookkeeping data from the queue
385386
"""
386387
bookkeeping_data = {}
387-
while not self._bookkeeping_queue.empty():
388-
stream_id, data = self._bookkeeping_queue.get_nowait()
389-
bookkeeping_data[stream_id] = data
388+
start_time = time.time()
389+
while (not self._bookkeeping_queue.empty() or any([data is None for data in bookkeeping_data.values()])) and (
390+
time.time() - start_time < 5
391+
):
392+
if self._bookkeeping_queue.empty():
393+
continue
394+
395+
actor_id, data = self._bookkeeping_queue.get_nowait()
396+
397+
if actor_id not in bookkeeping_data or data is not None:
398+
bookkeeping_data[actor_id] = data
399+
390400
self._bookkeeping_queue.task_done()
391401

392402
return bookkeeping_data

0 commit comments

Comments
 (0)