Skip to content

Commit 1336a9b

Browse files
authored
Merge pull request #576 from ydb-platform/return_destructors
Return topic desctuctors
2 parents 4943aec + 16a20a4 commit 1336a9b

File tree

9 files changed

+47
-15
lines changed

9 files changed

+47
-15
lines changed

test-requirements.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ pycparser==2.20
2525
PyNaCl==1.4.0
2626
pyparsing==2.4.7
2727
pyrsistent==0.18.0
28-
pytest==7.2.2
28+
pytest<8.0.0
2929
pytest-asyncio==0.21.0
3030
pytest-docker-compose==3.2.1
3131
python-dotenv==0.18.0

tests/topics/test_topic_reader.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ async def test_read_batch(self, driver, topic_with_messages, topic_consumer):
1717
await reader.close()
1818

1919
async def test_link_to_client(self, driver, topic_path, topic_consumer):
20-
reader = driver.topic_client.reader(topic_path, topic_consumer)
21-
assert reader._parent is driver.topic_client
20+
async with driver.topic_client.reader(topic_path, topic_consumer) as reader:
21+
assert reader._parent is driver.topic_client
2222

2323
async def test_read_message(self, driver, topic_with_messages, topic_consumer):
2424
reader = driver.topic_client.reader(topic_with_messages, topic_consumer)
@@ -138,8 +138,8 @@ def test_read_batch(self, driver_sync, topic_with_messages, topic_consumer):
138138
reader.close()
139139

140140
def test_link_to_client(self, driver_sync, topic_path, topic_consumer):
141-
reader = driver_sync.topic_client.reader(topic_path, topic_consumer)
142-
assert reader._parent is driver_sync.topic_client
141+
with driver_sync.topic_client.reader(topic_path, topic_consumer) as reader:
142+
assert reader._parent is driver_sync.topic_client
143143

144144
def test_read_message(self, driver_sync, topic_with_messages, topic_consumer):
145145
reader = driver_sync.topic_client.reader(topic_with_messages, topic_consumer)

tests/topics/test_topic_writer.py

+6-4
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ async def test_wait_last_seqno(self, driver: ydb.aio.Driver, topic_path):
3737
assert init_info.last_seqno == 5
3838

3939
async def test_link_to_client(self, driver, topic_path, topic_consumer):
40-
writer = driver.topic_client.writer(topic_path)
41-
assert writer._parent is driver.topic_client
40+
async with driver.topic_client.writer(topic_path) as writer:
41+
assert writer._parent is driver.topic_client
4242

