Skip to content

Commit

Permalink
Merge branch 'main' into lazebnyi/add-state-delegating-retriever
Browse files Browse the repository at this point in the history
  • Loading branch information
lazebnyi authored Feb 6, 2025
2 parents 1f01589 + e38f914 commit a0e5d92
Show file tree
Hide file tree
Showing 55 changed files with 2,498 additions and 1,021 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/connector-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.10"
python-version: "3.11"
# Create initial pending status for test report
- name: Create Pending Test Report Status
if: steps.no_changes.outputs.status != 'cancelled'
Expand Down
14 changes: 7 additions & 7 deletions airbyte_cdk/sources/declarative/async_job/job_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,10 +437,10 @@ def create_and_get_completed_partitions(self) -> Iterable[AsyncPartition]:
yield from self._process_running_partitions_and_yield_completed_ones()
self._wait_on_status_update()
except Exception as exception:
LOGGER.warning(
f"Caught exception that stops the processing of the jobs: {exception}. Traceback: {traceback.format_exc()}"
)
if self._is_breaking_exception(exception):
LOGGER.warning(
f"Caught exception that stops the processing of the jobs: {exception}"
)
self._abort_all_running_jobs()
raise exception

Expand Down Expand Up @@ -482,16 +482,16 @@ def _is_breaking_exception(self, exception: Exception) -> bool:
and exception.failure_type == FailureType.config_error
)

def fetch_records(self, partition: AsyncPartition) -> Iterable[Mapping[str, Any]]:
def fetch_records(self, async_jobs: Iterable[AsyncJob]) -> Iterable[Mapping[str, Any]]:
"""
Fetches records from the given partition's jobs.
Fetches records from the given jobs.
Args:
partition (AsyncPartition): The partition containing the jobs.
async_jobs (Iterable[AsyncJob]): The list of AsyncJobs.
Yields:
Iterable[Mapping[str, Any]]: The fetched records from the jobs.
"""
for job in partition.jobs:
for job in async_jobs:
yield from self._job_repository.fetch_records(job)
self._job_repository.delete(job)
28 changes: 17 additions & 11 deletions airbyte_cdk/sources/declarative/auth/jwt.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#

import base64
import json
from dataclasses import InitVar, dataclass
from datetime import datetime
from typing import Any, Mapping, Optional, Union
Expand Down Expand Up @@ -104,21 +105,21 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
)

