diff --git a/airbyte_cdk/connector_builder/test_reader/reader.py b/airbyte_cdk/connector_builder/test_reader/reader.py index a125047b4..ea6e960c2 100644 --- a/airbyte_cdk/connector_builder/test_reader/reader.py +++ b/airbyte_cdk/connector_builder/test_reader/reader.py @@ -4,6 +4,7 @@ import logging +from math import log from typing import Any, ClassVar, Dict, Iterator, List, Mapping, Optional, Union from airbyte_cdk.connector_builder.models import ( @@ -112,11 +113,16 @@ def run_test_read( record_limit = self._check_record_limit(record_limit) # The connector builder currently only supports reading from a single stream at a time stream = source.streams(config)[0] + + # get any deprecation warnings during the component creation + deprecation_warnings: List[LogMessage] = source.deprecation_warnings() + schema_inferrer = SchemaInferrer( self._pk_to_nested_and_composite_field(stream.primary_key), self._cursor_field_to_nested_and_composite_field(stream.cursor_field), ) datetime_format_inferrer = DatetimeFormatInferrer() + message_group = get_message_groups( self._read_stream(source, config, configured_catalog, state), schema_inferrer, @@ -127,6 +133,10 @@ def run_test_read( slices, log_messages, auxiliary_requests, latest_config_update = self._categorise_groups( message_group ) + + # extend log messages with deprecation warnings + log_messages.extend(deprecation_warnings) + schema, log_messages = self._get_infered_schema( configured_catalog, schema_inferrer, log_messages ) @@ -269,6 +279,7 @@ def _categorise_groups(self, message_groups: MESSAGE_GROUPS) -> GROUPED_MESSAGES auxiliary_requests = [] latest_config_update: Optional[AirbyteControlMessage] = None + # process the message groups first for message_group in message_groups: match message_group: case AirbyteLogMessage(): diff --git a/airbyte_cdk/manifest_migrations/migrations/registry.yaml b/airbyte_cdk/manifest_migrations/migrations/registry.yaml index 6beb1667c..f226b0af4 100644 --- a/airbyte_cdk/manifest_migrations/migrations/registry.yaml +++ b/airbyte_cdk/manifest_migrations/migrations/registry.yaml @@ -3,7 +3,7 @@ # manifest_migrations: - - version: 6.47.1 + - version: 6.48.0 migrations: - name: http_requester_url_base_to_url order: 1 diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index b5ad39f60..bce67193a 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1911,15 +1911,16 @@ definitions: type: object required: - type - - url_base properties: type: type: string enum: [HttpRequester] url_base: - linkable: true + deprecated: true + deprecation_message: "Use `url` field instead." title: API Base URL - description: The Base URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this. + description: Deprecated, use the `url` instead. Base URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this. + linkable: true type: string interpolation_context: - config @@ -1935,9 +1936,29 @@ definitions: - "{{ config['base_url'] or 'https://app.posthog.com'}}/api" - "https://connect.squareup.com/v2/quotes/{{ stream_partition['id'] }}/quote_line_groups" - "https://example.com/api/v1/resource/{{ next_page_token['id'] }}" + url: + title: The URL of an API endpoint + description: The URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this. + type: string + interpolation_context: + - config + - next_page_token + - stream_interval + - stream_partition + - stream_slice + - creation_response + - polling_response + - download_target + examples: + - "https://connect.squareup.com/v2" + - "{{ config['url'] or 'https://app.posthog.com'}}/api" + - "https://connect.squareup.com/v2/quotes/{{ stream_partition['id'] }}/quote_line_groups" + - "https://example.com/api/v1/resource/{{ next_page_token['id'] }}" path: + deprecated: true + deprecation_message: "Use `url` field instead." title: URL Path - description: The Path the specific API endpoint that this stream represents. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this. + description: Deprecated, use the `url` instead. Path the specific API endpoint that this stream represents. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this. type: string interpolation_context: - config @@ -1983,6 +2004,8 @@ definitions: description: Allows for retrieving a dynamic set of properties from an API endpoint which can be injected into outbound request using the stream_partition.extra_fields. "$ref": "#/definitions/PropertiesFromEndpoint" request_body_data: + deprecated: true + deprecation_message: "Use `request_body` field instead." title: Request Body Payload (Non-JSON) description: Specifies how to populate the body of the request with a non-JSON payload. Plain text will be sent as is, whereas objects will be converted to a urlencoded form. anyOf: @@ -2001,6 +2024,8 @@ definitions: [{"value": {{ stream_interval['start_time'] | int * 1000 }} }] }, "orderBy": 1, "columnName": "Timestamp"}]/ request_body_json: + deprecated: true + deprecation_message: "Use `request_body` field instead." title: Request Body JSON Payload description: Specifies how to populate the body of the request with a JSON payload. Can contain nested objects. anyOf: @@ -2019,6 +2044,35 @@ definitions: - sort: field: "updated_at" order: "ascending" + request_body: + title: Request Body Payload to be send as a part of the API request. + description: Specifies how to populate the body of the request with a payload. Can contain nested objects. + anyOf: + - "$ref": "#/definitions/RequestBody" + interpolation_context: + - next_page_token + - stream_interval + - stream_partition + - stream_slice + examples: + - type: RequestBodyJson + value: + sort_order: "ASC" + sort_field: "CREATED_AT" + - type: RequestBodyJson + value: + key: "{{ config['value'] }}" + - type: RequestBodyJson + value: + sort: + field: "updated_at" + order: "ascending" + - type: RequestBodyData + value: "plain_text_body" + - type: RequestBodyData + value: + param1: "value1" + param2: "{{ config['param2_value'] }}" request_headers: title: Request Headers description: Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method. @@ -4019,6 +4073,27 @@ definitions: - type - stream_template - components_resolver + RequestBody: + type: object + description: The request body payload. Can be either URL encoded data or JSON. + properties: + type: + anyOf: + - type: string + enum: [RequestBodyData] + - type: string + enum: [RequestBodyJson] + value: + anyOf: + - type: string + description: The request body payload as a string. + - type: object + description: The request body payload as a Non-JSON object (url-encoded data). + additionalProperties: + type: string + - type: object + description: The request body payload as a JSON object (json-encoded data). + additionalProperties: true interpolation: variables: - title: config @@ -4227,4 +4302,4 @@ interpolation: regex: The regular expression to search for. It must include a capture group. return_type: str examples: - - '{{ "goodbye, cruel world" | regex_search("goodbye,\s(.*)$") }} -> "cruel world"' + - '{{ "goodbye, cruel world" | regex_search("goodbye,\s(.*)$") }} -> "cruel world"' \ No newline at end of file diff --git a/airbyte_cdk/sources/declarative/declarative_source.py b/airbyte_cdk/sources/declarative/declarative_source.py index 77bf427a1..2f282aee0 100644 --- a/airbyte_cdk/sources/declarative/declarative_source.py +++ b/airbyte_cdk/sources/declarative/declarative_source.py @@ -4,8 +4,11 @@ import logging from abc import abstractmethod -from typing import Any, Mapping, Tuple +from typing import Any, List, Mapping, Tuple +from airbyte_cdk.connector_builder.models import ( + LogMessage as ConnectorBuilderLogMessage, +) from airbyte_cdk.sources.abstract_source import AbstractSource from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker @@ -34,3 +37,9 @@ def check_connection( The error object will be cast to string to display the problem to the user. """ return self.connection_checker.check_connection(self, logger, config) + + def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]: + """ + Returns a list of deprecation warnings for the source. + """ + return [] diff --git a/airbyte_cdk/sources/declarative/interpolation/interpolated_nested_mapping.py b/airbyte_cdk/sources/declarative/interpolation/interpolated_nested_mapping.py index f441ba918..a053551be 100644 --- a/airbyte_cdk/sources/declarative/interpolation/interpolated_nested_mapping.py +++ b/airbyte_cdk/sources/declarative/interpolation/interpolated_nested_mapping.py @@ -12,7 +12,7 @@ NestedMappingEntry = Union[ dict[str, "NestedMapping"], list["NestedMapping"], str, int, float, bool, None ] -NestedMapping = Union[dict[str, NestedMappingEntry], str] +NestedMapping = Union[dict[str, NestedMappingEntry], str, dict[str, Any]] @dataclass diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py index 03e488895..c98372be7 100644 --- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -15,6 +15,9 @@ from jsonschema.validators import validate from packaging.version import InvalidVersion, Version +from airbyte_cdk.connector_builder.models import ( + LogMessage as ConnectorBuilderLogMessage, +) from airbyte_cdk.manifest_migrations.migration_handler import ( ManifestMigrationHandler, ) @@ -230,6 +233,9 @@ def dynamic_streams(self) -> List[Dict[str, Any]]: with_dynamic_stream_name=True, ) + def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]: + return self._constructor.get_model_deprecations() + @property def connection_checker(self) -> ConnectionChecker: check = self._source_config["check"] diff --git a/airbyte_cdk/sources/declarative/models/base_model_with_deprecations.py b/airbyte_cdk/sources/declarative/models/base_model_with_deprecations.py new file mode 100644 index 000000000..2b5dbebc2 --- /dev/null +++ b/airbyte_cdk/sources/declarative/models/base_model_with_deprecations.py @@ -0,0 +1,144 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. + +# THIS IS A STATIC CLASS MODEL USED TO DISPLAY DEPRECATION WARNINGS +# WHEN DEPRECATED FIELDS ARE ACCESSED + +import warnings +from typing import Any, List + +from pydantic.v1 import BaseModel + +from airbyte_cdk.connector_builder.models import LogMessage as ConnectorBuilderLogMessage + +# format the warning message +warnings.formatwarning = ( + lambda message, category, *args, **kwargs: f"{category.__name__}: {message}" +) + +FIELDS_TAG = "__fields__" +DEPRECATED = "deprecated" +DEPRECATION_MESSAGE = "deprecation_message" +DEPRECATION_LOGS_TAG = "_deprecation_logs" + + +class BaseModelWithDeprecations(BaseModel): + """ + Pydantic BaseModel that warns when deprecated fields are accessed. + The deprecation message is stored in the field's extra attributes. + This class is used to create models that can have deprecated fields + and show warnings when those fields are accessed or initialized. + + The `_deprecation_logs` attribute is stored in the model itself. + The collected deprecation warnings are further propagated to the Airbyte log messages, + during the component creation process, in `model_to_component._collect_model_deprecations()`. + + The component implementation is not responsible for handling the deprecation warnings, + since the deprecation warnings are already handled in the model itself. + """ + + class Config: + """ + Allow extra fields in the model. In case the model restricts extra fields. + """ + + extra = "allow" + + def __init__(self, **model_fields: Any) -> None: + """ + Show warnings for deprecated fields during component initialization. + """ + # call the parent constructor first to initialize Pydantic internals + super().__init__(**model_fields) + # set the placeholder for the default deprecation messages + self._default_deprecation_messages: List[str] = [] + # set the placeholder for the deprecation logs + self._deprecation_logs: List[ConnectorBuilderLogMessage] = [] + # process deprecated fields, if present + self._process_fields(model_fields) + # emit default deprecation messages + self._emit_default_deprecation_messages() + # set the deprecation logs attribute to the model + self._set_deprecation_logs_attr_to_model() + + def _is_deprecated_field(self, field_name: str) -> bool: + return ( + self.__fields__[field_name].field_info.extra.get(DEPRECATED, False) + if field_name in self.__fields__.keys() + else False + ) + + def _get_deprecation_message(self, field_name: str) -> str: + return ( + self.__fields__[field_name].field_info.extra.get( + DEPRECATION_MESSAGE, "" + ) + if field_name in self.__fields__.keys() + else "" + ) + + def _process_fields(self, model_fields: Any) -> None: + """ + Processes the fields in the provided model data, checking for deprecated fields. + + For each field in the input `model_fields`, this method checks if the field exists in the model's defined fields. + If the field is marked as deprecated (using the `DEPRECATED` flag in its metadata), it triggers a deprecation warning + by calling the `_create_warning` method with the field name and an optional deprecation message. + + Args: + model_fields (Any): The data containing fields to be processed. + + Returns: + None + """ + + if hasattr(self, FIELDS_TAG): + for field_name in model_fields.keys(): + if self._is_deprecated_field(field_name): + self._create_warning( + field_name, + self._get_deprecation_message(field_name), + ) + + def _set_deprecation_logs_attr_to_model(self) -> None: + """ + Sets the deprecation logs attribute on the model instance. + + This method attaches the current instance's deprecation logs to the model by setting + an attribute named by `DEPRECATION_LOGS_TAG` to the value of `self._deprecation_logs`. + This is typically used to track or log deprecated features or configurations within the model. + + Returns: + None + """ + setattr(self, DEPRECATION_LOGS_TAG, self._deprecation_logs) + + def _create_warning(self, field_name: str, message: str) -> None: + """ + Show a warning message for deprecated fields (to stdout). + Args: + field_name (str): Name of the deprecated field. + message (str): Warning message to be displayed. + """ + + deprecated_message = f"Component type: `{self.__class__.__name__}`. Field '{field_name}' is deprecated. {message}" + + if deprecated_message not in self._default_deprecation_messages: + # Avoid duplicates in the default deprecation messages + self._default_deprecation_messages.append(deprecated_message) + + # Create an Airbyte deprecation log message + deprecation_log_message = ConnectorBuilderLogMessage( + level="WARN", message=deprecated_message + ) + # Add the deprecation message to the Airbyte log messages, + # this logs are displayed in the Connector Builder. + if deprecation_log_message not in self._deprecation_logs: + # Avoid duplicates in the deprecation logs + self._deprecation_logs.append(deprecation_log_message) + + def _emit_default_deprecation_messages(self) -> None: + """ + Emit default deprecation messages for deprecated fields to STDOUT. + """ + for message in self._default_deprecation_messages: + warnings.warn(message, DeprecationWarning) diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 8337804be..095325308 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -10,6 +10,10 @@ from pydantic.v1 import BaseModel, Extra, Field +from airbyte_cdk.sources.declarative.models.base_model_with_deprecations import ( + BaseModelWithDeprecations, +) + class AuthFlowType(Enum): oauth2_0 = "oauth2.0" @@ -1497,6 +1501,11 @@ class ConfigComponentsResolver(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class RequestBody(BaseModel): + type: Optional[Union[Literal["RequestBodyData"], Literal["RequestBodyJson"]]] = None + value: Optional[Union[str, Dict[str, str], Dict[str, Any]]] = None + + class AddedFieldDefinition(BaseModel): type: Literal["AddedFieldDefinition"] path: List[str] = Field( @@ -2207,11 +2216,13 @@ class SessionTokenAuthenticator(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class HttpRequester(BaseModel): +class HttpRequester(BaseModelWithDeprecations): type: Literal["HttpRequester"] - url_base: str = Field( - ..., - description="The Base URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.", + url_base: Optional[str] = Field( + None, + deprecated=True, + deprecation_message="Use `url` field instead.", + description="Deprecated, use the `url` instead. Base URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.", examples=[ "https://connect.squareup.com/v2", "{{ config['base_url'] or 'https://app.posthog.com'}}/api", @@ -2220,9 +2231,22 @@ class HttpRequester(BaseModel): ], title="API Base URL", ) + url: Optional[str] = Field( + None, + description="The URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.", + examples=[ + "https://connect.squareup.com/v2", + "{{ config['url'] or 'https://app.posthog.com'}}/api", + "https://connect.squareup.com/v2/quotes/{{ stream_partition['id'] }}/quote_line_groups", + "https://example.com/api/v1/resource/{{ next_page_token['id'] }}", + ], + title="The URL of an API endpoint", + ) path: Optional[str] = Field( None, - description="The Path the specific API endpoint that this stream represents. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.", + deprecated=True, + deprecation_message="Use `url` field instead.", + description="Deprecated, use the `url` instead. Path the specific API endpoint that this stream represents. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.", examples=[ "/products", "/quotes/{{ stream_partition['id'] }}/quote_line_groups", @@ -2261,6 +2285,8 @@ class HttpRequester(BaseModel): ) request_body_data: Optional[Union[Dict[str, str], str]] = Field( None, + deprecated=True, + deprecation_message="Use `request_body` field instead.", description="Specifies how to populate the body of the request with a non-JSON payload. Plain text will be sent as is, whereas objects will be converted to a urlencoded form.", examples=[ '[{"clause": {"type": "timestamp", "operator": 10, "parameters":\n [{"value": {{ stream_interval[\'start_time\'] | int * 1000 }} }]\n }, "orderBy": 1, "columnName": "Timestamp"}]/\n' @@ -2269,6 +2295,8 @@ class HttpRequester(BaseModel): ) request_body_json: Optional[Union[Dict[str, Any], str]] = Field( None, + deprecated=True, + deprecation_message="Use `request_body` field instead.", description="Specifies how to populate the body of the request with a JSON payload. Can contain nested objects.", examples=[ {"sort_order": "ASC", "sort_field": "CREATED_AT"}, @@ -2277,6 +2305,27 @@ class HttpRequester(BaseModel): ], title="Request Body JSON Payload", ) + request_body: Optional[RequestBody] = Field( + None, + description="Specifies how to populate the body of the request with a payload. Can contain nested objects.", + examples=[ + { + "type": "RequestBodyJson", + "value": {"sort_order": "ASC", "sort_field": "CREATED_AT"}, + }, + {"type": "RequestBodyJson", "value": {"key": "{{ config['value'] }}"}}, + { + "type": "RequestBodyJson", + "value": {"sort": {"field": "updated_at", "order": "ascending"}}, + }, + {"type": "RequestBodyData", "value": "plain_text_body"}, + { + "type": "RequestBodyData", + "value": {"param1": "value1", "param2": "{{ config['param2_value'] }}"}, + }, + ], + title="Request Body Payload to be send as a part of the API request.", + ) request_headers: Optional[Union[Dict[str, str], str]] = Field( None, description="Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.", diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 05e34c4c6..7368d2835 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -27,6 +27,9 @@ from isodate import parse_duration from pydantic.v1 import BaseModel +from airbyte_cdk.connector_builder.models import ( + LogMessage as ConnectorBuilderLogMessage, +) from airbyte_cdk.models import FailureType, Level from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncJobOrchestrator @@ -107,6 +110,10 @@ from airbyte_cdk.sources.declarative.models import ( CustomStateMigration, ) +from airbyte_cdk.sources.declarative.models.base_model_with_deprecations import ( + DEPRECATION_LOGS_TAG, + BaseModelWithDeprecations, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( AddedFieldDefinition as AddedFieldDefinitionModel, ) @@ -593,6 +600,8 @@ def __init__( self._connector_state_manager = connector_state_manager or ConnectorStateManager() self._api_budget: Optional[Union[APIBudget, HttpAPIBudget]] = None self._job_tracker: JobTracker = JobTracker(max_concurrent_async_job_count or 1) + # placeholder for deprecation warnings + self._collected_deprecation_logs: List[ConnectorBuilderLogMessage] = [] def _init_mappings(self) -> None: self.PYDANTIC_MODEL_TO_CONSTRUCTOR: Mapping[Type[BaseModel], Callable[..., Any]] = { @@ -740,8 +749,34 @@ def _create_component_from_model(self, model: BaseModel, config: Config, **kwarg component_constructor = self.PYDANTIC_MODEL_TO_CONSTRUCTOR.get(model.__class__) if not component_constructor: raise ValueError(f"Could not find constructor for {model.__class__}") + + # collect deprecation warnings for supported models. + if isinstance(model, BaseModelWithDeprecations): + self._collect_model_deprecations(model) + return component_constructor(model=model, config=config, **kwargs) + def get_model_deprecations(self) -> List[ConnectorBuilderLogMessage]: + """ + Returns the deprecation warnings that were collected during the creation of components. + """ + return self._collected_deprecation_logs + + def _collect_model_deprecations(self, model: BaseModelWithDeprecations) -> None: + """ + Collects deprecation logs from the given model and appends any new logs to the internal collection. + + This method checks if the provided model has deprecation logs (identified by the presence of the DEPRECATION_LOGS_TAG attribute and a non-None `_deprecation_logs` property). It iterates through each deprecation log in the model and appends it to the `_collected_deprecation_logs` list if it has not already been collected, ensuring that duplicate logs are avoided. + + Args: + model (BaseModelWithDeprecations): The model instance from which to collect deprecation logs. + """ + if hasattr(model, DEPRECATION_LOGS_TAG) and model._deprecation_logs is not None: + for log in model._deprecation_logs: + # avoid duplicates for deprecation logs observed. + if log not in self._collected_deprecation_logs: + self._collected_deprecation_logs.append(log) + @staticmethod def create_added_field_definition( model: AddedFieldDefinitionModel, config: Config, **kwargs: Any @@ -2180,7 +2215,7 @@ def create_http_requester( self._create_component_from_model( model=model.authenticator, config=config, - url_base=model.url_base, + url_base=model.url or model.url_base, name=name, decoder=decoder, ) @@ -2210,6 +2245,7 @@ def create_http_requester( request_parameters = model.request_parameters request_options_provider = InterpolatedRequestOptionsProvider( + request_body=model.request_body, request_body_data=model.request_body_data, request_body_json=model.request_body_json, request_headers=model.request_headers, @@ -2226,6 +2262,7 @@ def create_http_requester( return HttpRequester( name=name, + url=model.url, url_base=model.url_base, path=model.path, authenticator=authenticator, @@ -2934,6 +2971,25 @@ def create_simple_retriever( use_cache: Optional[bool] = None, **kwargs: Any, ) -> SimpleRetriever: + def _get_url() -> str: + """ + Closure to get the URL from the requester. This is used to get the URL in the case of a lazy retriever. + This is needed because the URL is not set until the requester is created. + """ + + _url = ( + model.requester.url + if hasattr(model.requester, "url") and model.requester.url is not None + else requester.get_url() + ) + _url_base = ( + model.requester.url_base + if hasattr(model.requester, "url_base") and model.requester.url_base is not None + else requester.get_url_base() + ) + + return _url or _url_base + decoder = ( self._create_component_from_model(model=model.decoder, config=config) if model.decoder @@ -3001,11 +3057,6 @@ def create_simple_retriever( use_cache=use_cache, config=config, ) - url_base = ( - model.requester.url_base - if hasattr(model.requester, "url_base") - else requester.get_url_base() - ) # Define cursor only if per partition or common incremental support is needed cursor = stream_slicer if isinstance(stream_slicer, DeclarativeCursor) else None @@ -3029,7 +3080,7 @@ def create_simple_retriever( self._create_component_from_model( model=model.paginator, config=config, - url_base=url_base, + url_base=_get_url(), extractor_model=model.record_selector.extractor, decoder=decoder, cursor_used_for_stop_condition=cursor_used_for_stop_condition, diff --git a/airbyte_cdk/sources/declarative/requesters/http_requester.py b/airbyte_cdk/sources/declarative/requesters/http_requester.py index 78c07b725..6b0e65aab 100644 --- a/airbyte_cdk/sources/declarative/requesters/http_requester.py +++ b/airbyte_cdk/sources/declarative/requesters/http_requester.py @@ -3,7 +3,6 @@ # import logging -import os from dataclasses import InitVar, dataclass, field from typing import Any, Callable, Mapping, MutableMapping, Optional, Union from urllib.parse import urljoin @@ -53,10 +52,11 @@ class HttpRequester(Requester): """ name: str - url_base: Union[InterpolatedString, str] config: Config parameters: InitVar[Mapping[str, Any]] + url: Optional[Union[InterpolatedString, str]] = None + url_base: Optional[Union[InterpolatedString, str]] = None path: Optional[Union[InterpolatedString, str]] = None authenticator: Optional[DeclarativeAuthenticator] = None http_method: Union[str, HttpMethod] = HttpMethod.GET @@ -71,7 +71,14 @@ class HttpRequester(Requester): decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={})) def __post_init__(self, parameters: Mapping[str, Any]) -> None: - self._url_base = InterpolatedString.create(self.url_base, parameters=parameters) + self._url = InterpolatedString.create( + self.url if self.url else EmptyString, parameters=parameters + ) + # deprecated + self._url_base = InterpolatedString.create( + self.url_base if self.url_base else EmptyString, parameters=parameters + ) + # deprecated self._path = InterpolatedString.create( self.path if self.path else EmptyString, parameters=parameters ) @@ -120,6 +127,51 @@ def exit_on_rate_limit(self, value: bool) -> None: def get_authenticator(self) -> DeclarativeAuthenticator: return self._authenticator + def get_url( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> str: + interpolation_context = get_interpolation_context( + stream_state=stream_state, + stream_slice=stream_slice, + next_page_token=next_page_token, + ) + + return str(self._url.eval(self.config, **interpolation_context)) + + def _get_url( + self, + *, + path: Optional[str] = None, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> str: + url = self.get_url( + stream_state=stream_state, + stream_slice=stream_slice, + next_page_token=next_page_token, + ) + + url_base = self.get_url_base( + stream_state=stream_state, + stream_slice=stream_slice, + next_page_token=next_page_token, + ) + + path = path or self.get_path( + stream_state=stream_state, + stream_slice=stream_slice, + next_page_token=next_page_token, + ) + + full_url = self._join_url(url_base, path) if url_base else url + path if path else url + + return full_url + def get_url_base( self, *, @@ -349,7 +401,7 @@ def _request_body_json( return options @classmethod - def _join_url(cls, url_base: str, path: str) -> str: + def _join_url(cls, url_base: str, path: Optional[str] = None) -> str: """ Joins a base URL with a given path and returns the resulting URL with any trailing slash removed. @@ -358,7 +410,7 @@ def _join_url(cls, url_base: str, path: str) -> str: Args: url_base (str): The base URL to which the path will be appended. - path (str): The path to join with the base URL. + path (Optional[str]): The path to join with the base URL. Returns: str: The resulting joined URL. @@ -399,18 +451,11 @@ def send_request( ) -> Optional[requests.Response]: request, response = self._http_client.send_request( http_method=self.get_method().value, - url=self._join_url( - self.get_url_base( - stream_state=stream_state, - stream_slice=stream_slice, - next_page_token=next_page_token, - ), - path - or self.get_path( - stream_state=stream_state, - stream_slice=stream_slice, - next_page_token=next_page_token, - ), + url=self._get_url( + path=path, + stream_state=stream_state, + stream_slice=stream_slice, + next_page_token=next_page_token, ), request_kwargs={"stream": self.stream_response}, headers=self._request_headers( diff --git a/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py b/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py index 2e0038730..8e64d6b94 100644 --- a/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py +++ b/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py @@ -6,6 +6,9 @@ from typing import Any, List, Mapping, MutableMapping, Optional, Union from airbyte_cdk.sources.declarative.interpolation.interpolated_nested_mapping import NestedMapping +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + RequestBody, +) from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_nested_request_input_provider import ( InterpolatedNestedRequestInputProvider, ) @@ -38,6 +41,7 @@ class InterpolatedRequestOptionsProvider(RequestOptionsProvider): config: Config = field(default_factory=dict) request_parameters: Optional[RequestInput] = None request_headers: Optional[RequestInput] = None + request_body: Optional[RequestBody] = None request_body_data: Optional[RequestInput] = None request_body_json: Optional[NestedMapping] = None query_properties_key: Optional[str] = None @@ -47,16 +51,19 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: self.request_parameters = {} if self.request_headers is None: self.request_headers = {} + # resolve the request body to either data or json + self._resolve_request_body() + # If request_body is not provided, set request_body_data and request_body_json to empty dicts if self.request_body_data is None: self.request_body_data = {} if self.request_body_json is None: self.request_body_json = {} - + # If both request_body_data and request_body_json are provided, raise an error if self.request_body_json and self.request_body_data: raise ValueError( "RequestOptionsProvider should only contain either 'request_body_data' or 'request_body_json' not both" ) - + # set interpolators self._parameter_interpolator = InterpolatedRequestInputProvider( config=self.config, request_inputs=self.request_parameters, parameters=parameters ) @@ -70,6 +77,21 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: config=self.config, request_inputs=self.request_body_json, parameters=parameters ) + def _resolve_request_body(self) -> None: + """ + Resolves the request body configuration by setting either `request_body_data` or `request_body_json` + based on the type specified in `self.request_body`. If neither is provided, both are initialized as empty + dictionaries. Raises a ValueError if both `request_body_data` and `request_body_json` are set simultaneously. + Raises: + ValueError: If both `request_body_data` and `request_body_json` are provided. + """ + # Resolve the request body to either data or json + if self.request_body is not None and self.request_body.type is not None: + if self.request_body.type == "RequestBodyData": + self.request_body_data = self.request_body.value + elif self.request_body.type == "RequestBodyJson": + self.request_body_json = self.request_body.value + def get_request_params( self, *, diff --git a/airbyte_cdk/sources/declarative/requesters/requester.py b/airbyte_cdk/sources/declarative/requesters/requester.py index ddda1ddba..97b31e884 100644 --- a/airbyte_cdk/sources/declarative/requesters/requester.py +++ b/airbyte_cdk/sources/declarative/requesters/requester.py @@ -34,6 +34,18 @@ def get_authenticator(self) -> DeclarativeAuthenticator: """ pass + @abstractmethod + def get_url( + self, + *, + stream_state: Optional[StreamState], + stream_slice: Optional[StreamSlice], + next_page_token: Optional[Mapping[str, Any]], + ) -> str: + """ + :return: URL base for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/" + """ + @abstractmethod def get_url_base( self, diff --git a/bin/generate_component_manifest_files.py b/bin/generate_component_manifest_files.py index 43f9b568e..51b3d8efb 100755 --- a/bin/generate_component_manifest_files.py +++ b/bin/generate_component_manifest_files.py @@ -1,5 +1,6 @@ # Copyright (c) 2024 Airbyte, Inc., all rights reserved. +import re import sys from glob import glob from pathlib import Path @@ -28,6 +29,63 @@ def generate_init_module_content() -> str: return header +def replace_base_model_for_classes_with_deprecated_fields(post_processed_content: str) -> str: + """ + Replace the base model for classes with deprecated fields. + This function looks for classes that inherit from `BaseModel` and have fields marked as deprecated. + It replaces the base model with `BaseModelWithDeprecations` for those classes. + """ + + # Find classes with deprecated fields + classes_with_deprecated_fields = set() + class_matches = re.finditer(r"class (\w+)\(BaseModel\):", post_processed_content) + + for class_match in class_matches: + class_name = class_match.group(1) + class_start = class_match.start() + # Find the next class definition or end of file + next_class_match = re.search( + r"class \w+\(", + post_processed_content[class_start + len(class_match.group(0)) :], + ) + class_end = ( + len(post_processed_content) + if next_class_match is None + else class_start + len(class_match.group(0)) + next_class_match.start() + ) + class_content = post_processed_content[class_start:class_end] + + # Check if any field has deprecated=True + if re.search(r"deprecated\s*=\s*True", class_content): + classes_with_deprecated_fields.add(class_name) + + # update the imports to include the new base model with deprecation warinings + # only if there are classes with the fields marked as deprecated. + if len(classes_with_deprecated_fields) > 0: + # Find where to insert the base model - after imports but before class definitions + imports_end = post_processed_content.find( + "\n\n", + post_processed_content.find("from pydantic.v1 import"), + ) + if imports_end > 0: + post_processed_content = ( + post_processed_content[:imports_end] + + "\n\n" + + "from airbyte_cdk.sources.declarative.models.base_model_with_deprecations import (\n" + + " BaseModelWithDeprecations,\n" + + ")" + + post_processed_content[imports_end:] + ) + + # Use the `BaseModelWithDeprecations` base model for the classes with deprecated fields + for class_name in classes_with_deprecated_fields: + pattern = rf"class {class_name}\(BaseModel\):" + replacement = f"class {class_name}(BaseModelWithDeprecations):" + post_processed_content = re.sub(pattern, replacement, post_processed_content) + + return post_processed_content + + async def post_process_codegen(codegen_container: dagger.Container): codegen_container = codegen_container.with_exec( ["mkdir", "/generated_post_processed"], use_entrypoint=True @@ -41,6 +99,11 @@ async def post_process_codegen(codegen_container: dagger.Container): post_processed_content = original_content.replace( " _parameters:", " parameters:" ).replace("from pydantic", "from pydantic.v1") + + post_processed_content = replace_base_model_for_classes_with_deprecated_fields( + post_processed_content + ) + codegen_container = codegen_container.with_new_file( f"/generated_post_processed/{generated_file}", contents=post_processed_content ) @@ -75,6 +138,12 @@ async def main(): "--set-default-enum-member", "--use-double-quotes", "--remove-special-field-name-prefix", + # allow usage of the extra key such as `deprecated`, etc. + "--field-extra-keys", + # account the `deprecated` flag provided for the field. + "deprecated", + # account the `deprecation_message` provided for the field. + "deprecation_message", ], use_entrypoint=True, ) diff --git a/unit_tests/connector_builder/test_connector_builder_handler.py b/unit_tests/connector_builder/test_connector_builder_handler.py index 36680a6bb..ba19126de 100644 --- a/unit_tests/connector_builder/test_connector_builder_handler.py +++ b/unit_tests/connector_builder/test_connector_builder_handler.py @@ -7,7 +7,7 @@ import json import logging import os -from typing import Literal +from typing import List, Literal from unittest import mock from unittest.mock import MagicMock, patch @@ -824,6 +824,9 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification: connector_specification.connectionSpecification = {} return connector_specification + def deprecation_warnings(self) -> List[AirbyteLogMessage]: + return [] + @property def check_config_against_spec(self) -> Literal[False]: return False diff --git a/unit_tests/manifest_migrations/conftest.py b/unit_tests/manifest_migrations/conftest.py index 303e31178..83e132980 100644 --- a/unit_tests/manifest_migrations/conftest.py +++ b/unit_tests/manifest_migrations/conftest.py @@ -197,7 +197,7 @@ def manifest_with_url_base_to_migrate_to_url() -> Dict[str, Any]: @pytest.fixture def expected_manifest_with_url_base_migrated_to_url() -> Dict[str, Any]: return { - "version": "6.47.1", + "version": "6.48.0", "type": "DeclarativeSource", "check": {"type": "CheckStream", "stream_names": ["A"]}, "definitions": { @@ -494,13 +494,13 @@ def expected_manifest_with_url_base_migrated_to_url() -> Dict[str, Any]: "applied_migrations": [ { "from_version": "0.0.0", - "to_version": "6.47.1", + "to_version": "6.48.0", "migration": "HttpRequesterUrlBaseToUrl", "migrated_at": "2025-04-01T00:00:00+00:00", # time freezed in the test }, { "from_version": "0.0.0", - "to_version": "6.47.1", + "to_version": "6.48.0", "migration": "HttpRequesterPathToUrl", "migrated_at": "2025-04-01T00:00:00+00:00", # time freezed in the test }, @@ -512,7 +512,7 @@ def expected_manifest_with_url_base_migrated_to_url() -> Dict[str, Any]: @pytest.fixture def manifest_with_migrated_url_base_and_path_is_joined_to_url() -> Dict[str, Any]: return { - "version": "6.47.1", + "version": "6.48.0", "type": "DeclarativeSource", "check": {"type": "CheckStream", "stream_names": ["A"]}, "definitions": {}, @@ -822,7 +822,7 @@ def manifest_with_request_body_json_and_data_to_migrate_to_request_body() -> Dic @pytest.fixture def expected_manifest_with_migrated_to_request_body() -> Dict[str, Any]: return { - "version": "6.47.1", + "version": "6.48.0", "type": "DeclarativeSource", "check": {"type": "CheckStream", "stream_names": ["A"]}, "definitions": { @@ -1178,19 +1178,19 @@ def expected_manifest_with_migrated_to_request_body() -> Dict[str, Any]: "applied_migrations": [ { "from_version": "0.0.0", - "to_version": "6.47.1", + "to_version": "6.48.0", "migration": "HttpRequesterUrlBaseToUrl", "migrated_at": "2025-04-01T00:00:00+00:00", }, { "from_version": "0.0.0", - "to_version": "6.47.1", + "to_version": "6.48.0", "migration": "HttpRequesterPathToUrl", "migrated_at": "2025-04-01T00:00:00+00:00", }, { "from_version": "0.0.0", - "to_version": "6.47.1", + "to_version": "6.48.0", "migration": "HttpRequesterRequestBodyJsonDataToRequestBody", "migrated_at": "2025-04-01T00:00:00+00:00", }, diff --git a/unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py b/unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py index c116511a3..012e6eb32 100644 --- a/unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py +++ b/unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py @@ -4,6 +4,7 @@ import pytest +from airbyte_cdk.sources.declarative.models.declarative_component_schema import RequestBody from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import ( InterpolatedRequestOptionsProvider, ) @@ -131,6 +132,71 @@ def test_interpolated_request_json(test_name, input_request_json, expected_reque assert actual_request_json == expected_request_json +@pytest.mark.parametrize( + "test_name, input_request_json, expected_request_json", + [ + ( + "test_static_json", + {"a_static_request_param": "a_static_value"}, + {"a_static_request_param": "a_static_value"}, + ), + ( + "test_value_depends_on_stream_slice", + {"read_from_slice": "{{ stream_slice['start_date'] }}"}, + {"read_from_slice": "2020-01-01"}, + ), + ( + "test_value_depends_on_next_page_token", + {"read_from_token": "{{ next_page_token['offset'] }}"}, + {"read_from_token": 12345}, + ), + ( + "test_value_depends_on_config", + {"read_from_config": "{{ config['option'] }}"}, + {"read_from_config": "OPTION"}, + ), + ( + "test_interpolated_keys", + {"{{ stream_interval['start_date'] }}": 123, "{{ config['option'] }}": "ABC"}, + {"2020-01-01": 123, "OPTION": "ABC"}, + ), + ("test_boolean_false_value", {"boolean_false": "{{ False }}"}, {"boolean_false": False}), + ("test_integer_falsy_value", {"integer_falsy": "{{ 0 }}"}, {"integer_falsy": 0}), + ("test_number_falsy_value", {"number_falsy": "{{ 0.0 }}"}, {"number_falsy": 0.0}), + ("test_string_falsy_value", {"string_falsy": "{{ '' }}"}, {}), + ("test_none_value", {"none_value": "{{ None }}"}, {}), + ( + "test_string", + """{"nested": { "key": "{{ config['option'] }}" }}""", + {"nested": {"key": "OPTION"}}, + ), + ( + "test_nested_objects", + {"nested": {"key": "{{ config['option'] }}"}}, + {"nested": {"key": "OPTION"}}, + ), + ( + "test_nested_objects_interpolated keys", + {"nested": {"{{ stream_interval['start_date'] }}": "{{ config['option'] }}"}}, + {"nested": {"2020-01-01": "OPTION"}}, + ), + ], +) +def test_interpolated_request_json_using_request_body( + test_name, input_request_json, expected_request_json +): + provider = InterpolatedRequestOptionsProvider( + config=config, + request_body=RequestBody(type="RequestBodyJson", value=input_request_json), + parameters={}, + ) + actual_request_json = provider.get_request_body_json( + stream_state=state, stream_slice=stream_slice, next_page_token=next_page_token + ) + + assert actual_request_json == expected_request_json + + @pytest.mark.parametrize( "test_name, input_request_data, expected_request_data", [ @@ -169,6 +235,48 @@ def test_interpolated_request_data(test_name, input_request_data, expected_reque assert actual_request_data == expected_request_data +@pytest.mark.parametrize( + "test_name, input_request_data, expected_request_data", + [ + ( + "test_static_map_data", + {"a_static_request_param": "a_static_value"}, + {"a_static_request_param": "a_static_value"}, + ), + ( + "test_map_depends_on_stream_slice", + {"read_from_slice": "{{ stream_slice['start_date'] }}"}, + {"read_from_slice": "2020-01-01"}, + ), + ( + "test_map_depends_on_config", + {"read_from_config": "{{ config['option'] }}"}, + {"read_from_config": "OPTION"}, + ), + ("test_defaults_to_empty_dict", None, {}), + ( + "test_interpolated_keys", + {"{{ stream_interval['start_date'] }} - {{ next_page_token['offset'] }}": "ABC"}, + {"2020-01-01 - 12345": "ABC"}, + ), + ], +) +def test_interpolated_request_data_using_request_body( + test_name, input_request_data, expected_request_data +): + provider = InterpolatedRequestOptionsProvider( + config=config, + request_body=RequestBody(type="RequestBodyData", value=input_request_data), + parameters={}, + ) + + actual_request_data = provider.get_request_body_data( + stream_state=state, stream_slice=stream_slice, next_page_token=next_page_token + ) + + assert actual_request_data == expected_request_data + + def test_error_on_create_for_both_request_json_and_data(): request_json = {"body_key": "{{ stream_slice['start_date'] }}"} request_data = "interpolate_me=5&invalid={{ config['option'] }}"