Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 114 additions & 7 deletions centml/cli/login.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,132 @@
import base64
import hashlib
from http.server import BaseHTTPRequestHandler, HTTPServer
import json
import secrets
import urllib.parse
import webbrowser

import click
import requests


from centml.sdk import auth
from centml.sdk.config import settings


CLIENT_ID = settings.CENTML_WORKOS_CLIENT_ID
SERVER_HOST = "127.0.0.1"
SERVER_PORT = 57983
REDIRECT_URI = f"http://{SERVER_HOST}:{SERVER_PORT}/callback"
AUTHORIZE_URL = "https://auth.centml.com/user_management/authorize"
AUTHENTICATE_URL = "https://auth.centml.com/user_management/authenticate"
PROVIDER = "authkit"


def generate_pkce_pair():
verifier = secrets.token_urlsafe(64)
challenge = base64.urlsafe_b64encode(hashlib.sha256(verifier.encode()).digest()).decode().rstrip("=")
return verifier, challenge


def build_auth_url(client_id, redirect_uri, challenge):
params = {
"response_type": "code",
"client_id": client_id,
"redirect_uri": redirect_uri,
"code_challenge": challenge,
"code_challenge_method": "S256",
"provider": PROVIDER,
}
return f"{AUTHORIZE_URL}?{urllib.parse.urlencode(params)}"


class OAuthHandler(BaseHTTPRequestHandler):
def do_GET(self):
query = urllib.parse.urlparse(self.path).query
params = urllib.parse.parse_qs(query)
self.server.auth_code = params.get("code", [None])[0]

self.send_response(200)
self.send_header("Content-type", "text/html")
self.end_headers()
self.wfile.write(
"""
<html>
<body>
<h1>Succesfully logged into CentML CLI</h1>
<p>You can now close this tab and continue in the CLI.</p>
</body>
</html>
""".encode(
"utf-8"
)
)

def log_message(self, format, *args):
# Override this to suppress logging
pass


def get_auth_code():
server = HTTPServer((SERVER_HOST, SERVER_PORT), OAuthHandler)
server.handle_request()
return server.auth_code


def exchange_code_for_token(code, code_verifier):
data = {
"grant_type": "authorization_code",
"client_id": CLIENT_ID,
"code": code,
"redirect_uri": REDIRECT_URI,
"code_verifier": code_verifier,
}
headers = {"Content-Type": "application/x-www-form-urlencoded"}
response = requests.post(AUTHENTICATE_URL, data=data, headers=headers, timeout=3)
response.raise_for_status()
return response.json()


@click.command(help="Login to CentML")
@click.argument("token_file", required=False)
def login(token_file):
if token_file:
auth.store_centml_cred(token_file)

if auth.load_centml_cred():
click.echo(f"Authenticating with credentials from {settings.CENTML_CRED_FILE_PATH}\n")
click.echo("Login successful")
cred = auth.load_centml_cred()
if cred is not None and auth.refresh_centml_token(cred.get("refresh_token")):
click.echo("Authenticating with stored credentials...\n")
click.echo("✅ Login successful")
else:
click.echo("Login with CentML authentication token")
click.echo("Usage: centml login TOKEN_FILE\n")
choice = click.confirm("Do you want to download the token?")
click.echo("Logging into CentML...")

choice = click.confirm("Do you want to log in with your browser now?", default=True)
if choice:
click.launch(f"{settings.CENTML_WEB_URL}?isCliAuthenticated=true")
try:
# PKCE Flow
code_verifier, code_challenge = generate_pkce_pair()
auth_url = build_auth_url(CLIENT_ID, REDIRECT_URI, code_challenge)
click.echo("A browser window will open for you to authenticate.")
click.echo("If it doesn't open automatically, you can copy and paste this URL:")
click.echo(f" {auth_url}\n")
webbrowser.open(auth_url)
click.echo("Waiting for authentication...")

code = get_auth_code()
response_dict = exchange_code_for_token(code, code_verifier)
# If there is an error, we should remove the credentials and the user needs to sign in again.
if "error" in response_dict:
click.echo("Login failed. Please try again.")
else:
cred = {
key: response_dict[key] for key in ("access_token", "refresh_token") if key in response_dict
}
with open(settings.CENTML_CRED_FILE_PATH, "w") as f:
json.dump(cred, f)
click.echo("✅ Login successful")
except Exception as e:
click.echo(f"Login failed: {e}")
else:
click.echo("Login unsuccessful")

