diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index 9de9e60a8..1b8ba6396 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -15,8 +15,8 @@ from typing import Any, DefaultDict, Iterable, List, Mapping, Optional from urllib.parse import urlparse +import orjson import requests -from orjson import orjson from requests import PreparedRequest, Response, Session from airbyte_cdk.connector import TConfig @@ -129,7 +129,11 @@ def run(self, parsed_args: argparse.Namespace) -> Iterable[str]: source_spec: ConnectorSpecification = self.source.spec(self.logger) try: - with tempfile.TemporaryDirectory() as temp_dir: + with tempfile.TemporaryDirectory( + # Cleanup can fail on Windows due to file locks. Ignore if so, + # rather than failing the whole process. + ignore_cleanup_errors=True, + ) as temp_dir: os.environ[ENV_REQUEST_CACHE_PATH] = ( temp_dir # set this as default directory for request_cache to store *.sqlite files ) @@ -246,12 +250,18 @@ def handle_record_counts( ) -> AirbyteMessage: match message.type: case Type.RECORD: + if message.record is None: + raise ValueError("Record message must have a record attribute") + stream_message_count[ HashableStreamDescriptor( name=message.record.stream, namespace=message.record.namespace ) ] += 1.0 # type: ignore[union-attr] # record has `stream` and `namespace` case Type.STATE: + if message.state is None: + raise ValueError("State message must have a state attribute") + stream_descriptor = message_utils.get_stream_descriptor(message) # Set record count from the counter onto the state message diff --git a/unit_tests/sources/streams/test_call_rate.py b/unit_tests/sources/streams/test_call_rate.py index 894142fcb..16bce68e3 100644 --- a/unit_tests/sources/streams/test_call_rate.py +++ b/unit_tests/sources/streams/test_call_rate.py @@ -47,7 +47,11 @@ class StubDummyCacheHttpStream(StubDummyHttpStream): @pytest.fixture(name="enable_cache") def enable_cache_fixture(): prev_cache_path = os.environ.get(ENV_REQUEST_CACHE_PATH) - with tempfile.TemporaryDirectory() as temp_dir: + with tempfile.TemporaryDirectory( + # Cleanup can fail on Windows due to file locks. Ignore if so, + # rather than failing the whole process. + ignore_cleanup_errors=True, + ) as temp_dir: os.environ[ENV_REQUEST_CACHE_PATH] = temp_dir yield