1
1
import os
2
2
import json
3
- import time
4
- import httpx
5
- import jwt
6
3
from jwt .jwks_client import PyJWKClient
7
- from typing import Optional , Dict , Any , Tuple
8
4
from dotenv import load_dotenv
5
+ import requests
9
6
10
7
# Load environment variables once
11
8
load_dotenv ()
38
35
REDIS_PASSWORD = os .getenv ('REDIS_PASSWORD' , None )
39
36
REDIS_PORT = int (os .getenv ('REDIS_PORT' , 6379 ))
40
37
41
- # Create a Redis connection pool
42
- redis_pool = ConnectionPool (
43
- host = REDIS_HOST ,
44
- password = REDIS_PASSWORD ,
45
- port = REDIS_PORT ,
46
- db = 0 ,
47
- decode_responses = True ,
48
- max_connections = 10 , # Adjust based on your application's needs
49
- socket_timeout = 5.0 ,
50
- socket_connect_timeout = 1.0 ,
51
- health_check_interval = 30
52
- )
53
-
54
- # Create a Redis client that uses the connection pool
55
- redis_client = Redis (connection_pool = redis_pool )
56
-
57
- def get_redis_client ():
58
- """Get a Redis client from the connection pool"""
59
- return Redis (connection_pool = redis_pool )
60
-
38
+ default_pad = {}
39
+ with open ("templates/default.json" , 'r' ) as f :
40
+ default_pad = json .load (f )
61
41
62
42
# ===== Coder API Configuration =====
63
43
CODER_API_KEY = os .getenv ("CODER_API_KEY" )
@@ -69,119 +49,11 @@ def get_redis_client():
69
49
# Cache for JWKS client
70
50
_jwks_client = None
71
51
72
-
73
- # Session management functions
74
- def get_session (session_id : str ) -> Optional [Dict [str , Any ]]:
75
- """Get session data from Redis"""
76
- client = get_redis_client ()
77
- session_data = client .get (f"session:{ session_id } " )
78
- if session_data :
79
- return json .loads (session_data )
80
- return None
81
-
82
- def set_session (session_id : str , data : Dict [str , Any ], expiry : int ) -> None :
83
- """Store session data in Redis with expiry in seconds"""
84
- client = get_redis_client ()
85
- client .setex (
86
- f"session:{ session_id } " ,
87
- expiry ,
88
- json .dumps (data )
89
- )
90
-
91
- def delete_session (session_id : str ) -> None :
92
- """Delete session data from Redis"""
93
- client = get_redis_client ()
94
- client .delete (f"session:{ session_id } " )
95
-
96
- def get_auth_url () -> str :
97
- """Generate the authentication URL for Keycloak login"""
98
- auth_url = f"{ OIDC_SERVER_URL } /realms/{ OIDC_REALM } /protocol/openid-connect/auth"
99
- params = {
100
- 'client_id' : OIDC_CLIENT_ID ,
101
- 'response_type' : 'code' ,
102
- 'redirect_uri' : OIDC_REDIRECT_URI ,
103
- 'scope' : 'openid profile email'
104
- }
105
- return f"{ auth_url } ?{ '&' .join (f'{ k } ={ v } ' for k ,v in params .items ())} "
106
-
107
- def get_token_url () -> str :
108
- """Get the token endpoint URL"""
109
- return f"{ OIDC_SERVER_URL } /realms/{ OIDC_REALM } /protocol/openid-connect/token"
110
-
111
- def is_token_expired (token_data : Dict [str , Any ], buffer_seconds : int = 30 ) -> bool :
112
- if not token_data or 'access_token' not in token_data :
113
- return True
114
-
115
- try :
116
- # Get the signing key
117
- jwks_client = get_jwks_client ()
118
- signing_key = jwks_client .get_signing_key_from_jwt (token_data ['access_token' ])
119
-
120
- # Decode with verification
121
- decoded = jwt .decode (
122
- token_data ['access_token' ],
123
- signing_key .key ,
124
- algorithms = ["RS256" ], # Common algorithm for OIDC
125
- audience = OIDC_CLIENT_ID ,
126
- )
127
-
128
- # Check expiration
129
- exp_time = decoded .get ('exp' , 0 )
130
- current_time = time .time ()
131
- return current_time + buffer_seconds >= exp_time
132
- except jwt .ExpiredSignatureError :
133
- return True
134
- except Exception as e :
135
- print (f"Error checking token expiration: { str (e )} " )
136
- return True
137
-
138
- async def refresh_token (session_id : str , token_data : Dict [str , Any ]) -> Tuple [bool , Dict [str , Any ]]:
139
- """
140
- Refresh the access token using the refresh token
141
-
142
- Args:
143
- session_id: The session ID
144
- token_data: The current token data containing the refresh token
145
-
146
- Returns:
147
- Tuple[bool, Dict[str, Any]]: Success status and updated token data
148
- """
149
- if not token_data or 'refresh_token' not in token_data :
150
- return False , token_data
151
-
152
- try :
153
- async with httpx .AsyncClient () as client :
154
- refresh_response = await client .post (
155
- get_token_url (),
156
- data = {
157
- 'grant_type' : 'refresh_token' ,
158
- 'client_id' : OIDC_CLIENT_ID ,
159
- 'client_secret' : OIDC_CLIENT_SECRET ,
160
- 'refresh_token' : token_data ['refresh_token' ]
161
- }
162
- )
163
-
164
- if refresh_response .status_code != 200 :
165
- print (f"Token refresh failed: { refresh_response .text } " )
166
- return False , token_data
167
-
168
- # Get new token data
169
- new_token_data = refresh_response .json ()
170
-
171
- # Update session with new tokens
172
- expiry = new_token_data ['expires_in' ]
173
- set_session (session_id , new_token_data , expiry )
174
-
175
- return True , new_token_data
176
- except Exception as e :
177
- print (f"Error refreshing token: { str (e )} " )
178
- return False , token_data
179
-
180
52
def get_jwks_client ():
181
53
"""Get or create a PyJWKClient for token verification"""
182
54
global _jwks_client
183
55
if _jwks_client is None :
184
- jwks_url = f" { OIDC_SERVER_URL } /realms/ { OIDC_REALM } /protocol/openid-connect/certs"
56
+ jwks_url = OIDC_CONFIG . get ( "jwks_uri" )
185
57
_jwks_client = PyJWKClient (jwks_url )
186
58
return _jwks_client
187
59
0 commit comments