diff --git a/supabase_auth/_sync/client.py b/supabase_auth/_sync/client.py index 7f075765..cf579984 100644 --- a/supabase_auth/_sync/client.py +++ b/supabase_auth/_sync/client.py @@ -10,7 +10,7 @@ from ..constants import COOKIE_OPTIONS, DEFAULT_HEADERS, GOTRUE_URL, STORAGE_KEY from ..exceptions import APIError -from ..helpers import is_http_url, model_dump, model_validate +from ..helpers import is_http_url, is_valid_jwt, model_dump, model_validate from ..types import ( AuthChangeEvent, CookieOptions, @@ -364,6 +364,10 @@ def set_session(self, *, refresh_token: str) -> Session: APIError If an error occurs. """ + + if not is_valid_jwt(refresh_token): + ValueError("refresh_token must be a valid JWT authorization token") + response = self.api.refresh_access_token(refresh_token=refresh_token) self._save_session(session=response) self._notify_all_subscribers(event=AuthChangeEvent.SIGNED_IN) @@ -388,6 +392,10 @@ def set_auth(self, *, access_token: str) -> Session: APIError If an error occurs. """ + + if not is_valid_jwt(access_token): + ValueError("access_token must be a valid JWT authorization token") + session = Session( access_token=access_token, token_type="bearer", diff --git a/supabase_auth/helpers.py b/supabase_auth/helpers.py index 9808efd1..ae9dc7c5 100644 --- a/supabase_auth/helpers.py +++ b/supabase_auth/helpers.py @@ -35,6 +35,7 @@ ) TBaseModel = TypeVar("TBaseModel", bound=BaseModel) +BASE64URL_REGEX = r"^([a-z0-9_-]{4})*($|[a-z0-9_-]{3}$|[a-z0-9_-]{2}$)$" def model_validate(model: Type[TBaseModel], contents) -> TBaseModel: @@ -243,3 +244,26 @@ def parse_response_api_version(response: Response): def is_http_url(url: str) -> bool: return urlparse(url).scheme in {"https", "http"} + + +def is_valid_jwt(value: str) -> bool: + """Checks if value looks like a JWT, does not do any extra parsing.""" + if not isinstance(value, str): + return False + + # Remove trailing whitespaces if any. + value = value.strip() + + # Remove "Bearer " prefix if any. + if value.startswith("Bearer "): + value = value[7:] + + # Valid JWT must have 2 dots (Header.Paylod.Signature) + if value.count(".") != 2: + return False + + for part in value.split("."): + if not re.search(BASE64URL_REGEX, part, re.IGNORECASE): + return False + + return True