31 changes: 31 additions & 0 deletions airbyte/caches/base.py
@@ -280,6 +280,37 @@ def get_state_writer(
destination_name=destination_name,
)

    def get_state(
        stream_name: str,
    ) -> StreamState | None:
        """Return a stream state object for the provided stream name.

        This is a thin wrapper around the internal `StateProvider` interface.
        """
        return self.get_state_provider(stream_name=stream_name)
Comment on lines +283 to +290

⚠️ Potential issue | 🔴 Critical

Critical issues: Missing self, undefined type, and incorrect logic.

This method has several blockers preventing it from working:

  1. Missing self parameter: Line 284 needs self as the first parameter
  2. Undefined StreamState type: Need to import StreamState from the appropriate module
  3. Parameter name mismatch: Calling get_state_provider(stream_name=stream_name) but that method expects source_name, not stream_name (see lines 253-266)
  4. Return type mismatch: get_state_provider() returns StateProviderBase, not StreamState | None
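
To illustrate item 1 in isolation, here is a minimal sketch of what Python does when a method omits self. `DemoCache` is a hypothetical stand-in written for this example, not the real `CacheBase`.

```python
class DemoCache:
    """Hypothetical stand-in for a cache class; not the real CacheBase."""

    # Same shape as the method in the diff: there is no `self`, so Python
    # binds the instance to `stream_name` when the method is called.
    def get_state(stream_name: str):
        return stream_name


cache = DemoCache()
error_message = ""

try:
    # The instance already fills `stream_name` positionally, so passing it
    # again by keyword raises a TypeError.
    cache.get_state(stream_name="users")
except TypeError as exc:
    error_message = str(exc)

print(error_message)
```

The call fails before the method body runs, which is why the missing self is a blocker regardless of the other three items.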

The intent seems to be retrieving state for a specific stream, but get_state_provider() requires a source name and returns a provider interface, not individual stream state. Would you like to clarify the intended behavior here? Perhaps this should delegate to a method on the StateProviderBase to extract a specific stream's state? wdyt?

Here's a potential fix assuming StateProviderBase has a method to get individual stream state:

-    def get_state(
-        stream_name: str,
-    ) -> StreamState | None:
+    def get_state(
+        self,
+        stream_name: str,
+        source_name: str,
+    ) -> StreamState | None:
         """Return a stream state object for the provided stream name.
 
         This is a thin wrapper around the internal `StateProvider` interface.
         """
-        return self.get_state_provider(stream_name=stream_name)
+        state_provider = self.get_state_provider(source_name=source_name)
+        return state_provider.get_state_for_stream(stream_name)  # Assuming such a method exists

Note: You'll also need to add the import for StreamState at the top of the file.

Committable suggestion skipped: line range outside the PR's diff.
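
The delegation pattern suggested above can be tried out in a self-contained sketch. Everything here is an assumption made for illustration: `DemoStateProvider`, `DemoCache`, and the `get_stream_state` accessor are hypothetical names, and the real `StateProviderBase` interface may look different.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class DemoStateProvider:
    """Hypothetical provider holding per-stream state for one source."""

    states: dict[str, dict] = field(default_factory=dict)

    def get_stream_state(self, stream_name: str) -> dict | None:
        # Hypothetical accessor; returns None when the stream has no state.
        return self.states.get(stream_name)


@dataclass
class DemoCache:
    """Hypothetical cache that keeps one provider per source name."""

    providers: dict[str, DemoStateProvider] = field(default_factory=dict)

    def get_state_provider(self, source_name: str) -> DemoStateProvider:
        return self.providers.setdefault(source_name, DemoStateProvider())

    def get_state(self, stream_name: str, source_name: str) -> dict | None:
        # Look up the provider by source, then the state by stream.
        provider = self.get_state_provider(source_name=source_name)
        return provider.get_stream_state(stream_name)


cache = DemoCache(
    providers={
        "source-demo": DemoStateProvider({"users": {"cursor": "2024-01-01"}}),
    }
)
found = cache.get_state("users", source_name="source-demo")
missing = cache.get_state("orders", source_name="source-demo")
```

The sketch keeps the two lookups separate (source first, then stream), which is the distinction the original one-line wrapper blurred.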


    def set_state(
        stream_name: str,
        stream_state: StreamState | dict,
    ) -> None:
        """Set a stream state object for the provided stream name.

        This is a thin wrapper around the internal `StateWriter` interface.
        """
        ...
Comment on lines +292 to +300

⚠️ Potential issue | 🔴 Critical

❓ Verification inconclusive

Missing self and undefined type; placeholder implementation.

The method signature has the same critical issues as get_state:

  1. Missing self parameter: Line 293 needs self as the first parameter
  2. Undefined StreamState type: Need to import StreamState

Additionally, the method body is a placeholder (...). Since this is a draft PR, that might be intentional, but wanted to flag it. When implementing, you'll likely need to delegate to the StateWriter interface via get_state_writer(), perhaps something like:

def set_state(
    self,
    stream_name: str,
    stream_state: StreamState | dict,
    source_name: str,
) -> None:
    """Set a stream state object for the provided stream name.

    This is a thin wrapper around the internal `StateWriter` interface.
    """
    state_writer = self.get_state_writer(source_name=source_name)
    state_writer.write_state(stream_name, stream_state)  # Assuming such a method exists

Does that align with your intended design? wdyt?


Add missing self, import StreamState, and implement the placeholder body

  • Could you add self as the first parameter of set_state?
  • Could you import the StreamState type?
  • Could you replace ... with a call to StateWriter (e.g. self.get_state_writer(...).write_state(...))? wdyt?
🤖 Prompt for AI Agents
In airbyte/caches/base.py around lines 292 to 300, the set_state method is
missing self, the StreamState type is not imported, and the method body is a
placeholder; change the signature to include self as the first parameter, add an
import for StreamState at the top of the file, and replace the ellipsis with a
call to the StateWriter usage on this instance (e.g. obtain the writer via
self.get_state_writer(...) and call write_state(stream_name, stream_state) or
equivalent method) so the wrapper forwards the stream name and state to the
internal StateWriter.
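
As a rough illustration of the forwarding described above, the sketch below accepts either a state object or a plain dict and hands it to a writer. All names (`DemoStreamState`, `DemoStateWriter`, `DemoCache`, and this `write_state` signature) are hypothetical stand-ins; the real `StateWriter` interface may take different arguments.

```python
from __future__ import annotations

from dataclasses import asdict, dataclass, field


@dataclass
class DemoStreamState:
    """Hypothetical typed state object for a single stream."""

    cursor: str


@dataclass
class DemoStateWriter:
    """Hypothetical writer that records the last state per stream."""

    written: dict[str, dict] = field(default_factory=dict)

    def write_state(self, stream_name: str, stream_state: dict) -> None:
        self.written[stream_name] = stream_state


@dataclass
class DemoCache:
    """Hypothetical cache that keeps one writer per source name."""

    writers: dict[str, DemoStateWriter] = field(default_factory=dict)

    def get_state_writer(self, source_name: str) -> DemoStateWriter:
        return self.writers.setdefault(source_name, DemoStateWriter())

    def set_state(
        self,
        stream_name: str,
        stream_state: DemoStreamState | dict,
        source_name: str,
    ) -> None:
        # Normalize the typed object to a dict so the writer sees one shape.
        if isinstance(stream_state, DemoStreamState):
            stream_state = asdict(stream_state)
        writer = self.get_state_writer(source_name=source_name)
        writer.write_state(stream_name, stream_state)


cache = DemoCache()
cache.set_state("users", DemoStreamState(cursor="2024-01-01"), "source-demo")
cache.set_state("orders", {"cursor": "2024-02-01"}, "source-demo")
written = cache.get_state_writer("source-demo").written
```

Normalizing at the wrapper boundary is one possible design; the union type in the signature could equally be resolved inside the writer.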



    def migrate_state(
        streams: list[str] | Literal["*"],
        to_cache: CacheBase,
    ) -> None:
        """Copies all matching stream states to the specified `Cache` object.

        This is a thin wrapper around the respective `Cache` objects'
        `get_state` and `set_state` methods.
        """
        ...
Comment on lines +302 to +312

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Missing self, undefined type, and formatting issue; placeholder implementation.

A few issues to address:

  1. Missing self parameter: Line 304 needs self as the first parameter
  2. Missing import: Literal needs to be imported from typing (the search below found no matching import in airbyte/caches/base.py)
  3. Extra blank line: There is one blank line too many above migrate_state at line 302 (flagged by the linter as E303)
  4. Placeholder implementation: The method body is ... which is expected for a draft PR

When implementing, this could iterate over the streams and copy state from this cache to the target cache. Something like:

def migrate_state(
    self,
    streams: list[str] | Literal["*"],
    to_cache: CacheBase,
    source_name: str,
) -> None:
    """Copies all matching stream states to the specified `Cache` object.

    This is a thin wrapper around the respective `Cache` objects'
    `get_state` and `set_state` methods.
    """
    stream_list = self._catalog_backend.stream_names if streams == "*" else streams
    for stream_name in stream_list:
        state = self.get_state(stream_name, source_name)
        if state is not None:
            to_cache.set_state(stream_name, state, source_name)

Would this match your vision for the feature? wdyt?


🏁 Script executed:

#!/bin/bash
rg -nP 'from\s+typing\s+import.*\bLiteral\b' -n airbyte/caches/base.py || true
rg -nP 'import\s+typing.*\bLiteral\b' -n airbyte/caches/base.py || true

Length of output: 0


🏁 Script executed:

#!/bin/bash
sed -n '1,50p' airbyte/caches/base.py

Length of output: 1920


🏁 Script executed:

#!/bin/bash
sed -n '260,360p' airbyte/caches/base.py

Length of output: 3443


Add missing self, import Literal, and remove extra blank line

  • Include self as the first parameter in the migrate_state signature.
  • Add from typing import Literal to the import statements.
  • Remove the extra blank line immediately above migrate_state.
  • The body is currently a placeholder (...); will you implement the stream iteration and state-transfer logic here? wdyt?
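
For the iteration and state-transfer logic asked about above, a minimal self-contained sketch might look like the following. `DemoCache` and its in-memory `_states` dict are hypothetical, and `source_name` is left out for brevity; the real method would need to resolve stream names and state through the cache's own backends.

```python
from __future__ import annotations

from typing import Literal


class DemoCache:
    """Hypothetical in-memory cache used only to show the copy loop."""

    def __init__(self, states: dict[str, dict] | None = None) -> None:
        self._states: dict[str, dict] = dict(states or {})

    def get_state(self, stream_name: str) -> dict | None:
        return self._states.get(stream_name)

    def set_state(self, stream_name: str, stream_state: dict) -> None:
        self._states[stream_name] = stream_state

    def migrate_state(
        self,
        streams: list[str] | Literal["*"],
        to_cache: DemoCache,
    ) -> None:
        # "*" expands to every stream this cache currently has state for.
        stream_names = list(self._states) if streams == "*" else streams
        for stream_name in stream_names:
            state = self.get_state(stream_name)
            if state is not None:  # Skip streams that have no stored state.
                to_cache.set_state(stream_name, state)


source = DemoCache({"users": {"cursor": "a"}, "orders": {"cursor": "b"}})

partial_target = DemoCache()
source.migrate_state(["users", "unknown"], to_cache=partial_target)

full_target = DemoCache()
source.migrate_state("*", to_cache=full_target)
```

Streams named in the list but absent from the source cache are skipped silently here; raising an error instead would be an equally reasonable choice for the real implementation.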


    def register_source(
        self,
        source_name: str,