Skip to content

Commit 1ce9672

Browse files
authored
Merge pull request #490 from basho/fixes/lrb/clean-up-multi-threading-gh-489
Threading fixes
2 parents 25979e6 + 6f7d420 commit 1ce9672

File tree

3 files changed

+75
-58
lines changed

3 files changed

+75
-58
lines changed

RELNOTES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
## [2.5.5 Release](https://github.com/basho/riak-python-client/issues?q=milestone%3Ariak-python-client-2.5.5)
44

55
* [Stop all pools when client shuts down](https://github.com/basho/riak-python-client/pull/488)
6+
* [Calling `close` on client closes pools, remove global multi pools](https://github.com/basho/riak-python-client/pull/490). *NOTE*: if you use the multi get or put features of the client, you *MUST* call `close()` on your `RiakClient` instance to correctly clean up the thread pools used for these multi-operations.
67

78
## [2.5.4 Release](https://github.com/basho/riak-python-client/issues?q=milestone%3Ariak-python-client-2.5.4)
89

riak/client/__init__.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ def __init__(self, protocol='pbc', transport_options={},
109109
self._credentials = self._create_credentials(credentials)
110110
self._http_pool = HttpPool(self, **transport_options)
111111
self._tcp_pool = TcpPool(self, **transport_options)
112+
self._closed = False
112113

113114
if PY2:
114115
self._encoders = {'application/json': default_encoder,
@@ -131,10 +132,7 @@ def __init__(self, protocol='pbc', transport_options={},
131132
self._tables = WeakValueDictionary()
132133

133134
def __del__(self):
134-
if self._multiget_pool:
135-
self._multiget_pool.stop()
136-
if self._multiput_pool:
137-
self._multiput_pool.stop()
135+
self.close()
138136

139137
def _get_protocol(self):
140138
return self._protocol
@@ -310,10 +308,19 @@ def close(self):
310308
"""
311309
Iterate through all of the connections and close each one.
312310
"""
313-
if self._http_pool is not None:
314-
self._http_pool.clear()
315-
if self._tcp_pool is not None:
316-
self._tcp_pool.clear()
311+
if not self._closed:
312+
self._closed = True
313+
self._stop_multi_pools()
314+
if self._http_pool is not None:
315+
self._http_pool.clear()
316+
if self._tcp_pool is not None:
317+
self._tcp_pool.clear()
318+
319+
def _stop_multi_pools(self):
320+
if self._multiget_pool:
321+
self._multiget_pool.stop()
322+
if self._multiput_pool:
323+
self._multiput_pool.stop()
317324

318325
def _create_node(self, n):
319326
if isinstance(n, RiakNode):

riak/client/multi.py

Lines changed: 59 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77
from riak.riak_object import RiakObject
88
from riak.ts_object import TsObject
99

10-
import atexit
11-
1210
if PY2:
1311
from Queue import Queue, Empty
1412
else:
@@ -89,7 +87,7 @@ def start(self):
8987
name = "riak.client.multi-worker-{0}-{1}".format(
9088
self._name, i)
9189
worker = Thread(target=self._worker_method, name=name)
92-
worker.daemon = True
90+
worker.daemon = False
9391
worker.start()
9492
self._workers.append(worker)
9593
self._started.set()
@@ -149,6 +147,11 @@ def _worker_method(self):
149147
while not self._should_quit():
150148
try:
151149
task = self._inq.get(block=True, timeout=0.25)
150+
except TypeError:
151+
if self._should_quit():
152+
break
153+
else:
154+
raise
152155
except Empty:
153156
continue
154157

@@ -179,6 +182,11 @@ def _worker_method(self):
179182
while not self._should_quit():
180183
try:
181184
task = self._inq.get(block=True, timeout=0.25)
185+
except TypeError:
186+
if self._should_quit():
187+
break
188+
else:
189+
raise
182190
except Empty:
183191
continue
184192

@@ -200,29 +208,16 @@ def _worker_method(self):
200208
self._inq.task_done()
201209

202210

203-
RIAK_MULTIGET_POOL = MultiGetPool()
204-
RIAK_MULTIPUT_POOL = MultiPutPool()
205-
206-
207-
def stop_pools():
208-
"""Stop worker pools at exit."""
209-
RIAK_MULTIGET_POOL.stop()
210-
RIAK_MULTIPUT_POOL.stop()
211-
212-
213-
atexit.register(stop_pools)
214-
215-
216211
def multiget(client, keys, **options):
217212
"""Executes a parallel-fetch across multiple threads. Returns a list
218213
containing :class:`~riak.riak_object.RiakObject` or
219214
:class:`~riak.datatypes.Datatype` instances, or 4-tuples of
220215
bucket-type, bucket, key, and the exception raised.
221216
222217
If a ``pool`` option is included, the request will use the given worker
223-
pool and not the default :data:`RIAK_MULTIGET_POOL`. This option will
224-
be passed by the client if the ``multiget_pool_size`` option was set on
225-
client initialization.
218+
pool and not a transient :class:`~riak.client.multi.MultiGetPool`. This
219+
option will be passed by the client if the ``multiget_pool_size``
220+
option was set on client initialization.
226221
227222
:param client: the client to use
228223
:type client: :class:`~riak.client.RiakClient`
@@ -234,26 +229,33 @@ def multiget(client, keys, **options):
234229
:rtype: list
235230
236231
"""
232+
transient_pool = False
237233
outq = Queue()
238234

239235
if 'pool' in options:
240236
pool = options['pool']
241237
del options['pool']
242238
else:
243-
pool = RIAK_MULTIGET_POOL
244-
245-
pool.start()
246-
for bucket_type, bucket, key in keys:
247-
task = Task(client, outq, bucket_type, bucket, key, None, options)
248-
pool.enq(task)
249-
250-
results = []
251-
for _ in range(len(keys)):
252-
if pool.stopped():
253-
raise RuntimeError("Multi-get operation interrupted by pool "
254-
"stopping!")
255-
results.append(outq.get())
256-
outq.task_done()
239+
pool = MultiGetPool()
240+
transient_pool = True
241+
242+
try:
243+
pool.start()
244+
for bucket_type, bucket, key in keys:
245+
task = Task(client, outq, bucket_type, bucket, key, None, options)
246+
pool.enq(task)
247+
248+
results = []
249+
for _ in range(len(keys)):
250+
if pool.stopped():
251+
raise RuntimeError(
252+
'Multi-get operation interrupted by pool '
253+
'stopping!')
254+
results.append(outq.get())
255+
outq.task_done()
256+
finally:
257+
if transient_pool:
258+
pool.stop()
257259

258260
return results
259261

@@ -263,9 +265,9 @@ def multiput(client, objs, **options):
263265
containing booleans or :class:`~riak.riak_object.RiakObject`
264266
265267
If a ``pool`` option is included, the request will use the given worker
266-
pool and not the default :data:`RIAK_MULTIPUT_POOL`. This option will
267-
be passed by the client if the ``multiput_pool_size`` option was set on
268-
client initialization.
268+
pool and not a transient :class:`~riak.client.multi.MultiPutPool`. This
269+
option will be passed by the client if the ``multiput_pool_size``
270+
option was set on client initialization.
269271
270272
:param client: the client to use
271273
:type client: :class:`RiakClient <riak.client.RiakClient>`
@@ -277,25 +279,32 @@ def multiput(client, objs, **options):
277279
:type options: dict
278280
:rtype: list
279281
"""
282+
transient_pool = False
280283
outq = Queue()
281284

282285
if 'pool' in options:
283286
pool = options['pool']
284287
del options['pool']
285288
else:
286-
pool = RIAK_MULTIPUT_POOL
287-
288-
pool.start()
289-
for obj in objs:
290-
task = PutTask(client, outq, obj, options)
291-
pool.enq(task)
292-
293-
results = []
294-
for _ in range(len(objs)):
295-
if pool.stopped():
296-
raise RuntimeError("Multi-put operation interrupted by pool "
297-
"stopping!")
298-
results.append(outq.get())
299-
outq.task_done()
289+
pool = MultiPutPool()
290+
transient_pool = True
291+
292+
try:
293+
pool.start()
294+
for obj in objs:
295+
task = PutTask(client, outq, obj, options)
296+
pool.enq(task)
297+
298+
results = []
299+
for _ in range(len(objs)):
300+
if pool.stopped():
301+
raise RuntimeError(
302+
'Multi-put operation interrupted by pool '
303+
'stopping!')
304+
results.append(outq.get())
305+
outq.task_done()
306+
finally:
307+
if transient_pool:
308+
pool.stop()
300309

301310
return results

0 commit comments

Comments
 (0)