Expand Down
44 changes: 28 additions & 16 deletions centml/sdk/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,36 @@


def refresh_centml_token(refresh_token):
api_key = settings.CENTML_FIREBASE_API_KEY

cred = requests.post(
f"https://securetoken.googleapis.com/v1/token?key={api_key}",
headers={"content-type": "application/json; charset=UTF-8"},
data=json.dumps({"grantType": "refresh_token", "refreshToken": refresh_token}),
payload = {
"client_id": settings.CENTML_WORKOS_CLIENT_ID,
"grant_type": "refresh_token",
"refresh_token": refresh_token,
}

response = requests.post(
"https://auth.centml.com/user_management/authenticate",
headers={"Content-Type": "application/json; charset=UTF-8"},
json=payload,
timeout=3,
).json()

with open(settings.CENTML_CRED_FILE_PATH, 'w') as f:
json.dump(cred, f)
)
response_dict = response.json()

# If there is an error, we should remove the credentials and the user needs to sign in again.
if "error" in response_dict:
if os.path.exists(settings.CENTML_CRED_FILE_PATH):
os.remove(settings.CENTML_CRED_FILE_PATH)
cred = None
else:
cred = {key: response_dict[key] for key in ("access_token", "refresh_token") if key in response_dict}
with open(settings.CENTML_CRED_FILE_PATH, "w") as f:
json.dump(cred, f)

return cred


def store_centml_cred(token_file):
try:
with open(token_file, 'r') as f:
with open(token_file, "r") as f:
os.makedirs(settings.CENTML_CONFIG_PATH, exist_ok=True)
refresh_token = json.load(f)["refresh_token"]

Expand All @@ -39,24 +51,24 @@ def load_centml_cred():
cred = None

if os.path.exists(settings.CENTML_CRED_FILE_PATH):
with open(settings.CENTML_CRED_FILE_PATH, 'r') as f:
with open(settings.CENTML_CRED_FILE_PATH, "r") as f:
cred = json.load(f)

return cred


def get_centml_token():
cred = load_centml_cred()

if not cred:
sys.exit("CentML credentials not found. Please login...")

exp_time = int(jwt.decode(cred["id_token"], options={"verify_signature": False})["exp"])
exp_time = int(jwt.decode(cred["access_token"], options={"verify_signature": False})["exp"])

if time.time() >= exp_time - 100:
cred = refresh_centml_token(cred["refresh_token"])
if cred is None:
sys.exit("Could not refresh credentials. Please login and try again...")

return cred["id_token"]
return cred["access_token"]


def remove_centml_cred():
Expand Down
7 changes: 2 additions & 5 deletions centml/sdk/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@


class Config(BaseSettings):

# It is possible to override the default values by setting the environment variables
model_config = SettingsConfigDict(env_file=Path('.env'))
model_config = SettingsConfigDict(env_file=Path(".env"))

CENTML_WEB_URL: str = os.getenv("CENTML_WEB_URL", default="https://app.centml.com/")
CENTML_CONFIG_PATH: str = os.getenv("CENTML_CONFIG_PATH", default=os.path.expanduser("~/.centml"))
Expand All @@ -15,9 +14,7 @@ class Config(BaseSettings):

CENTML_PLATFORM_API_URL: str = os.getenv("CENTML_PLATFORM_API_URL", default="https://api.centml.com")

CENTML_FIREBASE_API_KEY: str = os.getenv(
"CENTML_FIREBASE_API_KEY", default="AIzaSyChPXy41cIAxS_Nd8oaYKyP_oKkIucobtY"
)
CENTML_WORKOS_CLIENT_ID: str = os.getenv("CENTML_WORKOS_CLIENT_ID", default="client_01JP5TWW2997MF8AYQXHJEGYR0")


settings = Config()
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

setup(
name='centml',
version='0.3.5',
version='0.4.0',
packages=find_packages(),
python_requires=">=3.10",
long_description=open('README.md').read(),
Expand Down
Loading