Skip to content

Commit

Permalink
Refactored switching logic
Browse files Browse the repository at this point in the history
  • Loading branch information
tolik0 committed Feb 10, 2025
1 parent 8d3bfce commit 509ea05
Showing 1 changed file with 11 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ class ConcurrentPerPartitionCursor(Cursor):
CurrentPerPartitionCursor expects the state of the ConcurrentCursor to follow the format {cursor_field: cursor_value}.
"""

DEFAULT_MAX_PARTITIONS_NUMBER = 1000
DEFAULT_MAX_PARTITIONS_NUMBER = 10_000
SWITCH_TO_GLOBAL_LIMIT = 1000
_NO_STATE: Mapping[str, Any] = {}
_NO_CURSOR_STATE: Mapping[str, Any] = {}
_GLOBAL_STATE_KEY = "state"
Expand Down Expand Up @@ -99,7 +100,7 @@ def __init__(
self._new_global_cursor: Optional[StreamState] = None
self._lookback_window: int = 0
self._parent_state: Optional[StreamState] = None
self._over_limit: int = 0
self._number_of_partitions: int = 0
self._use_global_cursor: bool = False
self._partition_serializer = PerPartitionKeySerializer()

Expand Down Expand Up @@ -233,8 +234,8 @@ def _ensure_partition_limit(self) -> None:
or removed due to being the oldest.
"""
with self._lock:
self._number_of_partitions += 1
while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1:
self._over_limit += 1
# Try removing finished partitions first
for partition_key in list(self._cursor_per_partition.keys()):
if (
Expand All @@ -245,7 +246,7 @@ def _ensure_partition_limit(self) -> None:
partition_key
) # Remove the oldest partition
logger.warning(
f"The maximum number of partitions has been reached. Dropping the oldest finished partition: {oldest_partition}. Over limit: {self._over_limit}."
f"The maximum number of partitions has been reached. Dropping the oldest finished partition: {oldest_partition}. Over limit: {self._number_of_partitions}."
)
break
else:
Expand All @@ -254,7 +255,7 @@ def _ensure_partition_limit(self) -> None:
1
] # Remove the oldest partition
logger.warning(
f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._over_limit}."
f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._number_of_partitions}."
)

def _set_initial_state(self, stream_state: StreamState) -> None:
Expand Down Expand Up @@ -355,6 +356,10 @@ def _set_global_state(self, stream_state: Mapping[str, Any]) -> None:

def observe(self, record: Record) -> None:
if not self._use_global_cursor and self.limit_reached():
logger.info(
f"Exceeded the 'SWITCH_TO_GLOBAL_LIMIT' of {self.SWITCH_TO_GLOBAL_LIMIT}. "
f"Switching to global cursor for {self._stream_name}."
)
self._use_global_cursor = True

if not record.associated_slice:
Expand Down Expand Up @@ -397,4 +402,4 @@ def _get_cursor(self, record: Record) -> ConcurrentCursor:
return cursor

def limit_reached(self) -> bool:
return self._over_limit > self.DEFAULT_MAX_PARTITIONS_NUMBER
return self._number_of_partitions > self.SWITCH_TO_GLOBAL_LIMIT

0 comments on commit 509ea05

Please sign in to comment.