Skip to content

fix: (CDK) (Manifest) - Add deprecations support and handle deprecation warnings; deprecate url_base, path, request_body_json and request_body_data for HttpRequester #486

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Apr 29, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions airbyte_cdk/connector_builder/test_reader/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,16 @@ def run_test_read(
record_limit = self._check_record_limit(record_limit)
# The connector builder currently only supports reading from a single stream at a time
stream = source.streams(config)[0]

# get any deprecation warnings during the component creation
deprecation_warnings: List[AirbyteLogMessage] = source.deprecation_warnings()

schema_inferrer = SchemaInferrer(
self._pk_to_nested_and_composite_field(stream.primary_key),
self._cursor_field_to_nested_and_composite_field(stream.cursor_field),
)
datetime_format_inferrer = DatetimeFormatInferrer()

message_group = get_message_groups(
self._read_stream(source, config, configured_catalog, state),
schema_inferrer,
Expand All @@ -125,7 +130,7 @@ def run_test_read(
)

slices, log_messages, auxiliary_requests, latest_config_update = self._categorise_groups(
message_group
message_group, deprecation_warnings
)
schema, log_messages = self._get_infered_schema(
configured_catalog, schema_inferrer, log_messages
Expand Down Expand Up @@ -238,7 +243,11 @@ def _check_record_limit(self, record_limit: Optional[int] = None) -> int:

return record_limit

def _categorise_groups(self, message_groups: MESSAGE_GROUPS) -> GROUPED_MESSAGES:
def _categorise_groups(
self,
message_groups: MESSAGE_GROUPS,
deprecation_warnings: Optional[List[Any]] = None,
) -> GROUPED_MESSAGES:
"""
Categorizes a sequence of message groups into slices, log messages, auxiliary requests, and the latest configuration update.

Expand Down Expand Up @@ -269,6 +278,7 @@ def _categorise_groups(self, message_groups: MESSAGE_GROUPS) -> GROUPED_MESSAGES
auxiliary_requests = []
latest_config_update: Optional[AirbyteControlMessage] = None

# process the message groups first
for message_group in message_groups:
match message_group:
case AirbyteLogMessage():
Expand Down Expand Up @@ -298,6 +308,17 @@ def _categorise_groups(self, message_groups: MESSAGE_GROUPS) -> GROUPED_MESSAGES
case _:
raise ValueError(f"Unknown message group type: {type(message_group)}")

# process deprecation warnings, if present
if deprecation_warnings is not None:
for deprecation in deprecation_warnings:
match deprecation:
case AirbyteLogMessage():
log_messages.append(
LogMessage(message=deprecation.message, level=deprecation.level.value)
)
case _:
raise ValueError(f"Unknown message group type: {type(deprecation)}")

return slices, log_messages, auxiliary_requests, latest_config_update

def _get_infered_schema(
Expand Down
11 changes: 10 additions & 1 deletion airbyte_cdk/sources/declarative/declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@

import logging
from abc import abstractmethod
from typing import Any, Mapping, Tuple
from typing import Any, List, Mapping, Tuple

from airbyte_cdk.models import (
AirbyteLogMessage,
)
from airbyte_cdk.sources.abstract_source import AbstractSource
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker

Expand Down Expand Up @@ -34,3 +37,9 @@ def check_connection(
The error object will be cast to string to display the problem to the user.
"""
return self.connection_checker.check_connection(self, logger, config)

def deprecation_warnings(self) -> List[AirbyteLogMessage]:
"""
Returns a list of deprecation warnings for the source.
"""
return []
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from airbyte_cdk.models import (
AirbyteConnectionStatus,
AirbyteLogMessage,
AirbyteMessage,
AirbyteStateMessage,
ConfiguredAirbyteCatalog,
Expand Down Expand Up @@ -123,6 +124,9 @@ def dynamic_streams(self) -> List[Dict[str, Any]]:
manifest=self._source_config, config=self._config, with_dynamic_stream_name=True
)

def deprecation_warnings(self) -> List[AirbyteLogMessage]:
return self._constructor.get_model_deprecations()

@property
def connection_checker(self) -> ConnectionChecker:
check = self._source_config["check"]
Expand Down
108 changes: 108 additions & 0 deletions airbyte_cdk/sources/declarative/models/base_model_with_deprecations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

# THIS IS A STATIC CLASS MODEL USED TO DISPLAY DEPRECATION WARNINGS
# WHEN DEPRECATED FIELDS ARE ACCESSED

import warnings
from typing import Any, List

from pydantic.v1 import BaseModel

from airbyte_cdk.models import (
AirbyteLogMessage,
Level,
)

# format the warning message
warnings.formatwarning = (
lambda message, category, *args, **kwargs: f"{category.__name__}: {message}"
)

FIELDS_TAG = "__fields__"
DEPRECATED = "deprecated"
DEPRECATION_MESSAGE = "deprecation_message"
DEPRECATION_LOGS_TAG = "_deprecation_logs"


class BaseModelWithDeprecations(BaseModel):
"""
Pydantic BaseModel that warns when deprecated fields are accessed.
The deprecation message is stored in the field's extra attributes.
This class is used to create models that can have deprecated fields
and show warnings when those fields are accessed or initialized.

The `_deprecation_logs` attribute is stored in the model itself.
The collected deprecation warnings are further propagated to the Airbyte log messages,
during the component creation process, in `model_to_component._collect_model_deprecations()`.

The component implementation is not responsible for handling the deprecation warnings,
since the deprecation warnings are already handled in the model itself.
"""

class Config:
"""
Allow extra fields in the model. In case the model restricts extra fields.
"""

extra = "allow"

def __init__(self, **data: Any) -> None:
"""
Show warnings for deprecated fields during component initialization.
"""
# placeholder for deprecation logs
self._deprecation_logs: List[AirbyteLogMessage] = []

model_fields = self.__fields__
for field_name in data:
if field_name in model_fields:
is_deprecated_field = model_fields[field_name].field_info.extra.get(
DEPRECATED, False
)
if is_deprecated_field:
deprecation_message = model_fields[field_name].field_info.extra.get(
DEPRECATION_MESSAGE, ""
)
self._deprecated_warning(field_name, deprecation_message)

# Call the parent constructor
super().__init__(**data)

def __getattribute__(self, name: str) -> Any:
"""
Show warnings for deprecated fields during field usage.
"""

value = super().__getattribute__(name)
try:
model_fields = super().__getattribute__(FIELDS_TAG)
field_info = model_fields.get(name)
is_deprecated_field = (
field_info.field_info.extra.get(DEPRECATED, False) if field_info else False
)
if is_deprecated_field:
deprecation_message = field_info.field_info.extra.get(DEPRECATION_MESSAGE, "")
self._deprecated_warning(name, deprecation_message)
except (AttributeError, KeyError):
pass

return value

def _deprecated_warning(self, field_name: str, message: str) -> None:
"""
Show a warning message for deprecated fields (to stdout).
Args:
field_name (str): Name of the deprecated field.
message (str): Warning message to be displayed.
"""

message = f"Component type: `{self.__class__.__name__}`. Field '{field_name}' is deprecated. {message}"

# Emit a warning message for deprecated fields (to stdout) (Python Default behavior)
warnings.warn(message, DeprecationWarning)

# Add the deprecation message to the Airbyte log messages,
# this logs are displayed in the Connector Builder.
self._deprecation_logs.append(
AirbyteLogMessage(level=Level.WARN, message=message),
)
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from isodate import parse_duration
from pydantic.v1 import BaseModel

from airbyte_cdk.models import FailureType, Level
from airbyte_cdk.models import AirbyteLogMessage, FailureType, Level
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncJobOrchestrator
from airbyte_cdk.sources.declarative.async_job.job_tracker import JobTracker
Expand Down Expand Up @@ -108,6 +108,10 @@
CustomStateMigration,
GzipDecoder,
)
from airbyte_cdk.sources.declarative.models.base_model_with_deprecations import (
DEPRECATION_LOGS_TAG,
BaseModelWithDeprecations,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
AddedFieldDefinition as AddedFieldDefinitionModel,
)
Expand Down Expand Up @@ -584,6 +588,8 @@ def __init__(
self._connector_state_manager = connector_state_manager or ConnectorStateManager()
self._api_budget: Optional[Union[APIBudget, HttpAPIBudget]] = None
self._job_tracker: JobTracker = JobTracker(max_concurrent_async_job_count or 1)
# placeholder for deprecation warnings
self._collected_deprecation_logs: List[AirbyteLogMessage] = []

def _init_mappings(self) -> None:
self.PYDANTIC_MODEL_TO_CONSTRUCTOR: Mapping[Type[BaseModel], Callable[..., Any]] = {
Expand Down Expand Up @@ -730,8 +736,34 @@ def _create_component_from_model(self, model: BaseModel, config: Config, **kwarg
component_constructor = self.PYDANTIC_MODEL_TO_CONSTRUCTOR.get(model.__class__)
if not component_constructor:
raise ValueError(f"Could not find constructor for {model.__class__}")

# collect deprecation warnings for supported models.
if isinstance(model, BaseModelWithDeprecations):
self._collect_model_deprecations(model)

return component_constructor(model=model, config=config, **kwargs)

def get_model_deprecations(self) -> List[Any]:
"""
Returns the deprecation warnings that were collected during the creation of components.
"""
return self._collected_deprecation_logs

def _collect_model_deprecations(self, model: BaseModelWithDeprecations) -> None:
"""
Collects deprecation logs from the given model and appends any new logs to the internal collection.

This method checks if the provided model has deprecation logs (identified by the presence of the DEPRECATION_LOGS_TAG attribute and a non-None `_deprecation_logs` property). It iterates through each deprecation log in the model and appends it to the `_collected_deprecation_logs` list if it has not already been collected, ensuring that duplicate logs are avoided.

Args:
model (BaseModelWithDeprecations): The model instance from which to collect deprecation logs.
"""
if hasattr(model, DEPRECATION_LOGS_TAG) and model._deprecation_logs is not None:
for log in model._deprecation_logs:
# avoid duplicates for deprecation logs observed.
if log not in self._collected_deprecation_logs:
self._collected_deprecation_logs.append(log)

@staticmethod
def create_added_field_definition(
model: AddedFieldDefinitionModel, config: Config, **kwargs: Any
Expand Down
69 changes: 69 additions & 0 deletions bin/generate_component_manifest_files.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

import re
import sys
from glob import glob
from pathlib import Path
Expand Down Expand Up @@ -28,6 +29,63 @@ def generate_init_module_content() -> str:
return header


def replace_base_model_for_classes_with_deprecated_fields(post_processed_content: str) -> str:
"""
Replace the base model for classes with deprecated fields.
This function looks for classes that inherit from `BaseModel` and have fields marked as deprecated.
It replaces the base model with `BaseModelWithDeprecations` for those classes.
"""

# Find classes with deprecated fields
classes_with_deprecated_fields = set()
class_matches = re.finditer(r"class (\w+)\(BaseModel\):", post_processed_content)

for class_match in class_matches:
class_name = class_match.group(1)
class_start = class_match.start()
# Find the next class definition or end of file
next_class_match = re.search(
r"class \w+\(",
post_processed_content[class_start + len(class_match.group(0)) :],
)
class_end = (
len(post_processed_content)
if next_class_match is None
else class_start + len(class_match.group(0)) + next_class_match.start()
)
class_content = post_processed_content[class_start:class_end]

# Check if any field has deprecated=True
if re.search(r"deprecated\s*=\s*True", class_content):
classes_with_deprecated_fields.add(class_name)

# update the imports to include the new base model with deprecation warinings
# only if there are classes with the fields marked as deprecated.
if len(classes_with_deprecated_fields) > 0:
# Find where to insert the base model - after imports but before class definitions
imports_end = post_processed_content.find(
"\n\n",
post_processed_content.find("from pydantic.v1 import"),
)
if imports_end > 0:
post_processed_content = (
post_processed_content[:imports_end]
+ "\n\n"
+ "from airbyte_cdk.sources.declarative.models.base_model_with_deprecations import (\n"
+ " BaseModelWithDeprecations,\n"
+ ")"
+ post_processed_content[imports_end:]
)

# Use the `BaseModelWithDeprecations` base model for the classes with deprecated fields
for class_name in classes_with_deprecated_fields:
pattern = rf"class {class_name}\(BaseModel\):"
replacement = f"class {class_name}(BaseModelWithDeprecations):"
post_processed_content = re.sub(pattern, replacement, post_processed_content)

return post_processed_content


async def post_process_codegen(codegen_container: dagger.Container):
codegen_container = codegen_container.with_exec(
["mkdir", "/generated_post_processed"], use_entrypoint=True
Expand All @@ -41,6 +99,11 @@ async def post_process_codegen(codegen_container: dagger.Container):
post_processed_content = original_content.replace(
" _parameters:", " parameters:"
).replace("from pydantic", "from pydantic.v1")

post_processed_content = replace_base_model_for_classes_with_deprecated_fields(
post_processed_content
)

codegen_container = codegen_container.with_new_file(
f"/generated_post_processed/{generated_file}", contents=post_processed_content
)
Expand Down Expand Up @@ -75,6 +138,12 @@ async def main():
"--set-default-enum-member",
"--use-double-quotes",
"--remove-special-field-name-prefix",
# allow usage of the extra key such as `deprecated`, etc.
"--field-extra-keys",
# account the `deprecated` flag provided for the field.
"deprecated",
# account the `deprecation_message` provided for the field.
"deprecation_message",
],
use_entrypoint=True,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import json
import logging
import os
from typing import Literal
from typing import List, Literal
from unittest import mock
from unittest.mock import MagicMock, patch

Expand Down Expand Up @@ -818,6 +818,9 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification:
connector_specification.connectionSpecification = {}
return connector_specification

def deprecation_warnings(self) -> List[AirbyteLogMessage]:
return []

@property
def check_config_against_spec(self) -> Literal[False]:
return False
Expand Down
Loading