diff --git a/airbyte/_executors/util.py b/airbyte/_executors/util.py index 1ceecf93..8d0412b3 100644 --- a/airbyte/_executors/util.py +++ b/airbyte/_executors/util.py @@ -7,7 +7,6 @@ import requests import yaml -from requests import HTTPError from rich import print # noqa: A004 # Allow shadowing the built-in from airbyte import exceptions as exc @@ -19,62 +18,61 @@ from airbyte._util.telemetry import EventState, log_install_state # Non-public API from airbyte.constants import TEMP_DIR_OVERRIDE from airbyte.sources.registry import ConnectorMetadata, InstallType, get_connector_metadata +from airbyte.version import get_version if TYPE_CHECKING: from airbyte._executors.base import Executor -def _try_get_source_manifest(source_name: str, manifest_url: str | None) -> dict: +VERSION_LATEST = "latest" +DEFAULT_MANIFEST_URL = ( + "https://connectors.airbyte.com/files/metadata/airbyte/{source_name}/{version}/manifest.yaml" +) + + +def _try_get_source_manifest( + source_name: str, + manifest_url: str | None, + version: str | None = None, +) -> dict: """Try to get a source manifest from a URL. - If the URL is not provided, we'll try a couple of default URLs. - We can remove/refactor this once manifests are available in GCS connector registry. + If the URL is not provided, we'll try the default URL in the Airbyte registry. + + Raises: + - `PyAirbyteInputError`: If `source_name` is `None`. + - `HTTPError`: If fetching the URL was unsuccessful. + - `YAMLError`: If parsing the YAML failed. 
""" - if manifest_url: - response = requests.get(url=manifest_url) - response.raise_for_status() # Raise HTTPError exception if the download failed - try: - return cast(dict, yaml.safe_load(response.text)) - except yaml.YAMLError as ex: - raise exc.AirbyteConnectorInstallationError( - message="Failed to parse the connector manifest YAML.", - connector_name=source_name, - context={ - "manifest_url": manifest_url, - }, - ) from ex + if source_name is None: + raise exc.PyAirbyteInputError( + message="Param 'source_name' is required.", + ) + + # If manifest URL was not provided, we'll use the default URL from the Airbyte registry. - # No manifest URL was provided. We'll try a couple of default URLs. + cleaned_version = (version or VERSION_LATEST).removeprefix("v") + manifest_url = manifest_url or DEFAULT_MANIFEST_URL.format( + source_name=source_name, + version=cleaned_version, + ) + response = requests.get( + url=manifest_url, + headers={"User-Agent": f"PyAirbyte/{get_version()}"}, + ) + response.raise_for_status() # Raise HTTPError exception if the download failed try: - # First try the new URL format (language='manifest-only'): - result_1 = _try_get_source_manifest( - source_name=source_name, - manifest_url=( - f"https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-integrations" - f"/connectors/{source_name}/manifest.yaml" - ), - ) - except HTTPError as ex_1: - # If the new URL path was not found, try the old URL format (language='low-code'): - try: - result_2 = _try_get_source_manifest( - source_name=source_name, - manifest_url=( - f"https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-integrations" - f"/connectors/{source_name}/{source_name.replace('-', '_')}/manifest.yaml" - ), - ) - except HTTPError: - # Raise the first exception, since that represents the new default URL - raise ex_1 from None - else: - # Old URL path was found (no exceptions raised). - return result_2 - else: - # New URL path was found (no exceptions raised). 
- return result_1 + return cast(dict, yaml.safe_load(response.text)) + except yaml.YAMLError as ex: + raise exc.AirbyteConnectorInstallationError( + message="Failed to parse the connector manifest YAML.", + connector_name=source_name, + context={ + "manifest_url": manifest_url, + }, + ) from ex def _get_local_executor( diff --git a/airbyte/sources/registry.py b/airbyte/sources/registry.py index 87f10cda..c2308a0a 100644 --- a/airbyte/sources/registry.py +++ b/airbyte/sources/registry.py @@ -235,7 +235,8 @@ def _get_registry_cache(*, force_refresh: bool = False) -> dict[str, ConnectorMe registry_url = _get_registry_url() if registry_url.startswith("http"): response = requests.get( - registry_url, headers={"User-Agent": f"airbyte-lib-{get_version()}"} + registry_url, + headers={"User-Agent": f"PyAirbyte/{get_version()}"}, ) response.raise_for_status() data = response.json()