Skip to content

Commit 053ad19

Browse files
authored
Merge pull request #720 from acmcelwee/amc-threaded-walker
ThreadedWalker constructor accepts a Semaphore, not an int
2 parents cfe719e + a3feb4a commit 053ad19

File tree

3 files changed

+17
-9
lines changed

3 files changed

+17
-9
lines changed

stacker/actions/base.py

+7-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import logging
88
import threading
99

10-
from ..dag import walk, ThreadedWalker
10+
from ..dag import walk, ThreadedWalker, UnlimitedSemaphore
1111
from ..plan import Step, build_plan, build_graph
1212

1313
import botocore.exceptions
@@ -53,7 +53,12 @@ def build_walker(concurrency):
5353
"""
5454
if concurrency == 1:
5555
return walk
56-
return ThreadedWalker(concurrency).walk
56+
57+
semaphore = UnlimitedSemaphore()
58+
if concurrency > 1:
59+
semaphore = threading.Semaphore(concurrency)
60+
61+
return ThreadedWalker(semaphore).walk
5762

5863

5964
def plan(description, stack_action, context,

stacker/dag/__init__.py

+3-5
Original file line numberDiff line numberDiff line change
@@ -414,14 +414,12 @@ class ThreadedWalker(object):
414414
allows, using threads.
415415
416416
Args:
417-
semaphore (threading.Semaphore, optional): a semaphore object which
417+
semaphore (threading.Semaphore): a semaphore object which
418418
can be used to control how many steps are executed in parallel.
419-
By default, there is not limit to the amount of parallelism,
420-
other than what the graph topology allows.
421419
"""
422420

423-
def __init__(self, semaphore=None):
424-
self.semaphore = semaphore or UnlimitedSemaphore()
421+
def __init__(self, semaphore):
422+
self.semaphore = semaphore
425423

426424
def walk(self, dag, walk_func):
427425
""" Walks each node of the graph, in parallel if it can.

stacker/tests/test_dag.py

+7-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,12 @@
55

66
from nose import with_setup
77
from nose.tools import nottest, raises
8-
from stacker.dag import DAG, DAGValidationError, ThreadedWalker
8+
from stacker.dag import (
9+
DAG,
10+
DAGValidationError,
11+
ThreadedWalker,
12+
UnlimitedSemaphore
13+
)
914
import threading
1015

1116
dag = None
@@ -220,7 +225,7 @@ def test_transitive_deep_reduction():
220225
@with_setup(blank_setup)
221226
def test_threaded_walker():
222227
dag = DAG()
223-
walker = ThreadedWalker()
228+
walker = ThreadedWalker(UnlimitedSemaphore())
224229

225230
# b and c should be executed at the same time.
226231
dag.from_dict({'a': ['b', 'c'],

0 commit comments

Comments
 (0)