Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(concurrent perpartition cursor): Refactor ConcurrentPerPartitionCursor #331

Merged
merged 22 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,7 @@ definitions:
description: This option is used to adjust the upper and lower boundaries of each datetime window to beginning and end of the provided target period (day, week, month)
type: object
required:
- target
- target
properties:
tolik0 marked this conversation as resolved.
Show resolved Hide resolved
target:
title: Target
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ class ConcurrentPerPartitionCursor(Cursor):
CurrentPerPartitionCursor expects the state of the ConcurrentCursor to follow the format {cursor_field: cursor_value}.
"""

DEFAULT_MAX_PARTITIONS_NUMBER = 10000
DEFAULT_MAX_PARTITIONS_NUMBER = 25_000
SWITCH_TO_GLOBAL_LIMIT = 1000
tolik0 marked this conversation as resolved.
Show resolved Hide resolved
_NO_STATE: Mapping[str, Any] = {}
_NO_CURSOR_STATE: Mapping[str, Any] = {}
_GLOBAL_STATE_KEY = "state"
Expand Down Expand Up @@ -99,7 +100,7 @@ def __init__(
self._new_global_cursor: Optional[StreamState] = None
self._lookback_window: int = 0
self._parent_state: Optional[StreamState] = None
self._over_limit: int = 0
self._number_of_partitions: int = 0
maxi297 marked this conversation as resolved.
Show resolved Hide resolved
self._use_global_cursor: bool = False
self._partition_serializer = PerPartitionKeySerializer()

Expand Down Expand Up @@ -141,20 +142,16 @@ def close_partition(self, partition: Partition) -> None:
raise ValueError("stream_slice cannot be None")

partition_key = self._to_partition_key(stream_slice.partition)
maxi297 marked this conversation as resolved.
Show resolved Hide resolved
self._cursor_per_partition[partition_key].close_partition(partition=partition)
if not self._use_global_cursor:
self._cursor_per_partition[partition_key].close_partition(partition=partition)
with self._lock:
self._semaphore_per_partition[partition_key].acquire()
cursor = self._cursor_per_partition[partition_key]
if (
partition_key in self._finished_partitions
and self._semaphore_per_partition[partition_key]._value == 0
):
if (
self._new_global_cursor is None
or self._new_global_cursor[self.cursor_field.cursor_field_key]
< cursor.state[self.cursor_field.cursor_field_key]
):
self._new_global_cursor = copy.deepcopy(cursor.state)
self._update_global_cursor(cursor.state[self.cursor_field.cursor_field_key])
if not self._use_global_cursor:
self._emit_state_message()

Expand Down Expand Up @@ -233,8 +230,8 @@ def _ensure_partition_limit(self) -> None:
or removed due to being the oldest.
"""
with self._lock:
self._number_of_partitions += 1
while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1:
self._over_limit += 1
# Try removing finished partitions first
for partition_key in list(self._cursor_per_partition.keys()):
if (
Expand All @@ -245,7 +242,7 @@ def _ensure_partition_limit(self) -> None:
partition_key
) # Remove the oldest partition
logger.warning(
f"The maximum number of partitions has been reached. Dropping the oldest finished partition: {oldest_partition}. Over limit: {self._over_limit}."
f"The maximum number of partitions has been reached. Dropping the oldest finished partition: {oldest_partition}. Over limit: {self._number_of_partitions}."
tolik0 marked this conversation as resolved.
Show resolved Hide resolved
)
break
else:
Expand All @@ -254,7 +251,7 @@ def _ensure_partition_limit(self) -> None:
1
] # Remove the oldest partition
logger.warning(
f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._over_limit}."
f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._number_of_partitions}."
)

def _set_initial_state(self, stream_state: StreamState) -> None:
Expand Down Expand Up @@ -355,15 +352,32 @@ def _set_global_state(self, stream_state: Mapping[str, Any]) -> None:

def observe(self, record: Record) -> None:
if not self._use_global_cursor and self.limit_reached():
logger.info(
tolik0 marked this conversation as resolved.
Show resolved Hide resolved
f"Exceeded the 'SWITCH_TO_GLOBAL_LIMIT' of {self.SWITCH_TO_GLOBAL_LIMIT}. "
f"Switching to global cursor for {self._stream_name}."
)
self._use_global_cursor = True

if not record.associated_slice:
raise ValueError(
"Invalid state as stream slices that are emitted should refer to an existing cursor"
)
self._cursor_per_partition[
self._to_partition_key(record.associated_slice.partition)
].observe(record)

record_cursor = self._connector_state_converter.parse_value(
self._cursor_field.extract_value(record)
)
self._update_global_cursor(record_cursor)
if not self._use_global_cursor:
self._cursor_per_partition[
self._to_partition_key(record.associated_slice.partition)
].observe(record)

def _update_global_cursor(self, value: Mapping[str, Any]) -> None:
tolik0 marked this conversation as resolved.
Show resolved Hide resolved
if (
self._new_global_cursor is None
or self._new_global_cursor[self.cursor_field.cursor_field_key] < value
):
self._new_global_cursor = {self.cursor_field.cursor_field_key: copy.deepcopy(value)}

def _to_partition_key(self, partition: Mapping[str, Any]) -> str:
return self._partition_serializer.to_partition_key(partition)
Expand Down Expand Up @@ -397,4 +411,4 @@ def _get_cursor(self, record: Record) -> ConcurrentCursor:
return cursor

def limit_reached(self) -> bool:
return self._over_limit > self.DEFAULT_MAX_PARTITIONS_NUMBER
return self._number_of_partitions > self.SWITCH_TO_GLOBAL_LIMIT
Loading