@@ -344,3 +344,43 @@ async def test_messages_on_retry(
344344 StreamStarted (),
345345 StreamFatalError (error ),
346346 ]
347+
348+
349+ @mock .patch (
350+ "frequenz.client.base.streaming.asyncio.sleep" , autospec = True , wraps = asyncio .sleep
351+ )
352+ async def test_retry_reset (
353+ mock_sleep : mock .MagicMock ,
354+ receiver_ready_event : asyncio .Event , # pylint: disable=redefined-outer-name
355+ ) -> None :
356+ """Test that retry strategy resets after a successful start."""
357+ helper = streaming .GrpcStreamBroadcaster (
358+ stream_name = "test_helper" ,
359+ stream_method = lambda : _ErroringAsyncIter (
360+ mock_error (),
361+ receiver_ready_event ,
362+ ),
363+ transform = _transformer ,
364+ retry_strategy = retry .LinearBackoff (
365+ limit = 1 ,
366+ interval = 0.01 ,
367+ jitter = 0.0 ,
368+ ),
369+ retry_on_exhausted_stream = True ,
370+ )
371+
372+ items : list [str ] = []
373+ events : list [StreamEvent ] = []
374+ async with AsyncExitStack () as stack :
375+ stack .push_async_callback (helper .stop )
376+
377+ receiver = helper .new_receiver ()
378+ receiver_ready_event .set ()
379+ _ = await _split_message (receiver )
380+
381+ mock_sleep .assert_has_calls (
382+ [
383+ mock .call (0.01 ), # First retry after the first error
384+ mock .call (0.01 ), # Second retry after the second error
385+ ]
386+ )
0 commit comments