Skip to content

Commit 2661e4d

Browse files
authored
fix: dispose window scheduler (influxdata#641)
1 parent ed89701 commit 2661e4d

File tree

2 files changed

+10
-2
lines changed

2 files changed

+10
-2
lines changed

CHANGELOG.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22

33
### Bug Fixes
44
1. [#636](https://github.com/influxdata/influxdb-client-python/pull/636): Handle missing data in data frames
5-
2. [#638](https://github.com/influxdata/influxdb-client-python/pull/638), [#642](https://github.com/influxdata/influxdb-client-python/pull/642): Refactor DataFrame operations to avoid chained assignment and resolve FutureWarning in pandas, ensuring compatibility with pandas 3.0.
5+
1. [#638](https://github.com/influxdata/influxdb-client-python/pull/638), [#642](https://github.com/influxdata/influxdb-client-python/pull/642): Refactor DataFrame operations to avoid chained assignment and resolve FutureWarning in pandas, ensuring compatibility with pandas 3.0.
6+
1. [#641](https://github.com/influxdata/influxdb-client-python/pull/641): Correctly dispose ThreadPoolScheduler in WriteApi
67

78
### Documentation
89
1. [#639](https://github.com/influxdata/influxdb-client-python/pull/639): Use Markdown for `README`

influxdb_client/client/write_api.py

+8-1
Original file line numberDiff line numberDiff line change
@@ -250,16 +250,18 @@ def __init__(self,
250250
self._success_callback = kwargs.get('success_callback', None)
251251
self._error_callback = kwargs.get('error_callback', None)
252252
self._retry_callback = kwargs.get('retry_callback', None)
253+
self._window_scheduler = None
253254

254255
if self._write_options.write_type is WriteType.batching:
255256
# Define Subject that listen incoming data and produces writes into InfluxDB
256257
self._subject = Subject()
257258

259+
self._window_scheduler = ThreadPoolScheduler(1)
258260
self._disposable = self._subject.pipe(
259261
# Split incoming data to windows by batch_size or flush_interval
260262
ops.window_with_time_or_count(count=write_options.batch_size,
261263
timespan=timedelta(milliseconds=write_options.flush_interval),
262-
scheduler=ThreadPoolScheduler(1)),
264+
scheduler=self._window_scheduler),
263265
# Map window into groups defined by 'organization', 'bucket' and 'precision'
264266
ops.flat_map(lambda window: window.pipe(
265267
# Group window by 'organization', 'bucket' and 'precision'
@@ -440,6 +442,10 @@ def __del__(self):
440442
)
441443
break
442444

445+
if self._window_scheduler:
446+
self._window_scheduler.executor.shutdown(wait=False)
447+
self._window_scheduler = None
448+
443449
if self._disposable:
444450
self._disposable = None
445451
pass
@@ -565,6 +571,7 @@ def __getstate__(self):
565571
# Remove rx
566572
del state['_subject']
567573
del state['_disposable']
574+
del state['_window_scheduler']
568575
del state['_write_service']
569576
return state
570577

0 commit comments

Comments
 (0)