-
Notifications
You must be signed in to change notification settings - Fork 57
/
Copy pathtoken_source.py
115 lines (101 loc) · 3.61 KB
/
token_source.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# -*- coding: utf-8 -*-
import abc
import typing
import time
import os
from datetime import datetime
try:
import jwt
import jwt.utils
except ImportError:
jwt = None
try:
from cryptography.hazmat.primitives.serialization import load_pem_private_key
except ImportError:
load_pem_private_key = None
class Token(abc.ABC):
    """A token string paired with the identifier of its token type."""

    def __init__(self, token: str, token_type: str):
        """Keep both values as public attributes of the instance.

        :param token: the token value
        :param token_type: identifier describing what kind of token it is
        """
        self.token, self.token_type = token, token_type
class TokenSource(abc.ABC):
    """Abstract interface for anything that can hand out a ``Token``."""

    @abc.abstractmethod
    def token(self) -> Token:
        """
        Produce a token; concrete subclasses must override this method.

        :return: A Token object ready for exchange
        """
        ...
class FixedTokenSource(TokenSource):
    """Token source that returns the same ``Token`` on every call."""

    def __init__(self, token: str, token_type: str):
        """Build the ``Token`` once from the given value and type.

        :param token: the token value
        :param token_type: identifier of the token type
        """
        fixed = Token(token, token_type)
        self._token = fixed

    def token(self) -> Token:
        """
        :return: the Token object created in the constructor
        """
        return self._token
class JwtTokenSource(TokenSource):
    """Token source that signs a fresh JWT on every ``token()`` call.

    The signing key is taken either from ``private_key`` or from the file
    named by ``private_key_file``. A key in PEM format is parsed once, in the
    constructor, with ``load_pem_private_key``; any other key is passed to
    ``jwt.encode`` as raw bytes.
    """

    def __init__(
        self,
        signing_method: str,
        private_key: typing.Optional[str] = None,
        private_key_file: typing.Optional[str] = None,
        key_id: typing.Optional[str] = None,
        issuer: typing.Optional[str] = None,
        subject: typing.Optional[str] = None,
        audience: typing.Union[typing.List[str], str, None] = None,
        id: typing.Optional[str] = None,
        token_ttl_seconds: int = 3600,
    ):
        """
        :param signing_method: algorithm name passed to ``jwt.encode`` and
            written to the ``alg`` header
        :param private_key: key material given inline
        :param private_key_file: path of a file holding the key material;
            ``~`` is expanded. Mutually exclusive with ``private_key``
        :param key_id: value of the ``kid`` header (omitted when empty)
        :param issuer: value of the ``iss`` claim (omitted when empty)
        :param subject: value of the ``sub`` claim (omitted when empty)
        :param audience: value of the ``aud`` claim (omitted when empty)
        :param id: value of the ``jti`` claim (omitted when empty)
        :param token_ttl_seconds: lifetime of each issued token, in seconds
        :raises AssertionError: if ``jwt`` or ``cryptography`` is not installed
        :raises ValueError: if both or neither key arguments are given, the
            signing method is empty, or the TTL is not positive
        """
        # Raised explicitly rather than with ``assert`` so that the checks
        # are not stripped under ``python -O``. The exception type is kept
        # for backward compatibility with existing callers.
        if jwt is None:
            raise AssertionError("Install pyjwt library to use jwt tokens")
        if load_pem_private_key is None:
            raise AssertionError("Install cryptography library to use jwt tokens")

        self._signing_method = signing_method
        self._key_id = key_id

        if private_key and private_key_file:
            raise ValueError("JWT: both private_key and private_key_file are set")

        self._private_key = ""
        if private_key:
            self._private_key = private_key
        if private_key_file:
            private_key_file = os.path.expanduser(private_key_file)
            with open(private_key_file, "rb") as key_file:
                self._private_key = key_file.read()

        self._issuer = issuer
        self._subject = subject
        self._audience = audience
        self._id = id
        self._token_ttl_seconds = token_ttl_seconds

        if not self._signing_method:
            raise ValueError("JWT: no signing method specified")
        if not self._private_key:
            raise ValueError("JWT: no private key specified")
        if self._token_ttl_seconds <= 0:
            raise ValueError("JWT: invalid jwt token TTL")

        # Normalize to bytes so inline and file-loaded keys are handled alike.
        if isinstance(self._private_key, str):
            self._private_key = self._private_key.encode()
        # Parse PEM keys once here instead of on every token() call.
        if isinstance(self._private_key, bytes) and jwt.utils.is_pem_format(self._private_key):
            self._private_key = load_pem_private_key(self._private_key, password=None)

    def token(self) -> Token:
        """
        Sign a new JWT whose ``iat`` is the current time and whose ``exp`` is
        ``token_ttl_seconds`` later.

        :return: A Token object ready for exchange, with token type
            ``urn:ietf:params:oauth:token-type:jwt``
        """
        issued_at = time.time()
        # Whole epoch seconds are used instead of datetime.utcfromtimestamp(),
        # which is deprecated since Python 3.12 and yields naive datetimes.
        payload = {
            "iat": int(issued_at),
            "exp": int(issued_at + self._token_ttl_seconds),
        }
        if self._audience:
            payload["aud"] = self._audience
        if self._issuer:
            payload["iss"] = self._issuer
        if self._subject:
            payload["sub"] = self._subject
        if self._id:
            payload["jti"] = self._id

        headers = {
            "alg": self._signing_method,
            "typ": "JWT",
        }
        if self._key_id:
            headers["kid"] = self._key_id

        token = jwt.encode(
            key=self._private_key,
            algorithm=self._signing_method,
            headers=headers,
            payload=payload,
        )

        return Token(token, "urn:ietf:params:oauth:token-type:jwt")