Skip to content

Fix: State handling when passed in read() method #387

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed

Fix: State handling when passed in read() method #387

wants to merge 5 commits into from

Conversation

marianob-span1
Copy link

@marianob-span1 marianob-span1 commented Mar 6, 2025

Low-code connectors instantiate the source without a state and pass the state via the read() method. This PR updates ConcurrentDeclarativeSource to support this behavior.

Changes are:

  • Instantiate the ConnectorStateManager with the state passed in read() method.
  • Update the ModelToComponentFactory to use the stream_state parameter when creating a cursor.
  • Add a test to verify the behavior: instantiate the source without a state and pass the state via the read() method.

Summary by CodeRabbit

  • New Features

    • Enhanced state management during concurrent stream processing to ensure more consistent and reliable data synchronization.
  • Documentation

    • Improved inline explanations to clarify how the stream_state variable is utilized in low-code scenarios.
  • Tests

    • Introduced new tests verifying proper state handling during data extraction, reinforcing robust performance.

Low-code connectors instantiate the source without a state and pass the state
via the read() method. This commit updates the ConcurrentDeclarativeSource to
support this behavior.

Changes are:
- Instantiate the ConnectorStateManager with the state passed in read() method.
- Update the ModelToComponentFactory to use the stream_state parameter when
  creating a cursor.
- Add a test to verify the behavior: instantiate the source without a state
  and pass the state via the read() method.
Copy link
Contributor

coderabbitai bot commented Mar 6, 2025

📝 Walkthrough

Walkthrough

This pull request enhances state management in the ConcurrentDeclarativeSource. It initializes the ConnectorStateManager with the provided state in the read method and passes the current stream_state to cursor constructors in the _group_streams method. Additionally, inline comments clarifying the usage of stream_state in concurrent cursor factories have been added, and a new unit test has been introduced to verify the correct state initialization and functionality of cursors during concurrent stream processing.

Changes

File(s) Change Summary
airbyte_cdk/.../concurrent_declarative_source.py Initializes ConnectorStateManager in read and passes stream_state to cursors in _group_streams for effective state management.
airbyte_cdk/.../model_to_component_factory.py Adds inline comments clarifying the usage of stream_state in methods creating concurrent cursors for low-code connectors.
unit_tests/.../test_concurrent_declarative_source.py Introduces a new test test_concurrent_cursor_with_state_in_read_method to verify that cursors properly handle state when initialized via read.

Possibly related PRs

Suggested labels

bug, enhancement

Suggested reviewers

  • brianjlai
  • maxi297
  • darynaishchenko

Would you like to discuss any of these details further, wdyt?


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5055173 and cdde727.

📒 Files selected for processing (1)
  • unit_tests/sources/declarative/test_concurrent_declarative_source.py (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: SDM Docker Image Build
🔇 Additional comments (1)
unit_tests/sources/declarative/test_concurrent_declarative_source.py (1)

828-900: Good test case for state handling in read method - nice addition!

This test effectively validates that the state can be provided via the read() method and properly initializes the cursor. It covers important assertions related to cursor type, field key, state content, and record filtering based on state.

I notice you're checking both the cursor state initialization and the functionality of record filtering, which provides good coverage. Also appreciated the clear test function name and docstring explaining the test's purpose.

✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

‼️ IMPORTANT
Auto-reply has been disabled for this repository in the CodeRabbit settings. The CodeRabbit bot will not respond to your replies unless it is explicitly tagged.

  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@marianob-span1 marianob-span1 changed the title Fix state handling when passed in read() method Fix: State handling when passed in read() method Mar 6, 2025
Copy link
Contributor

@maxi297 maxi297 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @marianob-span1! Thanks for the contribution. I would like to ask more information regarding this change to make sure I understand what would be the right solution. I'm asking because we have been slowly fading out the use of state through parameters to encapsulate the state into the Cursor interface.

The reasons are:

  • There are a bunch of maintenance problems that comes with multiple classes having access to the state:
    • When we are adding capabilities to state management, the change can affect all those classes and each usage needs to be validated which is long and tedious
    • It is very hard to debug what updates the state if the state is passed everywhere and we've had surprising behavior associated to that
  • We are moving to use concurrency more and more to speed up our sources and having a dict passed around and being able to be modified by many component is not thread safe

Hence, my questions would be:

  • What is the problem you are facing that brings you to create this PR?
  • Why passing the state at the construction of the source is now a valid solution in your case?

In the meanwhile, I still added a couple of comments on the PR to try to bridge the gap between my understanding and yours.

source.read(logger=source.logger, config=_CONFIG, catalog=catalog, state=state)
)