def _get_jwt_headers(self) -> dict[str, Any]:
""" "
"""
Builds and returns the headers used when signing the JWT.
"""
headers = self._additional_jwt_headers.eval(self.config)
headers = self._additional_jwt_headers.eval(self.config, json_loads=json.loads)
if any(prop in headers for prop in ["kid", "alg", "typ", "cty"]):
raise ValueError(
"'kid', 'alg', 'typ', 'cty' are reserved headers and should not be set as part of 'additional_jwt_headers'"
)

if self._kid:
headers["kid"] = self._kid.eval(self.config)
headers["kid"] = self._kid.eval(self.config, json_loads=json.loads)
if self._typ:
headers["typ"] = self._typ.eval(self.config)
headers["typ"] = self._typ.eval(self.config, json_loads=json.loads)
if self._cty:
headers["cty"] = self._cty.eval(self.config)
headers["cty"] = self._cty.eval(self.config, json_loads=json.loads)
headers["alg"] = self._algorithm
return headers

Expand All @@ -130,18 +131,19 @@ def _get_jwt_payload(self) -> dict[str, Any]:
exp = now + self._token_duration if isinstance(self._token_duration, int) else now
nbf = now

payload = self._additional_jwt_payload.eval(self.config)
payload = self._additional_jwt_payload.eval(self.config, json_loads=json.loads)
if any(prop in payload for prop in ["iss", "sub", "aud", "iat", "exp", "nbf"]):
raise ValueError(
"'iss', 'sub', 'aud', 'iat', 'exp', 'nbf' are reserved properties and should not be set as part of 'additional_jwt_payload'"
)

if self._iss:
payload["iss"] = self._iss.eval(self.config)
payload["iss"] = self._iss.eval(self.config, json_loads=json.loads)
if self._sub:
payload["sub"] = self._sub.eval(self.config)
payload["sub"] = self._sub.eval(self.config, json_loads=json.loads)
if self._aud:
payload["aud"] = self._aud.eval(self.config)
payload["aud"] = self._aud.eval(self.config, json_loads=json.loads)

payload["iat"] = now
payload["exp"] = exp
payload["nbf"] = nbf
Expand All @@ -151,7 +153,7 @@ def _get_secret_key(self) -> str:
"""
Returns the secret key used to sign the JWT.
"""
secret_key: str = self._secret_key.eval(self.config)
secret_key: str = self._secret_key.eval(self.config, json_loads=json.loads)
return (
base64.b64encode(secret_key.encode()).decode()
if self._base64_encode_secret_key
Expand All @@ -176,7 +178,11 @@ def _get_header_prefix(self) -> Union[str, None]:
"""
Returns the header prefix to be used when attaching the token to the request.
"""
return self._header_prefix.eval(self.config) if self._header_prefix else None
return (
self._header_prefix.eval(self.config, json_loads=json.loads)
if self._header_prefix
else None
)

@property
def auth_header(self) -> str:
Expand Down
11 changes: 3 additions & 8 deletions airbyte_cdk/sources/declarative/auth/token.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import base64
import logging
from dataclasses import InitVar, dataclass
from typing import Any, Mapping, Union
from typing import Any, Mapping, MutableMapping, Union

import requests
from cachetools import TTLCache, cached
Expand Down Expand Up @@ -45,11 +45,6 @@ class ApiKeyAuthenticator(DeclarativeAuthenticator):
config: Config
parameters: InitVar[Mapping[str, Any]]

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._field_name = InterpolatedString.create(
self.request_option.field_name, parameters=parameters
)

@property
def auth_header(self) -> str:
options = self._get_request_options(RequestOptionType.header)
Expand All @@ -60,9 +55,9 @@ def token(self) -> str:
return self.token_provider.get_token()

def _get_request_options(self, option_type: RequestOptionType) -> Mapping[str, Any]:
options = {}
options: MutableMapping[str, Any] = {}
if self.request_option.inject_into == option_type:
options[self._field_name.eval(self.config)] = self.token
self.request_option.inject_into_request(options, self.token, self.config)
return options

def get_request_params(self) -> Mapping[str, Any]:
Expand Down
18 changes: 15 additions & 3 deletions airbyte_cdk/sources/declarative/concurrent_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from airbyte_cdk.sources.declarative.extractors.record_filter import (
ClientSideIncrementalRecordFilterDecorator,
)
from airbyte_cdk.sources.declarative.incremental import ConcurrentPerPartitionCursor
from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import (
PerPartitionWithGlobalCursor,
Expand Down Expand Up @@ -231,7 +232,7 @@ def _group_streams(
):
cursor = declarative_stream.retriever.stream_slicer.stream_slicer

if not isinstance(cursor, ConcurrentCursor):
if not isinstance(cursor, ConcurrentCursor | ConcurrentPerPartitionCursor):
# This should never happen since we instantiate ConcurrentCursor in
# model_to_component_factory.py
raise ValueError(
Expand Down Expand Up @@ -474,10 +475,21 @@ def _get_retriever(
# Also a temporary hack. In the legacy Stream implementation, as part of the read,
# set_initial_state() is called to instantiate incoming state on the cursor. Although we no
# longer rely on the legacy low-code cursor for concurrent checkpointing, low-code components
# like StopConditionPaginationStrategyDecorator and ClientSideIncrementalRecordFilterDecorator
# still rely on a DatetimeBasedCursor that is properly initialized with state.
# like StopConditionPaginationStrategyDecorator still rely on a DatetimeBasedCursor that is
# properly initialized with state.
if retriever.cursor:
retriever.cursor.set_initial_state(stream_state=stream_state)

# Similar to above, the ClientSideIncrementalRecordFilterDecorator cursor is a separate instance
# from the one initialized on the SimpleRetriever, so it must also have state initialized
# for semi-incremental streams using is_client_side_incremental to filter properly
if isinstance(retriever.record_selector, RecordSelector) and isinstance(
retriever.record_selector.record_filter, ClientSideIncrementalRecordFilterDecorator
):
retriever.record_selector.record_filter._cursor.set_initial_state(
stream_state=stream_state
) # type: ignore # After non-concurrent cursors are deprecated we can remove these cursor workarounds

# We zero it out here, but since this is a cursor reference, the state is still properly
# instantiated for the other components that reference it
retriever.cursor = None
Expand Down
18 changes: 14 additions & 4 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2848,25 +2848,35 @@ definitions:
enum: [RequestPath]
RequestOption:
title: Request Option
description: Specifies the key field and where in the request a component's value should be injected.
description: Specifies the key field or path and where in the request a component's value should be injected.
type: object
required:
- type
- field_name
- inject_into
properties:
type:
type: string
enum: [RequestOption]
field_name:
title: Request Option
description: Configures which key should be used in the location that the descriptor is being injected into
title: Field Name
description: Configures which key should be used in the location that the descriptor is being injected into. We hope to eventually deprecate this field in favor of `field_path` for all request_options, but must currently maintain it for backwards compatibility in the Builder.
type: string
examples:
- segment_id
interpolation_context:
- config
- parameters
field_path:
title: Field Path
description: Configures a path to be used for nested structures in JSON body requests (e.g. GraphQL queries)
type: array
items:
type: string
examples:
- ["data", "viewer", "id"]
interpolation_context:
- config
- parameters
inject_into:
title: Inject Into
description: Configures where the descriptor should be set on the HTTP requests. Note that request parameters that are already encoded in the URL path will not be duplicated.
Expand Down
4 changes: 3 additions & 1 deletion airbyte_cdk/sources/declarative/declarative_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ def read_records(
"""
:param: stream_state We knowingly avoid using stream_state as we want cursors to manage their own state.
"""
if stream_slice is None or stream_slice == {}:
if stream_slice is None or (
not isinstance(stream_slice, StreamSlice) and stream_slice == {}
):
# As the parameter is Optional, many would just call `read_records(sync_mode)` during testing without specifying the field
# As part of the declarative model without custom components, this should never happen as the CDK would wire up a
# SinglePartitionRouter that would create this StreamSlice properly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,14 +365,15 @@ def _get_request_options(
options: MutableMapping[str, Any] = {}
if not stream_slice:
return options

if self.start_time_option and self.start_time_option.inject_into == option_type:
options[self.start_time_option.field_name.eval(config=self.config)] = stream_slice.get( # type: ignore # field_name is always casted to an interpolated string
self._partition_field_start.eval(self.config)
)
start_time_value = stream_slice.get(self._partition_field_start.eval(self.config))
self.start_time_option.inject_into_request(options, start_time_value, self.config)

if self.end_time_option and self.end_time_option.inject_into == option_type:
options[self.end_time_option.field_name.eval(config=self.config)] = stream_slice.get( # type: ignore [union-attr]
self._partition_field_end.eval(self.config)
)
end_time_value = stream_slice.get(self._partition_field_end.eval(self.config))
self.end_time_option.inject_into_request(options, end_time_value, self.config)

return options

def should_be_synced(self, record: Record) -> bool:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,11 @@ def _dynamic_stream_configs(
# Ensure that each stream is created with a unique name
name = dynamic_stream.get("name")

if not isinstance(name, str):
raise ValueError(
f"Expected stream name {name} to be a string, got {type(name)}."
)

if name in seen_dynamic_streams:
error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support."
failure_type = FailureType.system_error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1200,11 +1200,17 @@ class InjectInto(Enum):

class RequestOption(BaseModel):
type: Literal["RequestOption"]
field_name: str = Field(
...,
description="Configures which key should be used in the location that the descriptor is being injected into",
field_name: Optional[str] = Field(
None,
description="Configures which key should be used in the location that the descriptor is being injected into. We hope to eventually deprecate this field in favor of `field_path` for all request_options, but must currently maintain it for backwards compatibility in the Builder.",
examples=["segment_id"],
title="Request Option",
title="Field Name",
)
field_path: Optional[List[str]] = Field(
None,
description="Configures a path to be used for nested structures in JSON body requests (e.g. GraphQL queries)",
examples=[["data", "viewer", "id"]],
title="Field Path",
)
inject_into: InjectInto = Field(
...,
Expand Down
Loading

0 comments on commit a0e5d92

Please sign in to comment.