Skip to content

Commit 2b3aad2

Browse files
committed
fixup! Add missing retry strategy reset in streaming
1 parent 718d386 commit 2b3aad2

File tree

1 file changed

+6
-1
lines changed

1 file changed

+6
-1
lines changed

src/frequenz/client/base/streaming.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@
1919
_logger = logging.getLogger(__name__)
2020

2121

22+
RequestT = TypeVar("RequestT")
23+
"""The request type of the stream."""
24+
25+
2226
InputT = TypeVar("InputT")
2327
"""The input type of the stream."""
2428

@@ -105,7 +109,7 @@ def async_range() -> AsyncIterable[int]:
105109
def __init__( # pylint: disable=too-many-arguments,too-many-positional-arguments
106110
self,
107111
stream_name: str,
108-
stream_method: Callable[[], AsyncIterable[InputT]],
112+
stream_method: Callable[[], grpc.aio.UnaryStreamCall[RequestT, InputT]],
109113
transform: Callable[[InputT], OutputT],
110114
retry_strategy: retry.Strategy | None = None,
111115
retry_on_exhausted_stream: bool = False,
@@ -182,6 +186,7 @@ async def _run(self) -> None:
182186
_logger.info("%s: starting to stream", self._stream_name)
183187
try:
184188
call = self._stream_method()
189+
await call.initial_metadata()
185190
self._retry_strategy.reset()
186191
await sender.send(StreamStarted())
187192
async for msg in call:

0 commit comments

Comments
 (0)