Skip to content

Commit

Permalink
Feat: Use GCS public bucket to get manifest yaml for connectors; adds…
Browse files Browse the repository at this point in the history
… support for pinning versions and getting prior versions (#394)
  • Loading branch information
aaronsteers authored Sep 23, 2024
1 parent 2fae5a3 commit 9f6f8f3
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 46 deletions.
88 changes: 43 additions & 45 deletions airbyte/_executors/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import requests
import yaml
from requests import HTTPError
from rich import print # noqa: A004 # Allow shadowing the built-in

from airbyte import exceptions as exc
Expand All @@ -19,62 +18,61 @@
from airbyte._util.telemetry import EventState, log_install_state # Non-public API
from airbyte.constants import TEMP_DIR_OVERRIDE
from airbyte.sources.registry import ConnectorMetadata, InstallType, get_connector_metadata
from airbyte.version import get_version


if TYPE_CHECKING:
from airbyte._executors.base import Executor


def _try_get_source_manifest(source_name: str, manifest_url: str | None) -> dict:
VERSION_LATEST = "latest"
DEFAULT_MANIFEST_URL = (
"https://connectors.airbyte.com/files/metadata/airbyte/{source_name}/{version}/manifest.yaml"
)


def _try_get_source_manifest(
source_name: str,
manifest_url: str | None,
version: str | None = None,
) -> dict:
"""Try to get a source manifest from a URL.
If the URL is not provided, we'll try a couple of default URLs.
We can remove/refactor this once manifests are available in GCS connector registry.
If the URL is not provided, we'll try the default URL in the Airbyte registry.
Raises:
- `PyAirbyteInputError`: If `source_name` is `None`.
- `HTTPError`: If fetching the URL was unsuccessful.
- `YAMLError`: If parsing the YAML failed.
"""
if manifest_url:
response = requests.get(url=manifest_url)
response.raise_for_status() # Raise HTTPError exception if the download failed
try:
return cast(dict, yaml.safe_load(response.text))
except yaml.YAMLError as ex:
raise exc.AirbyteConnectorInstallationError(
message="Failed to parse the connector manifest YAML.",
connector_name=source_name,
context={
"manifest_url": manifest_url,
},
) from ex
if source_name is None:
raise exc.PyAirbyteInputError(
message="Param 'source_name' is required.",
)

# If manifest URL was provided, we'll use the default URL from the Airbyte registry.

# No manifest URL was provided. We'll try a couple of default URLs.
cleaned_version = (version or VERSION_LATEST).removeprefix("v")
manifest_url = manifest_url or DEFAULT_MANIFEST_URL.format(
source_name=source_name,
version=cleaned_version,
)

response = requests.get(
url=manifest_url,
headers={"User-Agent": f"PyAirbyte/{get_version()}"},
)
response.raise_for_status() # Raise HTTPError exception if the download failed
try:
# First try the new URL format (language='manifest-only'):
result_1 = _try_get_source_manifest(
source_name=source_name,
manifest_url=(
f"https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-integrations"
f"/connectors/{source_name}/manifest.yaml"
),
)
except HTTPError as ex_1:
# If the new URL path was not found, try the old URL format (language='low-code'):
try:
result_2 = _try_get_source_manifest(
source_name=source_name,
manifest_url=(
f"https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-integrations"
f"/connectors/{source_name}/{source_name.replace('-', '_')}/manifest.yaml"
),
)
except HTTPError:
# Raise the first exception, since that represents the new default URL
raise ex_1 from None
else:
# Old URL path was found (no exceptions raised).
return result_2
else:
# New URL path was found (no exceptions raised).
return result_1
return cast(dict, yaml.safe_load(response.text))
except yaml.YAMLError as ex:
raise exc.AirbyteConnectorInstallationError(
message="Failed to parse the connector manifest YAML.",
connector_name=source_name,
context={
"manifest_url": manifest_url,
},
) from ex


def _get_local_executor(
Expand Down
3 changes: 2 additions & 1 deletion airbyte/sources/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ def _get_registry_cache(*, force_refresh: bool = False) -> dict[str, ConnectorMe
registry_url = _get_registry_url()
if registry_url.startswith("http"):
response = requests.get(
registry_url, headers={"User-Agent": f"airbyte-lib-{get_version()}"}
registry_url,
headers={"User-Agent": f"PyAirbyte/{get_version()}"},
)
response.raise_for_status()
data = response.json()
Expand Down

0 comments on commit 9f6f8f3

Please sign in to comment.