Skip to content

fix: (CDK) (Manifest) - Add deprecations support and handle deprecation warnings; deprecate url_base, path, request_body_json and request_body_data for HttpRequester #486

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Apr 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions airbyte_cdk/connector_builder/test_reader/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@


import logging
from math import log
from typing import Any, ClassVar, Dict, Iterator, List, Mapping, Optional, Union

from airbyte_cdk.connector_builder.models import (
Expand Down Expand Up @@ -112,11 +113,16 @@ def run_test_read(
record_limit = self._check_record_limit(record_limit)
# The connector builder currently only supports reading from a single stream at a time
stream = source.streams(config)[0]

# get any deprecation warnings during the component creation
deprecation_warnings: List[LogMessage] = source.deprecation_warnings()

schema_inferrer = SchemaInferrer(
self._pk_to_nested_and_composite_field(stream.primary_key),
self._cursor_field_to_nested_and_composite_field(stream.cursor_field),
)
datetime_format_inferrer = DatetimeFormatInferrer()

message_group = get_message_groups(
self._read_stream(source, config, configured_catalog, state),
schema_inferrer,
Expand All @@ -127,6 +133,10 @@ def run_test_read(
slices, log_messages, auxiliary_requests, latest_config_update = self._categorise_groups(
message_group
)

# extend log messages with deprecation warnings
log_messages.extend(deprecation_warnings)

schema, log_messages = self._get_infered_schema(
configured_catalog, schema_inferrer, log_messages
)
Expand Down Expand Up @@ -269,6 +279,7 @@ def _categorise_groups(self, message_groups: MESSAGE_GROUPS) -> GROUPED_MESSAGES
auxiliary_requests = []
latest_config_update: Optional[AirbyteControlMessage] = None

# process the message groups first
for message_group in message_groups:
match message_group:
case AirbyteLogMessage():
Expand Down
2 changes: 1 addition & 1 deletion airbyte_cdk/manifest_migrations/migrations/registry.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

manifest_migrations:
- version: 6.47.1
- version: 6.48.0
migrations:
- name: http_requester_url_base_to_url
order: 1
Expand Down
85 changes: 80 additions & 5 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1911,15 +1911,16 @@ definitions:
type: object
required:
- type
- url_base
properties:
type:
type: string
enum: [HttpRequester]
url_base:
linkable: true
deprecated: true
deprecation_message: "Use `url` field instead."
title: API Base URL
description: The Base URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.
description: Deprecated, use the `url` instead. Base URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.
linkable: true
type: string
interpolation_context:
- config
Expand All @@ -1935,9 +1936,29 @@ definitions:
- "{{ config['base_url'] or 'https://app.posthog.com'}}/api"
- "https://connect.squareup.com/v2/quotes/{{ stream_partition['id'] }}/quote_line_groups"
- "https://example.com/api/v1/resource/{{ next_page_token['id'] }}"
url:
title: The URL of an API endpoint
description: The URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.
type: string
interpolation_context:
- config
- next_page_token
- stream_interval
- stream_partition
- stream_slice
- creation_response
- polling_response
- download_target
examples:
- "https://connect.squareup.com/v2"
- "{{ config['url'] or 'https://app.posthog.com'}}/api"
- "https://connect.squareup.com/v2/quotes/{{ stream_partition['id'] }}/quote_line_groups"
- "https://example.com/api/v1/resource/{{ next_page_token['id'] }}"
path:
deprecated: true
deprecation_message: "Use `url` field instead."
title: URL Path
description: The Path the specific API endpoint that this stream represents. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.
description: Deprecated, use the `url` instead. Path of the specific API endpoint that this stream represents. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.
type: string
interpolation_context:
- config
Expand Down Expand Up @@ -1983,6 +2004,8 @@ definitions:
description: Allows for retrieving a dynamic set of properties from an API endpoint which can be injected into outbound request using the stream_partition.extra_fields.
"$ref": "#/definitions/PropertiesFromEndpoint"
request_body_data:
deprecated: true
deprecation_message: "Use `request_body` field instead."
title: Request Body Payload (Non-JSON)
description: Specifies how to populate the body of the request with a non-JSON payload. Plain text will be sent as is, whereas objects will be converted to a urlencoded form.
anyOf:
Expand All @@ -2001,6 +2024,8 @@ definitions:
[{"value": {{ stream_interval['start_time'] | int * 1000 }} }]
}, "orderBy": 1, "columnName": "Timestamp"}]/
request_body_json:
deprecated: true
deprecation_message: "Use `request_body` field instead."
title: Request Body JSON Payload
description: Specifies how to populate the body of the request with a JSON payload. Can contain nested objects.
anyOf:
Expand All @@ -2019,6 +2044,35 @@ definitions:
- sort:
field: "updated_at"
order: "ascending"
request_body:
title: Request Body Payload to be sent as part of the API request.
description: Specifies how to populate the body of the request with a payload. Can contain nested objects.
anyOf:
- "$ref": "#/definitions/RequestBody"
interpolation_context:
- next_page_token
- stream_interval
- stream_partition
- stream_slice
examples:
- type: RequestBodyJson
value:
sort_order: "ASC"
sort_field: "CREATED_AT"
- type: RequestBodyJson
value:
key: "{{ config['value'] }}"
- type: RequestBodyJson
value:
sort:
field: "updated_at"
order: "ascending"
- type: RequestBodyData
value: "plain_text_body"
- type: RequestBodyData
value:
param1: "value1"
param2: "{{ config['param2_value'] }}"
request_headers:
title: Request Headers
description: Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
Expand Down Expand Up @@ -4019,6 +4073,27 @@ definitions:
- type
- stream_template
- components_resolver
RequestBody:
type: object
description: The request body payload. Can be either URL encoded data or JSON.
properties:
type:
anyOf:
- type: string
enum: [RequestBodyData]
- type: string
enum: [RequestBodyJson]
value:
anyOf:
- type: string
description: The request body payload as a string.
- type: object
description: The request body payload as a Non-JSON object (url-encoded data).
additionalProperties:
type: string
- type: object
description: The request body payload as a JSON object (json-encoded data).
additionalProperties: true
interpolation:
variables:
- title: config
Expand Down Expand Up @@ -4227,4 +4302,4 @@ interpolation:
regex: The regular expression to search for. It must include a capture group.
return_type: str
examples:
- '{{ "goodbye, cruel world" | regex_search("goodbye,\s(.*)$") }} -> "cruel world"'
- '{{ "goodbye, cruel world" | regex_search("goodbye,\s(.*)$") }} -> "cruel world"'
11 changes: 10 additions & 1 deletion airbyte_cdk/sources/declarative/declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@

import logging
from abc import abstractmethod
from typing import Any, Mapping, Tuple
from typing import Any, List, Mapping, Tuple

from airbyte_cdk.connector_builder.models import (
LogMessage as ConnectorBuilderLogMessage,
)
from airbyte_cdk.sources.abstract_source import AbstractSource
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker

Expand Down Expand Up @@ -34,3 +37,9 @@ def check_connection(
The error object will be cast to string to display the problem to the user.
"""
return self.connection_checker.check_connection(self, logger, config)

def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
    """
    Return the deprecation warnings collected for this source.

    This base implementation has nothing to report: it always hands back
    a new, empty list.
    """
    no_warnings: List[ConnectorBuilderLogMessage] = []
    return no_warnings
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
NestedMappingEntry = Union[
dict[str, "NestedMapping"], list["NestedMapping"], str, int, float, bool, None
]
NestedMapping = Union[dict[str, NestedMappingEntry], str]
NestedMapping = Union[dict[str, NestedMappingEntry], str, dict[str, Any]]


@dataclass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
from jsonschema.validators import validate
from packaging.version import InvalidVersion, Version

from airbyte_cdk.connector_builder.models import (
LogMessage as ConnectorBuilderLogMessage,
)
from airbyte_cdk.manifest_migrations.migration_handler import (
ManifestMigrationHandler,
)
Expand Down Expand Up @@ -230,6 +233,9 @@ def dynamic_streams(self) -> List[Dict[str, Any]]:
with_dynamic_stream_name=True,
)

def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
    """
    Return the deprecation warnings reported by the component constructor.

    The result is whatever `self._constructor.get_model_deprecations()` returns.
    """
    collected_warnings = self._constructor.get_model_deprecations()
    return collected_warnings

@property
def connection_checker(self) -> ConnectionChecker:
check = self._source_config["check"]
Expand Down
144 changes: 144 additions & 0 deletions airbyte_cdk/sources/declarative/models/base_model_with_deprecations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

# THIS IS A STATIC CLASS MODEL USED TO DISPLAY DEPRECATION WARNINGS
# WHEN DEPRECATED FIELDS ARE ACCESSED

import warnings
from typing import Any, List

from pydantic.v1 import BaseModel

from airbyte_cdk.connector_builder.models import LogMessage as ConnectorBuilderLogMessage

# Replace the process-wide warning formatter so that only the warning category
# and the message are printed (no file name, line number or source line).
def _format_warning(message: Any, category: Any, *args: Any, **kwargs: Any) -> str:
    """
    Format a warning as `<CategoryName>: <message>` followed by a newline.

    Args:
        message: The warning message (or warning instance) to render.
        category: The warning class; only its `__name__` is used.
        *args: Remaining positional arguments passed by the `warnings`
            machinery (file name, line number, source line); ignored.
        **kwargs: Ignored; accepted for call compatibility.

    Returns:
        The formatted warning text. It ends with a newline because the text is
        written to the output stream verbatim; without it consecutive warnings
        would be glued together on a single line.
    """
    return f"{category.__name__}: {message}\n"


warnings.formatwarning = _format_warning

# Name of the model attribute that holds the declared fields; its presence is
# checked with `hasattr` before the input fields are inspected.
FIELDS_TAG = "__fields__"
# Key looked up in a field's `field_info.extra` to decide whether the field is deprecated.
DEPRECATED = "deprecated"
# Key looked up in a field's `field_info.extra` to obtain the deprecation hint.
DEPRECATION_MESSAGE = "deprecation_message"
# Name of the model attribute under which the collected deprecation logs are stored.
DEPRECATION_LOGS_TAG = "_deprecation_logs"


class BaseModelWithDeprecations(BaseModel):
    """
    Pydantic BaseModel that warns when deprecated fields are initialized.

    A field counts as deprecated when its `field_info.extra` contains a truthy
    `deprecated` entry; the optional `deprecation_message` entry supplies the
    hint that is appended to the warning text.

    For every deprecated field passed to the constructor the model:
      * emits a `DeprecationWarning` through the `warnings` module, and
      * records a `WARN` level Connector Builder log message in `_deprecation_logs`.

    The `_deprecation_logs` attribute is stored in the model itself.
    The collected deprecation warnings are further propagated to the Airbyte log messages,
    during the component creation process, in `model_to_component._collect_model_deprecations()`.

    The component implementation is not responsible for handling the deprecation warnings,
    since the deprecation warnings are already handled in the model itself.
    """

    class Config:
        """
        Allow extra (undeclared) fields on the model.

        NOTE(review): presumably required so that the bookkeeping attributes
        assigned in `__init__` are accepted even when a subclass would
        otherwise restrict extra fields - confirm against Pydantic v1 behavior.
        """

        extra = "allow"

    def __init__(self, **model_fields: Any) -> None:
        """
        Initialize the model and report every deprecated field found in the input.

        Args:
            **model_fields: The raw field values the model is created from.
        """
        # The parent constructor must run first so that Pydantic internals
        # (including `__fields__`) are available to the checks below.
        super().__init__(**model_fields)
        # Plain-text messages that are emitted through the `warnings` module.
        self._default_deprecation_messages: List[str] = []
        # Structured log messages that are displayed in the Connector Builder.
        self._deprecation_logs: List[ConnectorBuilderLogMessage] = []
        self._process_fields(model_fields)
        self._emit_default_deprecation_messages()
        self._set_deprecation_logs_attr_to_model()

    def _is_deprecated_field(self, field_name: str) -> bool:
        """
        Tell whether `field_name` is a declared field that is flagged as deprecated.

        Args:
            field_name: Name of the field to look up in the model's declared fields.

        Returns:
            True when the field is declared and its extra metadata carries a
            truthy `DEPRECATED` entry, False otherwise (including unknown fields).
        """
        field = self.__fields__.get(field_name)
        if field is None:
            return False
        return bool(field.field_info.extra.get(DEPRECATED, False))

    def _get_deprecation_message(self, field_name: str) -> str:
        """
        Return the deprecation hint configured for `field_name`.

        Args:
            field_name: Name of the field to look up in the model's declared fields.

        Returns:
            The field's `DEPRECATION_MESSAGE` entry, or a placeholder text when
            the field is unknown or has no message configured.
        """
        missing_message = "<missing_deprecation_message>"
        field = self.__fields__.get(field_name)
        if field is None:
            return missing_message
        return field.field_info.extra.get(DEPRECATION_MESSAGE, missing_message)

    def _process_fields(self, model_fields: Any) -> None:
        """
        Check the provided input fields and register a warning for each deprecated one.

        Only the names present in `model_fields` are inspected, so a deprecated
        field that was not passed to the constructor produces no warning.

        Args:
            model_fields (Any): The data containing fields to be processed;
                iterated by key.

        Returns:
            None
        """
        if not hasattr(self, FIELDS_TAG):
            return

        for field_name in model_fields:
            if self._is_deprecated_field(field_name):
                self._create_warning(
                    field_name,
                    self._get_deprecation_message(field_name),
                )

    def _set_deprecation_logs_attr_to_model(self) -> None:
        """
        Set the deprecation logs attribute on the model instance.

        The attribute named by `DEPRECATION_LOGS_TAG` is assigned the value of
        `self._deprecation_logs`. With the current value of the tag
        (`"_deprecation_logs"`) this re-binds the very same list to the same
        attribute name.

        Returns:
            None
        """
        setattr(self, DEPRECATION_LOGS_TAG, self._deprecation_logs)

    def _create_warning(self, field_name: str, message: str) -> None:
        """
        Register a deprecation warning for `field_name`.

        The warning text is stored twice, each time skipping duplicates: as a
        plain string (emitted later through the `warnings` module) and as a
        `WARN` level Connector Builder log message.

        Args:
            field_name (str): Name of the deprecated field.
            message (str): Hint appended to the warning text.
        """
        deprecated_message = f"Component type: `{self.__class__.__name__}`. Field '{field_name}' is deprecated. {message}"

        if deprecated_message not in self._default_deprecation_messages:
            self._default_deprecation_messages.append(deprecated_message)

        # These logs are displayed in the Connector Builder.
        deprecation_log_message = ConnectorBuilderLogMessage(
            level="WARN", message=deprecated_message
        )
        if deprecation_log_message not in self._deprecation_logs:
            self._deprecation_logs.append(deprecation_log_message)

    def _emit_default_deprecation_messages(self) -> None:
        """
        Emit the collected deprecation messages as `DeprecationWarning`s.

        The messages go through the standard `warnings` machinery, which by
        default writes to stderr (not stdout).

        NOTE(review): no `stacklevel` is passed, so each warning is attributed
        to this module; Python's default filters ignore `DeprecationWarning`
        raised outside `__main__` - confirm the process configures its warning
        filters so these messages are actually shown.
        """
        for message in self._default_deprecation_messages:
            warnings.warn(message, DeprecationWarning)
Loading
Loading