-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path_auth.py
122 lines (103 loc) · 3.62 KB
/
_auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import datetime
import os
from logging import getLogger
import dotenv
import jwt
from fastapi import APIRouter, BackgroundTasks, Request
from starlette.responses import JSONResponse
logger = getLogger(__name__)
dotenv.load_dotenv()
authRoute = APIRouter(prefix='/api/auth', tags=['Auth', 'Authentication'])
async def generateJWT(payload: dict):
"""
生成 JWT Token
:return: str
"""
payload['exp'] = datetime.datetime.utcnow() + datetime.timedelta(hours=12)
return jwt.encode(payload, os.getenv('SESSION_SECRET'), algorithm="HS256")
async def eventVerifier(event: str):
"""
Verifies the event received from the webhook
:param event:
:return: Boolean
"""
allowedEvent = ['PostRegister', 'PostResetPassword', 'PostSignIn']
if event in allowedEvent:
return True
return False
async def timeFrameVerifier(timeStamp: str):
"""
Verifies the time frame received from the webhook
:param timeStamp: str (time frame)
:return: Boolean
"""
try:
if (datetime.datetime.strptime(timeStamp, '%Y-%m-%dT%H:%M:%S.%fZ') >
datetime.datetime.now() - datetime.timedelta(minutes=1)):
return True
else:
return False
except ValueError:
return False
async def store_webhook_data(data: dict):
"""
:param data:
"""
pass
@authRoute.api_route('/hook', methods=['POST'])
async def logtoEventHandler(request: Request, background_tasks: BackgroundTasks):
"""
:param request:
:param background_tasks:
:return:
"""
try:
data = await request.json()
except Exception as e:
logger.debug(e)
return JSONResponse(status_code=401, content={'error': 'Invalid request'})
if not await eventVerifier(data.get('event')) or not await timeFrameVerifier(data.get('createdAt')):
return JSONResponse(status_code=401, content={'error': 'Invalid request', 'step': 2})
background_tasks.add_task(store_webhook_data, data)
return JSONResponse(status_code=200, content={'message': 'Webhook received successfully'})
@authRoute.api_route('/jwt', methods=['POST'])
async def generateJWTToken(request: Request):
"""
生成 JWT Token
:param request:
:return:
"""
try:
data = await request.json()
payload = {
"sub": data.get('sub'),
"name": data.get('name'),
"picture": data.get('picture'),
"username": data.get('username'),
"sid": data.get('sid'),
"exp": data.get('exp'),
}
token = await generateJWT(payload)
return JSONResponse(status_code=200, content={'token': token})
except KeyError:
return JSONResponse(status_code=401, content={'error': 'Invalid request'})
except Exception as e:
return JSONResponse(status_code=401, content={'error': str(e)})
@authRoute.api_route('/verify', methods=['POST'])
async def verifyJWTToken(request: Request):
"""
验证 JWT Token
:param request:
:return: JSONResponse
"""
try:
data = await request.json()
token = data.get('token')
payload = jwt.decode(token, os.getenv('SESSION_SECRET'), algorithms=["HS256"])
return JSONResponse(status_code=200, content={'payload': payload, 'header': jwt.get_unverified_header(token)})
except jwt.ExpiredSignatureError:
return JSONResponse(status_code=401, content={'error': 'Token expired'})
except jwt.InvalidTokenError:
return JSONResponse(status_code=401, content={'error': 'Invalid token'})
except Exception as e:
return JSONResponse(status_code=401, content={'error': str(e)})