diff --git a/s3transfer/utils.py b/s3transfer/utils.py index 61407eba..4846e13b 100644 --- a/s3transfer/utils.py +++ b/s3transfer/utils.py @@ -605,44 +605,53 @@ class NoResourcesAvailable(Exception): pass -class TaskSemaphore: - def __init__(self, count): - """A semaphore for the purpose of limiting the number of tasks - - :param count: The size of semaphore - """ - self._semaphore = threading.Semaphore(count) +class BaseSemaphore: + """Base class for semaphores""" def acquire(self, tag, blocking=True): """Acquire the semaphore - :param tag: A tag identifying what is acquiring the semaphore. Note - that this is not really needed to directly use this class but is - needed for API compatibility with the SlidingWindowSemaphore - implementation. - :param block: If True, block until it can be acquired. If False, + :param tag: A tag identifying what is acquiring the semaphore. Needed + for API compatibility with the SlidingWindowSemaphore implementation. + :param blocking: If True, block until it can be acquired. If False, do not block and raise an exception if cannot be acquired. :returns: A token (can be None) to use when releasing the semaphore """ - logger.debug("Acquiring %s", tag) - if not self._semaphore.acquire(blocking): - raise NoResourcesAvailable("Cannot acquire tag '%s'" % tag) + raise NotImplementedError("must implement acquire()") def release(self, tag, acquire_token): """Release the semaphore :param tag: A tag identifying what is releasing the semaphore - :param acquire_token: The token returned from when the semaphore was - acquired. Note that this is not really needed to directly use this - class but is needed for API compatibility with the + :param acquire_token: The token returned from when the semaphore was + acquired. Needed for API compatibility with the SlidingWindowSemaphore implementation. """ + raise NotImplementedError("must implement release()") + + +class TaskSemaphore(BaseSemaphore): + """A wrapper around a simple semaphore""" + + def __init__(self, count): + """A semaphore for the purpose of limiting the number of tasks + + :param count: The size of semaphore + """ + self._semaphore = threading.Semaphore(count) + + def acquire(self, tag, blocking=True): + logger.debug("Acquiring %s", tag) + if not self._semaphore.acquire(blocking): + raise NoResourcesAvailable("Cannot acquire tag '%s'" % tag) + + def release(self, tag, acquire_token): logger.debug(f"Releasing acquire {tag}/{acquire_token}") self._semaphore.release() -class SlidingWindowSemaphore(TaskSemaphore): +class SlidingWindowSemaphore(BaseSemaphore): """A semaphore used to coordinate sequential resource access. This class is similar to the stdlib BoundedSemaphore: @@ -660,7 +669,7 @@ class SlidingWindowSemaphore(TaskSemaphore): this semaphore can also enforce that you only have a max range of 10 at any given point in time. You must also specify a tag name when you acquire the semaphore. The sliding window semantics apply - on a per tag basis. The internal count will only be incremented + on a per-tag basis. The internal count will only be incremented when the minimum sequence number for a tag is released. """ @@ -676,6 +685,7 @@ def __init__(self, count): self._pending_release = {} def current_count(self): + """Current semaphore count""" with self._lock: return self._count