Skip to content

Commit c7c1f9f

Browse files
committed
Change topics __del__ to warn if not closed
1 parent 82ec438 commit c7c1f9f

File tree

9 files changed

+33
-20
lines changed

9 files changed

+33
-20
lines changed

tests/topics/test_topic_reader.py

+9-5
Original file line numberDiff line numberDiff line change
@@ -174,12 +174,13 @@ def test_read_and_commit_with_close_reader(self, driver_sync, topic_with_message
174174
assert message != message2
175175

176176
def test_read_and_commit_with_ack(self, driver_sync, topic_with_messages, topic_consumer):
177-
reader = driver_sync.topic_client.reader(topic_with_messages, topic_consumer)
178-
message = reader.receive_message()
179-
reader.commit_with_ack(message)
177+
with driver_sync.topic_client.reader(topic_with_messages, topic_consumer) as reader:
178+
message = reader.receive_message()
179+
reader.commit_with_ack(message)
180+
181+
with driver_sync.topic_client.reader(topic_with_messages, topic_consumer) as reader:
182+
batch = reader.receive_batch()
180183

181-
reader = driver_sync.topic_client.reader(topic_with_messages, topic_consumer)
182-
batch = reader.receive_batch()
183184
assert message != batch.messages[0]
184185

185186
def test_read_compressed_messages(self, driver_sync, topic_path, topic_consumer):
@@ -247,3 +248,6 @@ async def wait(fut):
247248
datas.sort()
248249

249250
assert datas == ["10", "11"]
251+
252+
await reader0.close()
253+
await reader1.close()

ydb/_grpc/grpcwrapper/common_utils.py

-3
Original file line numberDiff line numberDiff line change
@@ -161,9 +161,6 @@ def __init__(self, convert_server_grpc_to_wrapper):
161161
self._stream_call = None
162162
self._wait_executor = None
163163

164-
def __del__(self):
165-
self._clean_executor(wait=False)
166-
167164
async def start(self, driver: SupportedDriverType, stub, method):
168165
if asyncio.iscoroutinefunction(driver.__call__):
169166
await self._start_asyncio_driver(driver, stub, method)

ydb/_topic_reader/topic_reader_asyncio.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
9595

9696
def __del__(self):
9797
if not self._closed:
98-
task = self._loop.create_task(self.close(flush=False))
99-
topic_common.wrap_set_name_for_asyncio_task(task, task_name="close reader")
98+
logger.warning("Topic reader was not closed properly. Consider using method close().")
10099

101100
async def wait_message(self):
102101
"""

ydb/_topic_reader/topic_reader_sync.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
import concurrent.futures
3+
import logging
34
import typing
45
from typing import List, Union, Optional
56

@@ -23,6 +24,8 @@
2324
if typing.TYPE_CHECKING:
2425
from ..query.transaction import BaseQueryTxContext
2526

27+
logger = logging.getLogger(__name__)
28+
2629

2730
class TopicReaderSync:
2831
_caller: CallFromSyncToAsync
@@ -55,7 +58,8 @@ async def create_reader():
5558
self._parent = _parent
5659

5760
def __del__(self):
58-
self.close(flush=False)
61+
if not self._closed:
62+
logger.warning("Topic reader was not closed properly. Consider using method close().")
5963

6064
def __enter__(self):
6165
return self

ydb/_topic_writer/topic_writer_asyncio.py

+2-4
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,8 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
8080
raise
8181

8282
def __del__(self):
83-
if self._closed or self._loop.is_closed():
84-
return
85-
86-
self._loop.call_soon(functools.partial(self.close, flush=False))
83+
if not self._closed:
84+
logger.warning("Topic writer was not closed properly. Consider using method close().")
8785

8886
async def close(self, *, flush: bool = True):
8987
if self._closed:

ydb/_topic_writer/topic_writer_sync.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import asyncio
4+
import logging
45
import typing
56
from concurrent.futures import Future
67
from typing import Union, List, Optional
@@ -29,6 +30,8 @@
2930
if typing.TYPE_CHECKING:
3031
from ..query.transaction import BaseQueryTxContext
3132

33+
logger = logging.getLogger(__name__)
34+
3235

3336
class WriterSync:
3437
_caller: CallFromSyncToAsync
@@ -71,7 +74,8 @@ def __exit__(self, exc_type, exc_val, exc_tb):
7174
raise
7275

7376
def __del__(self):
74-
self.close(flush=False)
77+
if not self._closed:
78+
logger.warning("Topic writer was not closed properly. Consider using method close().")
7579

7680
def close(self, *, flush: bool = True, timeout: TimeoutType = None):
7781
if self._closed:

ydb/aio/driver.py

+1
Original file line numberDiff line numberDiff line change
@@ -62,4 +62,5 @@ def __init__(
6262

6363
async def stop(self, timeout=10):
6464
await self.table_client._stop_pool_if_needed(timeout=timeout)
65+
self.topic_client.close()
6566
await super().stop(timeout=timeout)

ydb/driver.py

+1
Original file line numberDiff line numberDiff line change
@@ -288,4 +288,5 @@ def __init__(
288288

289289
def stop(self, timeout=10):
290290
self.table_client._stop_pool_if_needed(timeout=timeout)
291+
self.topic_client.close()
291292
super().stop(timeout=timeout)

ydb/topic.py

+9-4
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import concurrent.futures
3636
import datetime
3737
from dataclasses import dataclass
38+
import logging
3839
from typing import List, Union, Mapping, Optional, Dict, Callable
3940

4041
from . import aio, Credentials, _apis, issues
@@ -92,6 +93,8 @@
9293
PublicAlterAutoPartitioningSettings as TopicAlterAutoPartitioningSettings,
9394
)
9495

96+
logger = logging.getLogger(__name__)
97+
9598

9699
class TopicClientAsyncIO:
97100
_closed: bool
@@ -112,7 +115,8 @@ def __init__(self, driver: aio.Driver, settings: Optional[TopicClientSettings] =
112115
)
113116

114117
def __del__(self):
115-
self.close()
118+
if not self._closed:
119+
logger.warning("Topic client was not closed properly. Consider using method close().")
116120

117121
async def create_topic(
118122
self,
@@ -320,7 +324,7 @@ def _check_closed(self):
320324
if not self._closed:
321325
return
322326

323-
raise RuntimeError("Topic client closed")
327+
raise issues.Error("Topic client closed")
324328

325329

326330
class TopicClient:
@@ -343,7 +347,8 @@ def __init__(self, driver: driver.Driver, settings: Optional[TopicClientSettings
343347
)
344348

345349
def __del__(self):
346-
self.close()
350+
if not self._closed:
351+
logger.warning("Topic client was not closed properly. Consider using method close().")
347352

348353
def create_topic(
349354
self,
@@ -561,7 +566,7 @@ def _check_closed(self):
561566
if not self._closed:
562567
return
563568

564-
raise RuntimeError("Topic client closed")
569+
raise issues.Error("Topic client closed")
565570

566571

567572
@dataclass

0 commit comments

Comments
 (0)