Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
*.pyc
*.pyo
*~
41 changes: 25 additions & 16 deletions test/test_workerpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,22 @@


class TestWorkerPool(unittest.TestCase):
def __init__(self, *args, **kwargs):
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the benefit of this addition?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test failures raise an exception in the main thread, and do not shut the pools down. On my machine, this hangs up the entire test suite. By using self.pool() to create pools, we get automatic shutdowns regardless of test pass/failure.

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough, good catch! Can we change def pool(...) to def get_pool(...) please? Or maybe get_workerpool. Otherwise looks good.

self._pools = []
super(TestWorkerPool, self).__init__(*args, **kwargs)

def run(self, *args, **kwargs):
try:
super(TestWorkerPool, self).run(*args, **kwargs)
finally:
for pool in self._pools:
pool.shutdown()

def get_workerpool(self, *args):
p = workerpool.WorkerPool(*args)
self._pools.append(p)
return p

def double(self, i):
return i * 2

Expand All @@ -16,48 +32,42 @@ def add(self, *args):

def test_map(self):
"Map a list to a method to a pool of two workers."
pool = workerpool.WorkerPool(2)
pool = self.get_workerpool(2)

r = pool.map(self.double, [1, 2, 3, 4, 5])
self.assertEquals(set(r), {2, 4, 6, 8, 10})
pool.shutdown()
self.assertEquals(r, [2, 4, 6, 8, 10])

def test_map_multiparam(self):
"Test map with multiple parameters."
pool = workerpool.WorkerPool(2)
pool = self.get_workerpool(2)
r = pool.map(self.add, [1, 2, 3], [4, 5, 6])
self.assertEquals(set(r), {5, 7, 9})
pool.shutdown()
self.assertEquals(r, [5, 7, 9])

def test_wait(self):
"Make sure each task gets marked as done so pool.wait() works."
pool = workerpool.WorkerPool(5)
pool = self.get_workerpool(5)
q = Queue()
for i in xrange(100):
pool.put(workerpool.SimpleJob(q, sum, [range(5)]))
pool.wait()
pool.shutdown()

def test_init_size(self):
pool = workerpool.WorkerPool(1)
pool = self.get_workerpool(1)
self.assertEquals(pool.size(), 1)
pool.shutdown()

def test_shrink(self):
pool = workerpool.WorkerPool(1)
pool = self.get_workerpool(1)
pool.shrink()
self.assertEquals(pool.size(), 0)
pool.shutdown()

def test_grow(self):
pool = workerpool.WorkerPool(1)
pool = self.get_workerpool(1)
pool.grow()
self.assertEquals(pool.size(), 2)
pool.shutdown()

def test_changesize(self):
"Change sizes and make sure pool doesn't work with no workers."
pool = workerpool.WorkerPool(5)
pool = self.get_workerpool(5)
for i in xrange(5):
pool.grow()
self.assertEquals(pool.size(), 10)
Expand All @@ -77,4 +87,3 @@ def test_changesize(self):
else:
assert False, "Something returned a result, even though we are"
"expecting no workers."
pool.shutdown()
23 changes: 21 additions & 2 deletions workerpool/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ class SimpleJob(Job):
list, the method will execute r = method(*args) or r = method(**args),
depending on args' type, and perform result.put(r).
"""
def __init__(self, result, method, args=[]):
def __init__(self, result, method, args=None):
self.result = result
self.method = method
self.args = args
self.args = args or []

def run(self):
if isinstance(self.args, list) or isinstance(self.args, tuple):
Expand All @@ -46,3 +46,22 @@ def run(self):
def _return(self, r):
"Handle return value by appending to the ``self.result`` queue."
self.result.put(r)


class OrderedSimpleJob(SimpleJob):
"""
Special job used in `pool.map` used to retain order of arguments
and results.
"""
def __init__(self, index, result, method, args=None):
self.index = index
self.result = result
self.method = method
self.args = args or []

def _return(self, r):
"""
Returns the output of the job in addition to the index it
should have in the results list.
"""
self.result.put((self.index, r))
16 changes: 8 additions & 8 deletions workerpool/pools.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from QueueWrapper import Queue

from workers import Worker
from jobs import SimpleJob, SuicideJob
from jobs import OrderedSimpleJob, SuicideJob


__all__ = ['WorkerPool', 'default_worker_factory']
Expand Down Expand Up @@ -95,16 +95,16 @@ def map(self, fn, *seq):
"block until done."
results = Queue()
args = zip(*seq)
for seq in args:
j = SimpleJob(results, fn, seq)
for i, seq in enumerate(args):
j = OrderedSimpleJob(i, results, fn, seq)
self.put(j)

# Aggregate results
r = []
for i in xrange(len(args)):
r.append(results.get())

return r
self.join()
sentinel = object()
results.put(sentinel)
r = sorted(iter(results.get, sentinel))
return [x[1] for x in r]

def wait(self):
"DEPRECATED: Use join() instead."
Expand Down