Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(concurrent perpartition cursor): Refactor ConcurrentPerPartitionCursor #331

Merged
merged 22 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,7 @@ definitions:
description: This option is used to adjust the upper and lower boundaries of each datetime window to beginning and end of the provided target period (day, week, month)
type: object
required:
- target
- target
properties:
tolik0 marked this conversation as resolved.
Show resolved Hide resolved
target:
title: Target
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import copy
import logging
import threading
import time
from collections import OrderedDict
from copy import deepcopy
from datetime import timedelta
Expand Down Expand Up @@ -58,7 +59,8 @@ class ConcurrentPerPartitionCursor(Cursor):
ConcurrentPerPartitionCursor expects the state of the ConcurrentCursor to follow the format {cursor_field: cursor_value}.
"""

DEFAULT_MAX_PARTITIONS_NUMBER = 10000
DEFAULT_MAX_PARTITIONS_NUMBER = 25_000
SWITCH_TO_GLOBAL_LIMIT = 10_000
_NO_STATE: Mapping[str, Any] = {}
_NO_CURSOR_STATE: Mapping[str, Any] = {}
_GLOBAL_STATE_KEY = "state"
Expand Down Expand Up @@ -99,9 +101,11 @@ def __init__(
self._new_global_cursor: Optional[StreamState] = None
self._lookback_window: int = 0
self._parent_state: Optional[StreamState] = None
self._over_limit: int = 0
self._number_of_partitions: int = 0
maxi297 marked this conversation as resolved.
Show resolved Hide resolved
self._use_global_cursor: bool = False
self._partition_serializer = PerPartitionKeySerializer()
# Track the last time a state message was emitted
self._last_emission_time: float = 0.0

self._set_initial_state(stream_state)

Expand Down Expand Up @@ -141,20 +145,16 @@ def close_partition(self, partition: Partition) -> None:
raise ValueError("stream_slice cannot be None")

partition_key = self._to_partition_key(stream_slice.partition)
maxi297 marked this conversation as resolved.
Show resolved Hide resolved
self._cursor_per_partition[partition_key].close_partition(partition=partition)
if not self._use_global_cursor:
self._cursor_per_partition[partition_key].close_partition(partition=partition)
with self._lock:
self._semaphore_per_partition[partition_key].acquire()
cursor = self._cursor_per_partition[partition_key]
if (
partition_key in self._finished_partitions
and self._semaphore_per_partition[partition_key]._value == 0
):
if (
self._new_global_cursor is None
or self._new_global_cursor[self.cursor_field.cursor_field_key]
< cursor.state[self.cursor_field.cursor_field_key]
):
self._new_global_cursor = copy.deepcopy(cursor.state)
self._update_global_cursor(cursor.state[self.cursor_field.cursor_field_key])
if not self._use_global_cursor:
self._emit_state_message()

Expand All @@ -169,9 +169,12 @@ def ensure_at_least_one_state_emitted(self) -> None:
self._global_cursor = self._new_global_cursor
self._lookback_window = self._timer.finish()
self._parent_state = self._partition_router.get_stream_state()
self._emit_state_message()
self._emit_state_message(throttle=False)

def _emit_state_message(self) -> None:
def _emit_state_message(self, throttle: bool = True) -> None:
current_time = time.time()
if throttle and current_time - self._last_emission_time <= 60:
return
self._connector_state_manager.update_state_for_stream(
self._stream_name,
self._stream_namespace,
Expand All @@ -181,6 +184,7 @@ def _emit_state_message(self) -> None:
self._stream_name, self._stream_namespace
)
self._message_repository.emit_message(state_message)
self._last_emission_time = current_time

def stream_slices(self) -> Iterable[StreamSlice]:
if self._timer.is_running():
Expand Down Expand Up @@ -233,8 +237,8 @@ def _ensure_partition_limit(self) -> None:
or removed due to being the oldest.
"""
with self._lock:
self._number_of_partitions += 1
while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1:
self._over_limit += 1
# Try removing finished partitions first
for partition_key in list(self._cursor_per_partition.keys()):
if (
Expand All @@ -245,7 +249,7 @@ def _ensure_partition_limit(self) -> None:
partition_key
) # Remove the oldest partition
logger.warning(
f"The maximum number of partitions has been reached. Dropping the oldest finished partition: {oldest_partition}. Over limit: {self._over_limit}."
f"The maximum number of partitions has been reached. Dropping the oldest finished partition: {oldest_partition}. Over limit: {self._number_of_partitions - self.DEFAULT_MAX_PARTITIONS_NUMBER}."
)
break
else:
Expand All @@ -254,7 +258,7 @@ def _ensure_partition_limit(self) -> None:
1
] # Remove the oldest partition
logger.warning(
f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._over_limit}."
f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._number_of_partitions - self.DEFAULT_MAX_PARTITIONS_NUMBER}."
)

