diff --git a/.coveragerc b/.coveragerc index 62ada8f6..3ca81830 100644 --- a/.coveragerc +++ b/.coveragerc @@ -13,6 +13,7 @@ exclude_lines = # exclude typing.TYPE_CHECKING if TYPE_CHECKING: + if t.TYPE_CHECKING: [html] show_contexts = True diff --git a/changelog/1246.feature.rst b/changelog/1246.feature.rst new file mode 100644 index 00000000..a56da41e --- /dev/null +++ b/changelog/1246.feature.rst @@ -0,0 +1,7 @@ +Automatically refresh short-lived PyPI token in long running Trusted Publishing +uploads. + +In the event that a trusted publishing upload job is taking longer than the +validity period of a trusted publishing token (15 minutes at the time of this +writing), *and* we are already 10 minutes into that validity period, we will +begin to attempt to replace the token on each subsequent request. diff --git a/tests/test_auth.py b/tests/test_auth.py index 1750f633..6cec8c7e 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -1,9 +1,13 @@ +import base64 import getpass import logging import platform import re +import time +import typing as t import pytest +import requests.auth from twine import auth from twine import exceptions @@ -299,3 +303,154 @@ def test_warns_for_empty_password( ) def test_keyring_module(): assert auth.keyring is not None + + +def test_resolver_authenticator_config_authentication(config): + config.update(username="username", password="password") + res = auth.Resolver(config, auth.CredentialInput()) + assert isinstance(res.authenticator, requests.auth.HTTPBasicAuth) + + +def test_resolver_authenticator_credential_input_authentication(config): + res = auth.Resolver(config, auth.CredentialInput("username", "password")) + assert isinstance(res.authenticator, requests.auth.HTTPBasicAuth) + + +def test_resolver_authenticator_trusted_publishing_authentication(config): + res = auth.Resolver( + config, auth.CredentialInput(username="__token__", password="skip-stdin") + ) + res._tp_token = auth.TrustedPublishingToken( + success=True, + token="fake-tp-token", + ) + assert isinstance(res.authenticator, auth.TrustedPublishingAuthenticator) + + +class MockResponse: + def __init__(self, status_code: int, json: t.Any) -> None: + self.status_code = status_code + self._json = json + + def json(self, *args, **kwargs) -> t.Any: + return self._json + + def raise_for_status(self) -> None: + if 400 <= self.status_code: + raise requests.exceptions.HTTPError() + + def ok(self) -> bool: + return self.status_code == 200 + + +class MockSession: + def __init__( + self, + get_response_list: t.List[MockResponse], + post_response_list: t.List[MockResponse], + ) -> None: + self.post_counter = self.get_counter = 0 + self.get_response_list = get_response_list + self.post_response_list = post_response_list + + def get(self, url: str, **kwargs) -> MockResponse: + response = self.get_response_list[self.get_counter] + self.get_counter += 1 + return response + + def post(self, url: str, **kwargs) -> MockResponse: + response = self.post_response_list[self.post_counter] + self.post_counter += 1 + return response + + +def test_trusted_publish_authenticator_refreshes_token(monkeypatch, config): + def make_session(): + return MockSession( + get_response_list=[ + MockResponse(status_code=200, json={"audience": "fake-aud"}) + ], + post_response_list=[ + MockResponse( + status_code=200, + json={ + "success": True, + "token": "new-token", + "expires": int(time.time()) + 900, + }, + ), + ], + ) + + def detect_credential(*args, **kwargs) -> str: + return "fake-oidc-token" + + config.update({"repository": utils.TEST_REPOSITORY}) + res = auth.Resolver(config, auth.CredentialInput(username="__token__")) + res._tp_token = auth.TrustedPublishingToken( + success=True, + token="expiring-tp-token", + ) + res._expires = int(time.time()) + 4 * 60 + monkeypatch.setattr(auth, "detect_credential", detect_credential) + monkeypatch.setattr(auth.utils, "make_requests_session", make_session) + authenticator = auth.TrustedPublishingAuthenticator(resolver=res) + prepped_req = requests.models.PreparedRequest() + prepped_req.prepare_headers({}) + request = authenticator(prepped_req) + assert ( + request.headers["Authorization"] + == f"Basic {base64.b64encode(b'__token__:new-token').decode()}" + ) + + +def test_trusted_publish_authenticator_reuses_token(monkeypatch, config): + def make_session(): + return MockSession( + get_response_list=[ + MockResponse(status_code=200, json={"audience": "fake-aud"}) + ], + post_response_list=[ + MockResponse( + status_code=200, + json={ + "success": True, + "token": "new-token", + "expires": int(time.time()) + 900, + }, + ), + ], + ) + + def detect_credential(*args, **kwargs) -> str: + return "fake-oidc-token" + + config.update({"repository": utils.TEST_REPOSITORY}) + res = auth.Resolver(config, auth.CredentialInput(username="__token__")) + res._tp_token = auth.TrustedPublishingToken( + success=True, + token="valid-tp-token", + ) + res._expires = int(time.time()) + 900 + monkeypatch.setattr(auth, "detect_credential", detect_credential) + monkeypatch.setattr(auth.utils, "make_requests_session", make_session) + authenticator = auth.TrustedPublishingAuthenticator(resolver=res) + prepped_req = requests.models.PreparedRequest() + prepped_req.prepare_headers({}) + request = authenticator(prepped_req) + assert ( + request.headers["Authorization"] + == f"Basic {base64.b64encode(b'__token__:valid-tp-token').decode()}" + ) + + +def test_inability_to_make_token_raises_error(): + class MockResolver: + def make_trusted_publishing_token(self) -> None: + return None + + authenticator = auth.TrustedPublishingAuthenticator( + resolver=MockResolver(), + ) + with pytest.raises(exceptions.TrustedPublishingFailure): + authenticator(None) diff --git a/twine/__init__.py b/twine/__init__.py index c6147574..d35db3b6 100644 --- a/twine/__init__.py +++ b/twine/__init__.py @@ -39,6 +39,7 @@ import importlib_metadata metadata = importlib_metadata.metadata("twine") +assert metadata is not None # nosec: B101 __title__ = metadata["name"] diff --git a/twine/auth.py b/twine/auth.py index 49ab8987..976921b4 100644 --- a/twine/auth.py +++ b/twine/auth.py @@ -1,16 +1,20 @@ +import datetime import functools import getpass import json import logging -from typing import TYPE_CHECKING, Callable, Optional, Type, cast +import time +import typing as t +from typing import cast from urllib.parse import urlparse +import requests.auth from id import AmbientCredentialError # type: ignore from id import detect_credential # keyring has an indirect dependency on PyCA cryptography, which has no # pre-built wheels for ppc64le and s390x, see #1158. -if TYPE_CHECKING: +if t.TYPE_CHECKING: import keyring from keyring.errors import NoKeyringError else: @@ -26,16 +30,68 @@ logger = logging.getLogger(__name__) +TOKEN_USERNAME: t.Final[str] = "__token__" +#: Tokens expire after 15 minutes, let's start allowing renewal/replacement +#: after 10 minutes that way if we fail, we may still have time to replace it +#: before it expires. Thus, if our current time + this threshold is past the +#: greater or equal to the expiration time, we should start trying to replace +#: the token. +TOKEN_RENEWAL_THRESHOLD: t.Final[datetime.timedelta] = datetime.timedelta( + minutes=5, +) + class CredentialInput: def __init__( - self, username: Optional[str] = None, password: Optional[str] = None + self, username: t.Optional[str] = None, password: t.Optional[str] = None ) -> None: self.username = username self.password = password +class TrustedPublishingTokenRetrievalError(t.TypedDict): + code: str + description: str + + +class TrustedPublishingToken(t.TypedDict, total=False): + message: t.Optional[str] + errors: t.Optional[list[TrustedPublishingTokenRetrievalError]] + token: t.Optional[str] + success: t.Optional[bool] + # Depends on https://github.com/pypi/warehouse/issues/18235 + expires: t.Optional[int] + + +class TrustedPublishingAuthenticator(requests.auth.AuthBase): + def __init__( + self, + resolver: "Resolver", + ) -> None: + self.resolver = resolver + + def __call__( + self, request: "requests.models.PreparedRequest" + ) -> "requests.models.PreparedRequest": + token = self.resolver.make_trusted_publishing_token() + if token is None: + raise exceptions.TrustedPublishingFailure( + "Expected a trusted publishing token but got None" + ) + + # Instead of reconstructing Basic Auth headers ourself, let's just + # rely on the underlying class to do the right thing. + basic_auth = requests.auth.HTTPBasicAuth( + username=TOKEN_USERNAME, + password=token, + ) + return cast(requests.models.PreparedRequest, basic_auth(request)) + + class Resolver: + _tp_token: t.Optional[TrustedPublishingToken] = None + _expires: t.Optional[int] = None + def __init__( self, config: utils.RepositoryConfig, @@ -44,16 +100,37 @@ def __init__( self.config = config self.input = input + @property + @functools.lru_cache() + def authenticator(self) -> "requests.auth.AuthBase": + username = self.username + password = self.password + if self._tp_token: + # If `self.password` ended up getting a Trusted Publishing token, + # we've cached it here so we should use that as the authenticator. + # We have a custom authenticator so we can repeatedly invoke + # `make_trusted_publishing_token` which if the token is 10 minutes + # old or more, we should get a new one automatically. + return TrustedPublishingAuthenticator(resolver=self) + if username and password: + return requests.auth.HTTPBasicAuth( + username=username, + password=password, + ) + raise exceptions.InvalidConfiguration( + "could not determine credentials for configured repository" + ) + @classmethod - def choose(cls, interactive: bool) -> Type["Resolver"]: + def choose(cls, interactive: bool) -> t.Type["Resolver"]: return cls if interactive else Private @property @functools.lru_cache() - def username(self) -> Optional[str]: + def username(self) -> t.Optional[str]: if self.is_pypi() and not self.input.username: # Default username. - self.input.username = "__token__" + self.input.username = TOKEN_USERNAME return utils.get_userpass_value( self.input.username, @@ -64,7 +141,7 @@ def username(self) -> Optional[str]: @property @functools.lru_cache() - def password(self) -> Optional[str]: + def password(self) -> t.Optional[str]: return utils.get_userpass_value( self.input.password, self.config, @@ -72,7 +149,15 @@ def password(self) -> Optional[str]: prompt_strategy=self.password_from_keyring_or_trusted_publishing_or_prompt, ) - def make_trusted_publishing_token(self) -> Optional[str]: + def _has_valid_cached_tp_token(self) -> bool: + return self._tp_token is not None and ( + int(time.time()) + TOKEN_RENEWAL_THRESHOLD.seconds + < cast(int, self._tp_token.get("expires", self._expires)) + ) + + def _make_trusted_publishing_token(self) -> t.Optional[TrustedPublishingToken]: + if self._has_valid_cached_tp_token(): + return self._tp_token # Trusted publishing (OpenID Connect): get one token from the CI # system, and exchange that for a PyPI token. repository_domain = cast(str, urlparse(self.system).netloc) @@ -97,7 +182,13 @@ def make_trusted_publishing_token(self) -> Optional[str]: if oidc_token is None: logger.debug("This environment is not supported for trusted publishing") - return None # Fall back to prompting for a token (if possible) + if self._tp_token and int(time.time()) > cast( + int, self._tp_token.get("expires", self._expires) + ): + return None # Fall back to prompting for a token (if possible) + # The cached trusted publishing token may still be valid for a + # while longer, let's continue using it instead of prompting + return self._tp_token logger.debug("Got OIDC token for audience %s", audience) @@ -121,18 +212,26 @@ def make_trusted_publishing_token(self) -> Optional[str]: for error in mint_token_payload["errors"] ) raise exceptions.TrustedPublishingFailure( - "The token request failed; the index server gave the following " - f"reasons:\n\n{reasons}" + "The token request failed; the index server gave the following" + f" reasons:\n\n{reasons}" ) logger.debug("Minted upload token for trusted publishing") + self._tp_token = cast(TrustedPublishingToken, mint_token_payload) + self._expires = int(time.time()) + 900 + return self._tp_token + + def make_trusted_publishing_token(self) -> t.Optional[str]: + mint_token_payload = self._make_trusted_publishing_token() + if not mint_token_payload: + return None return cast(str, mint_token_payload["token"]) @property - def system(self) -> Optional[str]: + def system(self) -> t.Optional[str]: return self.config["repository"] - def get_username_from_keyring(self) -> Optional[str]: + def get_username_from_keyring(self) -> t.Optional[str]: if keyring is None: logger.info("keyring module is not available") return None @@ -149,7 +248,7 @@ def get_username_from_keyring(self) -> Optional[str]: logger.warning("Error getting username from keyring", exc_info=exc) return None - def get_password_from_keyring(self) -> Optional[str]: + def get_password_from_keyring(self) -> t.Optional[str]: if keyring is None: logger.info("keyring module is not available") return None @@ -178,7 +277,7 @@ def password_from_keyring_or_trusted_publishing_or_prompt(self) -> str: logger.info("password set from keyring") return password - if self.is_pypi() and self.username == "__token__": + if self.is_pypi() and self.username == TOKEN_USERNAME: logger.debug( "Trying to use trusted publishing (no token was explicitly provided)" ) @@ -190,7 +289,7 @@ def password_from_keyring_or_trusted_publishing_or_prompt(self) -> str: return self.prompt(what, getpass.getpass) - def prompt(self, what: str, how: Callable[..., str]) -> str: + def prompt(self, what: str, how: t.Callable[..., str]) -> str: return how(f"Enter your {what}: ") def is_pypi(self) -> bool: @@ -204,5 +303,5 @@ def is_pypi(self) -> bool: class Private(Resolver): - def prompt(self, what: str, how: Optional[Callable[..., str]] = None) -> str: + def prompt(self, what: str, how: t.Optional[t.Callable[..., str]] = None) -> str: raise exceptions.NonInteractive(f"Credential not found for {what}.")