diff --git a/fastapi_jwt_auth/auth_jwt.py b/fastapi_jwt_auth/auth_jwt.py index 4110bdb..463cd24 100644 --- a/fastapi_jwt_auth/auth_jwt.py +++ b/fastapi_jwt_auth/auth_jwt.py @@ -12,7 +12,8 @@ MissingTokenError, AccessTokenRequired, RefreshTokenRequired, - FreshTokenRequired + FreshTokenRequired, + NotEnoughPermissions ) class AuthJWT(AuthConfig): @@ -36,6 +37,8 @@ def __init__(self,req: Request = None, res: Response = None): auth = req.headers.get(self._header_name.lower()) if auth: self._get_jwt_from_headers(auth) + self._required_scopes = [] + def _get_jwt_from_headers(self,auth: str) -> "AuthJWT": """ Get token from the headers @@ -635,6 +638,8 @@ def _verifying_token(self,encoded_token: str, issuer: Optional[str] = None) -> N if raw_token['type'] in self._denylist_token_checks: self._check_token_is_revoked(raw_token) + self._verifying_scopes(raw_token) + def _verified_token(self,encoded_token: str, issuer: Optional[str] = None) -> Dict[str,Union[str,int,bool]]: """ Verified token and catch all error from jwt package and return decode token @@ -668,12 +673,23 @@ def _verified_token(self,encoded_token: str, issuer: Optional[str] = None) -> Di except Exception as err: raise JWTDecodeError(status_code=422,message=str(err)) + def _verifying_scopes(self, raw_token: dict) -> None: + #decoded_token = self.get_raw_jwt(encoded_token=token) + token_scopes = raw_token["scopes"] or [] + + if len(self._required_scopes)>0: + for scope in self._required_scopes: + if scope not in token_scopes: + raise NotEnoughPermissions(status_code=401, message="Not enough permissions") + + def jwt_required( self, auth_from: str = "request", token: Optional[str] = None, websocket: Optional[WebSocket] = None, csrf_token: Optional[str] = None, + scopes: list=[] ) -> None: """ Only access token can access this function @@ -685,10 +701,13 @@ def jwt_required( :param csrf_token: the CSRF double submit token. since WebSocket cannot add specifying additional headers its must be passing csrf_token manually and can achieve by Query Url or Path """ + self._required_scopes = scopes + if auth_from == "websocket": if websocket: self._verify_and_get_jwt_in_cookies('access',websocket,csrf_token) else: self._verify_jwt_in_request(token,'access','websocket') + if auth_from == "request": if len(self._token_location) == 2: if self._token and self.jwt_in_headers: @@ -700,6 +719,8 @@ def jwt_required( self._verify_jwt_in_request(self._token,'access','headers') if self.jwt_in_cookies: self._verify_and_get_jwt_in_cookies('access',self._request) + + def jwt_optional( self, @@ -707,6 +728,7 @@ def jwt_optional( token: Optional[str] = None, websocket: Optional[WebSocket] = None, csrf_token: Optional[str] = None, + scopes: list=[] ) -> None: """ If an access token in present in the request you can get data from get_raw_jwt() or get_jwt_subject(), @@ -720,6 +742,8 @@ def jwt_optional( :param csrf_token: the CSRF double submit token. since WebSocket cannot add specifying additional headers its must be passing csrf_token manually and can achieve by Query Url or Path """ + self._required_scopes = scopes + if auth_from == "websocket": if websocket: self._verify_and_get_jwt_optional_in_cookies(websocket,csrf_token) else: self._verify_jwt_optional_in_request(token) @@ -742,6 +766,7 @@ def jwt_refresh_token_required( token: Optional[str] = None, websocket: Optional[WebSocket] = None, csrf_token: Optional[str] = None, + scopes: list=[] ) -> None: """ This function will ensure that the requester has a valid refresh token @@ -753,6 +778,8 @@ def jwt_refresh_token_required( :param csrf_token: the CSRF double submit token. since WebSocket cannot add specifying additional headers its must be passing csrf_token manually and can achieve by Query Url or Path """ + self._required_scopes = scopes + if auth_from == "websocket": if websocket: self._verify_and_get_jwt_in_cookies('refresh',websocket,csrf_token) else: self._verify_jwt_in_request(token,'refresh','websocket') @@ -775,6 +802,7 @@ def fresh_jwt_required( token: Optional[str] = None, websocket: Optional[WebSocket] = None, csrf_token: Optional[str] = None, + scopes: list=[] ) -> None: """ This function will ensure that the requester has a valid access token and fresh token @@ -786,6 +814,8 @@ def fresh_jwt_required( :param csrf_token: the CSRF double submit token. since WebSocket cannot add specifying additional headers its must be passing csrf_token manually and can achieve by Query Url or Path """ + self._required_scopes = scopes + if auth_from == "websocket": if websocket: self._verify_and_get_jwt_in_cookies('access',websocket,csrf_token,True) else: self._verify_jwt_in_request(token,'access','websocket',True) diff --git a/fastapi_jwt_auth/exceptions.py b/fastapi_jwt_auth/exceptions.py index 590423c..04d75f4 100644 --- a/fastapi_jwt_auth/exceptions.py +++ b/fastapi_jwt_auth/exceptions.py @@ -70,3 +70,12 @@ class FreshTokenRequired(AuthJWTException): def __init__(self,status_code: int, message: str): self.status_code = status_code self.message = message + +class NotEnoughPermissions(AuthJWTException): + """ + Error raised when a valid JWT attempt to access an endpoint + protected by scope requirements + """ + def __init__(self,status_code: int, message: str): + self.status_code = status_code + self.message = message \ No newline at end of file