concurrent_streams, _ = source._group_streams(config=_CONFIG)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is private and only called once per source hence this does not reflect the normal usage of a source. Could you please comment on what this is trying to achieve?

The consequences of using this private method on the test seem important as the test read performed above is applied to another instance of party_members_stream and the read should not influence the state that is tested below. The reason the test passes right now is the read swap the instance of _connector_state_manager and the next time _group_stream is called, it uses the new state.

In order to properly validate that the input state is taken into account, we would have to call http_mocker._validate_all_matchers_called() to ensure that the HTTP requests that were performed considered the state as an input.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just a way to obtain the streams that I saw in other tests in this same file. I'm happy to find another way to get the streams in order to make the assertions I was intending to do if the general approach of the fix included in this PR is okay.

@marianob-span1
Copy link
Author

Hi @maxi297 !

The current issue we are experiencing with low code connectors and the CDK is that, when using incremental streams with a DatetimeBasedCursor, the connector ignores the previous stream state that is passed by the platform or a --state argument if I'm working on my local dev env and it performs a full refresh instead of continue reading from the point it left off last time.

The reason seems to be that the point where a low code connector passes the current state is in the read() call in entrypoint.py:

elif cmd == "read":
    config_catalog = self.source.read_catalog(parsed_args.catalog)
    state = self.source.read_state(parsed_args.state)
    
    yield from map(
        AirbyteEntrypoint.airbyte_message_to_string,
        self.read(source_spec, config, config_catalog, state),
    )

So my changes introduced here where aimed at making sure that state passed in that read() call reaches the place where the Concurrent Cursor is instantiated, that's why I added the argument stream_state here and had to instantiate a new ConnectorStateManager at the start of ConnectorStateManager.read().

Given your question and looking at the whole CDK code, it seems that it is expected that the state is passed when the source is instantiated (not 100% sure about this though), which I'm not sure how that could be done at least for low code connectors where the source is instantiated without any parameters, this is the main.py from the new connector where I ran into this issue:

import sys

from airbyte_cdk.entrypoint import launch
from source_linear import SourceLinear

if __name__ == "__main__":
    source = SourceLinear()
    launch(source, sys.argv[1:])

It could also be the case that we are using the CDK in a way that it's not expected to be and that's why the state is blank at the moment the read operation occurs, but this happened in a low code connector recently generated using instructions found in here: https://docs.airbyte.com/connector-development/config-based/tutorial/create-source and I'm not aware of any other guides to create a low code connector. I'd love to get some guidance on how a low code connector should be structured if the issue is that there is something wrong with the connector itself

Thanks!
Mariano

@maxi297
Copy link
Contributor

maxi297 commented Mar 13, 2025

Ok, that makes a lot of sense. Thanks for sharing this context!

You can check how it is done for other low-code sources like source-instagram. Don't forget to initialize the uncaught exception handler like this. If your source is all defined within a manifest and a components.py file for the custom components, I would suggest you move to the manifest-only format like source-chargee as you would not have to worry about the instantiation and the state management.

Let me know if this does not solve your issue and why and we will figure it out! 💪

@marianob-span1
Copy link
Author

Hi @maxi297 that reply is just gold! I wasn't aware of how the run() of a connector should be properly structured.

I think for now we'll just replicate Instagram's structure given that it's easier for development purposes to just run poetry run connector read --config secrets/config.json --catalog catalog.json in a straightforward way while that using the source-declarative manifest I would have to build the connector image and then do a read operation via a docker run command, is that correct?

I'm closing the PR but will appreciate if you could reply to this last comment, thank you so much!

@maxi297
Copy link
Contributor

maxi297 commented Mar 14, 2025

it's easier for development purposes to just run poetry run connector read --config secrets/config.json --catalog catalog.json in a straightforward way while that using the source-declarative manifest I would have to build the connector image and then do a read operation via a docker run command

This is mostly true but not 100% true. Everything under the hood is Python so you could create a main.py which instantiate a YamlDeclarativeSource and you would have the same capabilities. We do that in our testing here. Note that the weird logic for the manifest path here is for CI only meaning that the testing is done as part of the source-declarative-manifest container for manifest only sources hence why we check for both paths but if it's just for a local main.py, you should be fine with just the manifest.yaml file path.

If you have any other questions, please ping me and we will figure it out! ❤️

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants