|
14 | 14 | from . import datatypes, topic_reader_asyncio
|
15 | 15 | from .datatypes import PublicBatch, PublicMessage
|
16 | 16 | from .topic_reader import PublicReaderSettings
|
17 |
| -from .topic_reader_asyncio import ReaderStream, ReaderReconnector |
| 17 | +from .topic_reader_asyncio import ReaderStream, ReaderReconnector, TopicReaderError |
18 | 18 | from .._grpc.grpcwrapper.common_utils import SupportedDriverType, ServerStatus
|
19 | 19 | from .._grpc.grpcwrapper.ydb_topic import (
|
20 | 20 | StreamReadMessage,
|
|
36 | 36 | else:
|
37 | 37 | from .._grpc.common.protos import ydb_status_codes_pb2
|
38 | 38 |
|
| 39 | +from .._constants import DEFAULT_INITIAL_RESPONSE_TIMEOUT |
| 40 | + |
39 | 41 |
|
40 | 42 | @pytest.fixture(autouse=True)
|
41 | 43 | def handle_exceptions(event_loop):
|
@@ -1475,6 +1477,46 @@ def logged():
|
1475 | 1477 |
|
1476 | 1478 | await wait_condition(logged)
|
1477 | 1479 |
|
| 1480 | + async def test_init_timeout_parameter(self, stream, default_reader_settings): |
| 1481 | + """Test that ReaderStream._start calls stream.receive with timeout=10""" |
| 1482 | + reader = ReaderStream(self.default_reader_reconnector_id, default_reader_settings) |
| 1483 | + init_message = default_reader_settings._init_message() |
| 1484 | + |
| 1485 | + # Mock stream.receive to check if timeout is passed |
| 1486 | + with mock.patch.object(stream, "receive") as mock_receive: |
| 1487 | + mock_receive.return_value = StreamReadMessage.FromServer( |
| 1488 | + server_status=ServerStatus(ydb_status_codes_pb2.StatusIds.SUCCESS, []), |
| 1489 | + server_message=StreamReadMessage.InitResponse(session_id="test_session"), |
| 1490 | + ) |
| 1491 | + |
| 1492 | + await reader._start(stream, init_message) |
| 1493 | + |
| 1494 | + # Verify that receive was called with timeout |
| 1495 | + mock_receive.assert_called_with(timeout=DEFAULT_INITIAL_RESPONSE_TIMEOUT) |
| 1496 | + |
| 1497 | + await reader.close(False) |
| 1498 | + |
| 1499 | + async def test_init_timeout_behavior(self, stream, default_reader_settings): |
| 1500 | + """Test that ReaderStream._start raises TopicReaderError when receive times out""" |
| 1501 | + reader = ReaderStream(self.default_reader_reconnector_id, default_reader_settings) |
| 1502 | + init_message = default_reader_settings._init_message() |
| 1503 | + |
| 1504 | + # Mock stream.receive to directly raise TimeoutError when called with timeout |
| 1505 | + async def timeout_receive(timeout=None): |
| 1506 | + if timeout == DEFAULT_INITIAL_RESPONSE_TIMEOUT: |
| 1507 | + raise asyncio.TimeoutError("Simulated timeout") |
| 1508 | + return StreamReadMessage.FromServer( |
| 1509 | + server_status=ServerStatus(ydb_status_codes_pb2.StatusIds.SUCCESS, []), |
| 1510 | + server_message=StreamReadMessage.InitResponse(session_id="test_session"), |
| 1511 | + ) |
| 1512 | + |
| 1513 | + with mock.patch.object(stream, "receive", side_effect=timeout_receive): |
| 1514 | + # Should raise TopicReaderError with timeout message |
| 1515 | + with pytest.raises(TopicReaderError, match="Timeout waiting for init response"): |
| 1516 | + await reader._start(stream, init_message) |
| 1517 | + |
| 1518 | + await reader.close(False) |
| 1519 | + |
1478 | 1520 |
|
1479 | 1521 | @pytest.mark.asyncio
|
1480 | 1522 | class TestReaderReconnector:
|
|
0 commit comments