@@ -255,11 +255,12 @@ def __init__(self,
255255 # Define Subject that listen incoming data and produces writes into InfluxDB
256256 self ._subject = Subject ()
257257
258+ self ._window_scheduler = ThreadPoolScheduler (1 )
258259 self ._disposable = self ._subject .pipe (
259260 # Split incoming data to windows by batch_size or flush_interval
260261 ops .window_with_time_or_count (count = write_options .batch_size ,
261262 timespan = timedelta (milliseconds = write_options .flush_interval ),
262- scheduler = ThreadPoolScheduler ( 1 ) ),
263+ scheduler = self . _window_scheduler ),
263264 # Map window into groups defined by 'organization', 'bucket' and 'precision'
264265 ops .flat_map (lambda window : window .pipe (
265266 # Group window by 'organization', 'bucket' and 'precision'
@@ -440,6 +441,11 @@ def __del__(self):
440441 )
441442 break
442443
444+ if self ._window_scheduler :
445+ self ._window_scheduler .executor .shutdown ()
446+ self ._window_scheduler .executor = None
447+ self ._window_scheduler = None
448+
443449 if self ._disposable :
444450 self ._disposable = None
445451 pass
@@ -565,6 +571,7 @@ def __getstate__(self):
565571 # Remove rx
566572 del state ['_subject' ]
567573 del state ['_disposable' ]
574+ del state ['_window_scheduler' ]
568575 del state ['_write_service' ]
569576 return state
570577
0 commit comments