Skip to content

Commit e7ccf76

Browse files
authored
fix(concurrent): Ensure default concurrency level does not generate deadlock (#148)
1 parent 6ab54c0 commit e7ccf76

File tree

2 files changed

+6
-5
lines changed

2 files changed

+6
-5
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,9 @@
5656

5757

5858
class ConcurrentDeclarativeSource(ManifestDeclarativeSource, Generic[TState]):
59-
# By default, we defer to a value of 1 which represents running a connector using the Concurrent CDK engine on only one thread.
60-
SINGLE_THREADED_CONCURRENCY_LEVEL = 1
59+
# By default, we defer to a value of 2. A value lower than than could cause a PartitionEnqueuer to be stuck in a state of deadlock
60+
# because it has hit the limit of futures but not partition reader is consuming them.
61+
_LOWEST_SAFE_CONCURRENCY_LEVEL = 2
6162

6263
def __init__(
6364
self,
@@ -107,8 +108,8 @@ def __init__(
107108
concurrency_level // 2, 1
108109
) # Partition_generation iterates using range based on this value. If this is floored to zero we end up in a dead lock during start up
109110
else:
110-
concurrency_level = self.SINGLE_THREADED_CONCURRENCY_LEVEL
111-
initial_number_of_partitions_to_generate = self.SINGLE_THREADED_CONCURRENCY_LEVEL
111+
concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL
112+
initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2
112113

113114
self._concurrent_source = ConcurrentSource.create(
114115
num_workers=concurrency_level,

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ definitions:
327327
additionalProperties: true
328328
ConcurrencyLevel:
329329
title: Concurrency Level
330-
description: Defines the amount of parallelization for the streams that are being synced. The factor of parallelization is how many partitions or streams are synced at the same time. For example, with a concurrency_level of 10, ten streams or partitions of data will processed at the same time.
330+
description: Defines the amount of parallelization for the streams that are being synced. The factor of parallelization is how many partitions or streams are synced at the same time. For example, with a concurrency_level of 10, ten streams or partitions of data will processed at the same time. Note that a value of 1 could create deadlock if a stream has a very high number of partitions.
331331
type: object
332332
required:
333333
- default_concurrency

0 commit comments

Comments
 (0)