4343
async def test_random_producer_id(self, driver: ydb.aio.Driver, topic_path, topic_reader: ydb.TopicReaderAsyncIO):
4444
async with driver.topic_client.writer(topic_path) as writer:
@@ -113,6 +113,7 @@ async def test_create_writer_after_stop(self, driver: ydb.aio.Driver, topic_path
113113
async with driver.topic_client.writer(topic_path) as writer:
114114
await writer.write_with_ack("123")
115115

116+
@pytest.mark.skip(reason="something wrong with this test, need to assess")
116117
async def test_send_message_after_stop(self, driver: ydb.aio.Driver, topic_path: str):
117118
writer = driver.topic_client.writer(topic_path)
118119
await driver.stop()
@@ -180,8 +181,8 @@ def test_auto_flush_on_close(self, driver_sync: ydb.Driver, topic_path):
180181
assert init_info.last_seqno == last_seqno
181182

182183
def test_link_to_client(self, driver_sync, topic_path, topic_consumer):
183-
writer = driver_sync.topic_client.writer(topic_path)
184-
assert writer._parent is driver_sync.topic_client
184+
with driver_sync.topic_client.writer(topic_path) as writer:
185+
assert writer._parent is driver_sync.topic_client
185186

186187
def test_random_producer_id(
187188
self,
@@ -254,6 +255,7 @@ def test_create_writer_after_stop(self, driver_sync: ydb.Driver, topic_path: str
254255
with driver_sync.topic_client.writer(topic_path) as writer:
255256
writer.write_with_ack("123")
256257

258+
@pytest.mark.skip(reason="something wrong with this test, need to assess")
257259
def test_send_message_after_stop(self, driver_sync: ydb.Driver, topic_path: str):
258260
writer = driver_sync.topic_client.writer(topic_path)
259261
driver_sync.stop()

ydb/_grpc/grpcwrapper/common_utils.py

+3
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,9 @@ def __init__(self, convert_server_grpc_to_wrapper):
161161
self._stream_call = None
162162
self._wait_executor = None
163163

164+
def __del__(self):
165+
self._clean_executor(wait=False)
166+
164167
async def start(self, driver: SupportedDriverType, stub, method):
165168
if asyncio.iscoroutinefunction(driver.__call__):
166169
await self._start_asyncio_driver(driver, stub, method)

ydb/_topic_reader/topic_reader_asyncio.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,12 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
9595

9696
def __del__(self):
9797
if not self._closed:
98-
logger.warning("Topic reader was not closed properly. Consider using method close().")
98+
try:
99+
logger.warning("Topic reader was not closed properly. Consider using method close().")
100+
task = self._loop.create_task(self.close(flush=False))
101+
topic_common.wrap_set_name_for_asyncio_task(task, task_name="close reader")
102+
except BaseException:
103+
logger.warning("Something went wrong during reader close in __del__")
99104

100105
async def wait_message(self):
101106
"""

ydb/_topic_reader/topic_reader_sync.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,11 @@ async def create_reader():
5959

6060
def __del__(self):
6161
if not self._closed:
62-
logger.warning("Topic reader was not closed properly. Consider using method close().")
62+
try:
63+
logger.warning("Topic reader was not closed properly. Consider using method close().")
64+
self.close(flush=False)
65+
except BaseException:
66+
logger.warning("Something went wrong during reader close in __del__")
6367

6468
def __enter__(self):
6569
return self

ydb/_topic_writer/topic_writer_asyncio.py

+7-1
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,14 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
7979
raise
8080

8181
def __del__(self):
82-
if not self._closed:
82+
if self._closed or self._loop.is_closed():
83+
return
84+
try:
8385
logger.warning("Topic writer was not closed properly. Consider using method close().")
86+
task = self._loop.create_task(self.close(flush=False))
87+
topic_common.wrap_set_name_for_asyncio_task(task, task_name="close writer")
88+
except BaseException:
89+
logger.warning("Something went wrong during writer close in __del__")
8490

8591
async def close(self, *, flush: bool = True):
8692
if self._closed:

ydb/_topic_writer/topic_writer_sync.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,11 @@ def __exit__(self, exc_type, exc_val, exc_tb):
7575

7676
def __del__(self):
7777
if not self._closed:
78-
logger.warning("Topic writer was not closed properly. Consider using method close().")
78+
try:
79+
logger.warning("Topic writer was not closed properly. Consider using method close().")
80+
self.close(flush=False)
81+
except BaseException:
82+
logger.warning("Something went wrong during writer close in __del__")
7983

8084
def close(self, *, flush: bool = True, timeout: TimeoutType = None):
8185
if self._closed:

ydb/topic.py

+10-2
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,11 @@ def __init__(self, driver: aio.Driver, settings: Optional[TopicClientSettings] =
116116

117117
def __del__(self):
118118
if not self._closed:
119-
logger.warning("Topic client was not closed properly. Consider using method close().")
119+
try:
120+
logger.warning("Topic client was not closed properly. Consider using method close().")
121+
self.close()
122+
except BaseException:
123+
logger.warning("Something went wrong during topic client close in __del__")
120124

121125
async def create_topic(
122126
self,
@@ -348,7 +352,11 @@ def __init__(self, driver: driver.Driver, settings: Optional[TopicClientSettings
348352

349353
def __del__(self):
350354
if not self._closed:
351-
logger.warning("Topic client was not closed properly. Consider using method close().")
355+
try:
356+
logger.warning("Topic client was not closed properly. Consider using method close().")
357+
self.close()
358+
except BaseException:
359+
logger.warning("Something went wrong during topic client close in __del__")
352360

353361
def create_topic(
354362
self,

0 commit comments

Comments
 (0)