feat(concurrent perpartition cursor): Refactor ConcurrentPerPartitionCursor #331

Open · wants to merge 15 commits into main

Conversation

tolik0 (Contributor) commented Feb 12, 2025

Summary by CodeRabbit

  • New Features
    • Configuration validation now requires the clamping.target property for the DatetimeBasedCursor.
    • Partition handling is improved: the partition limit increases from 10,000 to 25,000, and state transitions are refined for more efficient processing.

@github-actions github-actions bot added the enhancement New feature or request label Feb 12, 2025
coderabbitai bot (Contributor) commented Feb 12, 2025

📝 Walkthrough

This pull request updates two areas. The schema file for the DatetimeBasedCursor now requires the clamping.target property, and the incremental cursor implementation changes its partition management: constants have been adjusted, an internal variable has been renamed, and the logic for handling partition limits and updating the global cursor has been refactored.

Changes

File | Summary of Changes
airbyte_cdk/sources/declarative/declarative_component_schema.yaml | Marked clamping.target as required (previously optional) in the DatetimeBasedCursor schema.
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py | Increased DEFAULT_MAX_PARTITIONS_NUMBER from 10,000 to 25,000; added SWITCH_TO_GLOBAL_LIMIT (10,000); renamed _over_limit to _number_of_partitions; extracted the global cursor update logic into _update_global_cursor; adjusted close_partition, ensure_at_least_one_state_emitted, and observe accordingly.

Sequence Diagram(s)

sequenceDiagram
    participant Caller
    participant Cursor as ConcurrentPerPartitionCursor
    Caller->>Cursor: Invoke close_partition()
    Note right of Cursor: Evaluate current partition count
    alt Partition count < SWITCH_TO_GLOBAL_LIMIT
        Cursor-->>Caller: Continue using local cursor
    else Partition count ≥ SWITCH_TO_GLOBAL_LIMIT
        Cursor->>Cursor: Call _update_global_cursor()
        Note right of Cursor: Global cursor updated
        Cursor-->>Caller: Switched to global cursor
    end
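To make the diagram concrete, here is a minimal, hypothetical Python sketch of the switch behavior (the names mirror the PR, but the real ConcurrentPerPartitionCursor also tracks per-partition cursors, semaphores, and serialized state):

# Hypothetical sketch only; not the CDK implementation.
class CursorSketch:
    SWITCH_TO_GLOBAL_LIMIT = 10_000

    def __init__(self) -> None:
        self._number_of_partitions = 0
        self._use_global_cursor = False

    def limit_reached(self) -> bool:
        return self._number_of_partitions > self.SWITCH_TO_GLOBAL_LIMIT

    def close_partition(self) -> None:
        self._number_of_partitions += 1
        if not self._use_global_cursor and self.limit_reached():
            # Past the threshold, per-partition state is abandoned in favor
            # of a single global cursor.
            self._use_global_cursor = True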

Suggested reviewers

  • brianjlai
  • maxi297

Wdyt about these suggestions?

coderabbitai bot (Contributor) left a comment

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between cb5a921 and 05f4db7.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1 hunks)
  • airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (8 hunks)
🔇 Additional comments (5)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (5)

61-63: LGTM! The constant changes improve scalability and readability.

The changes look good:

  1. Increasing DEFAULT_MAX_PARTITIONS_NUMBER to 25,000 allows handling larger datasets
  2. New SWITCH_TO_GLOBAL_LIMIT of 1,000 provides a clear threshold for switching to global cursor
  3. Renaming _over_limit to _number_of_partitions better describes its purpose

Also applies to: 103-103


145-154: LGTM! The refactoring improves code organization.

The changes improve the code by:

  1. Adding a guard clause to prevent unnecessary cursor updates when using global cursor
  2. Extracting the global cursor update logic to a dedicated method

354-373: LGTM! The changes improve observability and code organization.

The changes enhance the code by:

  1. Adding an informative log message when switching to global cursor
  2. Extracting record cursor parsing and update logic for better readability
  3. Simplifying the overall logic flow

375-380: LGTM! The new method follows good design principles.

The new _update_global_cursor method:

  1. Follows single responsibility principle
  2. Has clear and concise update conditions
  3. Uses defensive copying to prevent unintended modifications
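For readers without the diff open, here is a sketch of what such a method can look like, given those three points (hypothetical; the exact signature and field access in the PR may differ):

import copy
from typing import Any

def _update_global_cursor(self, value: Any) -> None:
    # Keep the greatest cursor value observed so far; deepcopy defends the
    # stored state against later mutation of the caller's object.
    if (
        self._new_global_cursor is None
        or self._new_global_cursor[self.cursor_field.cursor_field_key] < value
    ):
        self._new_global_cursor = {self.cursor_field.cursor_field_key: copy.deepcopy(value)}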

413-414: LGTM! The changes make the limit check more explicit.

The changes improve clarity by:

  1. Using the renamed _number_of_partitions variable
  2. Comparing against the new SWITCH_TO_GLOBAL_LIMIT constant

coderabbitai bot (Contributor) left a comment

Actionable comments posted: 0

🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (3)

61-63: Consider documenting the rationale for the new constants and their relationship.

The changes introduce new constants and rename variables, but their relationship and usage could be clearer. Would you consider:

  1. Updating the class docstring to explain why we switch to global cursor at 1000 partitions?
  2. Adding a comment explaining why we increased the max partitions to 25,000?

Also applies to: 103-103


233-255: Consider improving the partition limit warning messages.

The warning messages now include the number of partitions but could be more actionable. What do you think about:

-            logger.warning(f"The maximum number of partitions has been reached. Dropping the oldest finished partition: {oldest_partition}. Over limit: {self._number_of_partitions}.")
+            logger.warning(
+                f"Maximum partitions ({self.DEFAULT_MAX_PARTITIONS_NUMBER}) reached. "
+                f"Dropping oldest finished partition: {oldest_partition}. "
+                f"Current partitions: {self._number_of_partitions}. "
+                "Consider adjusting DEFAULT_MAX_PARTITIONS_NUMBER if this is expected."
+            )
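For context, the eviction that emits this warning can be sketched as follows (a simplification; per the PR, the real method also uses the per-partition semaphores to pick the oldest finished partition):

from collections import OrderedDict  # assumed: _cursor_per_partition preserves insertion order

def _ensure_partition_limit(self) -> None:
    # Drop the oldest entries until the count is back under the cap.
    while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1:
        partition_key, oldest_partition = self._cursor_per_partition.popitem(last=False)
        logger.warning(
            f"The maximum number of partitions has been reached. Dropping the oldest "
            f"finished partition: {oldest_partition}. Over limit: {self._number_of_partitions}."
        )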

375-380: Consider adding a docstring to the _update_global_cursor method.

The new method already carries type hints, but would you consider adding a docstring to improve code maintainability? Something like:

-    def _update_global_cursor(self, value: Mapping[str, Any]) -> None:
+    def _update_global_cursor(self, value: Mapping[str, Any]) -> None:
+        """Update the global cursor if the new value is greater than the current value.
+        
+        Args:
+            value: The new cursor value to compare against the current global cursor
+        """
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 05f4db7 and c0bc645.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (8 hunks)
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)

354-374: LGTM! Nice improvement to the observe method.

The refactored observe method is more maintainable with clearer separation of concerns. The new logging message when switching to global cursor is particularly helpful.


145-154: Verify the global cursor update logic.

The new condition for updating the global cursor might miss updates if a partition is closed before all its records are processed. Should we add a check to ensure all records have been processed before updating the global cursor? wdyt?

✅ Verification successful

🏁 Script executed:

#!/bin/bash
# Search for any race conditions or similar patterns in cursor updates
rg -A 5 "global_cursor|new_global_cursor" .

Length of output: 49421


Global cursor update logic appears to be functioning as expected.

The current implementation calls _update_global_cursor() during record observation and again when closing a partition—but only after ensuring via the semaphore (i.e. when _semaphore_per_partition[partition_key]._value equals 0) that all concurrent processing for that partition has completed. This check, along with the fact that each observed record already attempts to update the global cursor, means that the design should capture the latest state even if a partition is closed immediately after processing its records.

For example, note that:

  • The semaphore’s value being 0 within the lock guarantees that no other record-processing tasks are active for the partition.
  • The global cursor is updated both in the record observation path and after finalizing the partition’s processing, which provides a safeguard against missing updates.

Would you agree that this dual-update approach meets our safety needs, or do you think an additional explicit check is warranted?
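As a hedged illustration of that semaphore gate (names borrowed from the PR; the orchestration is simplified):

import threading
from collections import OrderedDict

class SemaphoreGateSketch:
    """Illustrative only: one semaphore per partition, released per generated
    slice and acquired per closed slice, so a value of 0 means no slice for
    that partition is still in flight."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._semaphore_per_partition: "OrderedDict[str, threading.Semaphore]" = OrderedDict()

    def register_slice(self, partition_key: str) -> None:
        sem = self._semaphore_per_partition.setdefault(partition_key, threading.Semaphore(0))
        sem.release()  # one release per slice handed out

    def close_slice(self, partition_key: str) -> bool:
        self._semaphore_per_partition[partition_key].acquire()
        with self._lock:
            # Safe to finalize (e.g. promote to the global cursor) only when
            # every released slice has been acquired back.
            return self._semaphore_per_partition[partition_key]._value == 0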

maxi297 (Contributor) left a comment

The global cursor logic change looks good to me, even though I have a question about the throttling. One thing that surprises me is that the tests don't change: I would assume the previous solution set the global cursor on slice boundaries, while this change sets it on the record cursor value. Am I missing something?

Once we are clear on the tests, I'll approve, on the assumption that the limits will be re-worked later; that way you can ship the changes here now and release the throttling changes in another PR.

@@ -245,7 +242,7 @@ def _ensure_partition_limit(self) -> None:
                 partition_key
             )  # Remove the oldest partition
             logger.warning(
-                f"The maximum number of partitions has been reached. Dropping the oldest finished partition: {oldest_partition}. Over limit: {self._over_limit}."
+                f"The maximum number of partitions has been reached. Dropping the oldest finished partition: {oldest_partition}. Over limit: {self._number_of_partitions}."

Should this log be updated? It seems like we print the total number of partitions, not the number over the limit. The same comment applies to the log below.

@@ -355,15 +352,32 @@ def _set_global_state(self, stream_state: Mapping[str, Any]) -> None:

def observe(self, record: Record) -> None:
if not self._use_global_cursor and self.limit_reached():
logger.info(

It's a very nice addition! ❤️

self._to_partition_key(record.associated_slice.partition)
].observe(record)

def _update_global_cursor(self, value: Mapping[str, Any]) -> None:

Should the type of value here be CursorValueType? Otherwise, it seems like we set self._new_global_cursor to {<cursor field>: {<cursor field>: <cursor value>}}.

@@ -58,7 +58,8 @@ class ConcurrentPerPartitionCursor(Cursor):
CurrentPerPartitionCursor expects the state of the ConcurrentCursor to follow the format {cursor_field: cursor_value}.
"""

-    DEFAULT_MAX_PARTITIONS_NUMBER = 10000
+    DEFAULT_MAX_PARTITIONS_NUMBER = 25_000
+    SWITCH_TO_GLOBAL_LIMIT = 1000

Will this be changed again once we implement the throttling?

tolik0 (Author) commented Feb 13, 2025

@maxi297 The previous implementation set the state to the most recent cursor value when present, and to the slice start when it was not, so the tests shouldn't change.

def _get_latest_complete_time(self, slices: List[MutableMapping[str, Any]]) -> Any:
"""
Get the latest time before which all records have been processed.
"""
if not slices:
raise RuntimeError(
"Expected at least one slice but there were none. This is unexpected; please contact Support."
)
merged_intervals = self.merge_intervals(slices)
first_interval = merged_intervals[0]
return first_interval.get("most_recent_cursor_value") or first_interval[self.START_KEY]
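For illustration, with hypothetical data (and assuming START_KEY is "start" and that merge_intervals sorts and merges overlapping slices), the promotion rule behaves like this:

slices = [
    {"start": "2024-01-01", "end": "2024-01-31", "most_recent_cursor_value": "2024-01-20"},
    {"start": "2024-02-01", "end": "2024-02-28"},
]
# The first merged interval carries most_recent_cursor_value, so
# _get_latest_complete_time(slices) returns "2024-01-20"; if that key were
# absent, the interval's start ("2024-01-01") would be used instead.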

coderabbitai bot (Contributor) left a comment

Actionable comments posted: 1

🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)

62-64: Consider documenting the new constants and their relationship.

The relationship between DEFAULT_MAX_PARTITIONS_NUMBER and SWITCH_TO_GLOBAL_LIMIT could be clearer. Would you consider adding docstrings to explain why we switch to global cursor at 10k but allow up to 25k partitions? Also, should we make these configurable via environment variables, wdyt?

Also applies to: 104-108


240-262: Consider improving the logging messages.

The logging messages could be more informative. Would you consider including the partition key in the warning messages to help with debugging? Also, should we add debug-level logs for tracking partition counts?

-            logger.warning(f"The maximum number of partitions has been reached. Dropping the oldest finished partition: {oldest_partition}. Over limit: {self._number_of_partitions - self.DEFAULT_MAX_PARTITIONS_NUMBER}.")
+            logger.warning(f"Maximum partitions ({self.DEFAULT_MAX_PARTITIONS_NUMBER}) reached. Dropping oldest finished partition with key '{partition_key}': {oldest_partition}. Current count: {self._number_of_partitions}")
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c0bc645 and 52b95e3.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (11 hunks)
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)

174-187: LGTM! Nice throttling implementation.

The state emission throttling is a good performance optimization to prevent overwhelming the system with state messages.
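The thread doesn't show the throttle itself; as a hedged sketch, time-based throttling of state emission can be as simple as the following (the 60-second interval is an assumption for illustration, not necessarily the PR's value):

import time

class StateEmissionThrottle:
    def __init__(self, min_interval_seconds: float = 60.0) -> None:
        self._min_interval = min_interval_seconds
        self._last_emission = float("-inf")

    def should_emit(self) -> bool:
        # Allow at most one state message per interval; callers skip
        # emission when this returns False.
        now = time.monotonic()
        if now - self._last_emission >= self._min_interval:
            self._last_emission = now
            return True
        return False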


382-387: LGTM! Clean extraction of global cursor update logic.

The extraction of _update_global_cursor method improves code organization and reusability.

Comment on lines 361 to +381
         if not self._use_global_cursor and self.limit_reached():
             logger.info(
                 f"Exceeded the 'SWITCH_TO_GLOBAL_LIMIT' of {self.SWITCH_TO_GLOBAL_LIMIT}. "
                 f"Switching to global cursor for {self._stream_name}."
             )
             self._use_global_cursor = True

         if not record.associated_slice:
             raise ValueError(
                 "Invalid state as stream slices that are emitted should refer to an existing cursor"
             )
-        self._cursor_per_partition[
-            self._to_partition_key(record.associated_slice.partition)
-        ].observe(record)
+        record_cursor = self._connector_state_converter.output_format(
+            self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))
+        )
+        self._update_global_cursor(record_cursor)
+        if not self._use_global_cursor:
+            self._cursor_per_partition[
+                self._to_partition_key(record.associated_slice.partition)
+            ].observe(record)


💡 Verification agent

❓ Verification inconclusive

Consider adding error handling for cursor value extraction.

The observe method now extracts cursor values but doesn't handle potential extraction failures. Would you consider adding try-except to handle cases where the cursor field might be missing or have an invalid format?


🏁 Script executed:

#!/bin/bash
# Search for similar error handling patterns in the codebase
rg -A 5 "try:.*extract.*except.*:" 

Length of output: 36


Subject: Consider Adding Error Handling for Cursor Extraction

It looks like the current implementation directly extracts and parses the cursor value without any safeguards. While the rg search didn't find similar try/except patterns around extraction logic elsewhere in the codebase, there is still a risk if the cursor field is missing or malformed. Would you consider wrapping the extraction and parsing calls in a try/except block to catch potential errors and log a meaningful message? For example, around this section:

  • File: airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
  • Lines: 361-381 (specifically where record_cursor is assigned)

This change could help prevent unexpected runtime exceptions. wdyt?
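A minimal sketch of the suggested guard (the exception types and log wording are illustrative, not the PR's code):

try:
    record_cursor = self._connector_state_converter.output_format(
        self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))
    )
except (KeyError, TypeError, ValueError) as exc:
    # Skip the global-cursor update for records without a usable cursor value.
    logger.warning(
        f"Failed to extract cursor value for stream {self._stream_name}: {exc}"
    )
    return
self._update_global_cursor(record_cursor)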

Labels: enhancement (New feature or request)