def _set_initial_state(self, stream_state: StreamState) -> None:
Expand Down Expand Up @@ -355,15 +359,32 @@ def _set_global_state(self, stream_state: Mapping[str, Any]) -> None:

def observe(self, record: Record) -> None:
if not self._use_global_cursor and self.limit_reached():
logger.info(
tolik0 marked this conversation as resolved.
Show resolved Hide resolved
f"Exceeded the 'SWITCH_TO_GLOBAL_LIMIT' of {self.SWITCH_TO_GLOBAL_LIMIT}. "
f"Switching to global cursor for {self._stream_name}."
)
self._use_global_cursor = True

if not record.associated_slice:
raise ValueError(
"Invalid state as stream slices that are emitted should refer to an existing cursor"
)
self._cursor_per_partition[
self._to_partition_key(record.associated_slice.partition)
].observe(record)

record_cursor = self._connector_state_converter.output_format(
self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))
)
self._update_global_cursor(record_cursor)
if not self._use_global_cursor:
self._cursor_per_partition[
self._to_partition_key(record.associated_slice.partition)
].observe(record)

tolik0 marked this conversation as resolved.
Show resolved Hide resolved
tolik0 marked this conversation as resolved.
Show resolved Hide resolved
def _update_global_cursor(self, value: Any) -> None:
    """Advance the pending global cursor to ``value`` if it is newer.

    The pending global cursor (``self._new_global_cursor``) is a
    single-entry mapping keyed by the cursor field key. It is replaced
    when nothing has been recorded yet, or when the recorded value
    compares strictly less than ``value``. Otherwise it is left untouched.

    Args:
        value: Candidate cursor value. It must support ``<`` against the
            value already stored.
    """
    key = self.cursor_field.cursor_field_key
    current = self._new_global_cursor

    # Guard clause: keep the existing cursor unless the candidate is strictly greater.
    if current is not None and not (current[key] < value):
        return

    # Store a deep copy so later mutation of the caller's object cannot
    # alter the recorded cursor.
    self._new_global_cursor = {key: copy.deepcopy(value)}

def _to_partition_key(self, partition: Mapping[str, Any]) -> str:
    """Return the string key that identifies ``partition``.

    Delegates to ``self._partition_serializer.to_partition_key``. Other
    methods in this class use the resulting key to index the
    per-partition cursor and semaphore mappings.

    Args:
        partition: The partition mapping of a stream slice.

    Returns:
        The serialized partition key.
    """
    serializer = self._partition_serializer
    return serializer.to_partition_key(partition)
Expand Down Expand Up @@ -397,4 +418,4 @@ def _get_cursor(self, record: Record) -> ConcurrentCursor:
return cursor

def limit_reached(self) -> bool:
return self._over_limit > self.DEFAULT_MAX_PARTITIONS_NUMBER
return self._number_of_partitions > self.SWITCH_TO_GLOBAL_LIMIT
Loading