14
14
# limitations under the License.
15
15
16
16
from abc import ABC , abstractmethod
17
- from typing import TYPE_CHECKING , AsyncGenerator , List , NamedTuple
18
-
19
- if TYPE_CHECKING :
20
- from collections .abc import AsyncIterator
17
+ from typing import AsyncGenerator , List , NamedTuple
21
18
22
19
from nemoguardrails .rails .llm .config import OutputRailsStreamingConfig
23
20
@@ -114,7 +111,9 @@ def format_chunks(self, chunks: List[str]) -> str:
114
111
...
115
112
116
113
@abstractmethod
117
- async def process_stream (self , streaming_handler ):
114
+ async def process_stream (
115
+ self , streaming_handler
116
+ ) -> AsyncGenerator [ChunkBatch , None ]:
118
117
"""Process streaming chunks and yield chunk batches.
119
118
120
119
This is the main method that concrete buffer strategies must implement.
@@ -139,9 +138,10 @@ async def process_stream(self, streaming_handler):
139
138
... print(f"Processing: {context_formatted}")
140
139
... print(f"User: {user_formatted}")
141
140
"""
142
- yield ChunkBatch ([], []) # pragma: no cover
141
+ raise NotImplementedError
142
+ yield
143
143
144
- async def __call__ (self , streaming_handler ):
144
+ async def __call__ (self , streaming_handler ) -> AsyncGenerator [ ChunkBatch , None ] :
145
145
"""Callable interface that delegates to process_stream.
146
146
147
147
It delegates to the `process_stream` method and can
@@ -257,7 +257,9 @@ def from_config(cls, config: OutputRailsStreamingConfig):
257
257
buffer_context_size = config .context_size , buffer_chunk_size = config .chunk_size
258
258
)
259
259
260
- async def process_stream (self , streaming_handler ):
260
+ async def process_stream (
261
+ self , streaming_handler
262
+ ) -> AsyncGenerator [ChunkBatch , None ]:
261
263
"""Process streaming chunks using rolling buffer strategy.
262
264
263
265
This method implements the rolling buffer logic, accumulating chunks
0 commit comments