diff --git a/.github/workflows/check-publish.yml b/.github/workflows/check-publish.yml index 0ffd9f3..f4dc7bd 100644 --- a/.github/workflows/check-publish.yml +++ b/.github/workflows/check-publish.yml @@ -29,7 +29,7 @@ jobs: fail-fast: false matrix: platform: ["ubuntu-latest", "macos-13"] - python-version: ["3.8", "3.9", "3.10", "3.11"] + python-version: ["3.9", "3.10", "3.11", "3.12"] name: Python ${{ matrix.python-version }} on ${{ matrix.platform }} runs-on: ${{ matrix.platform }} diff --git a/.github/workflows/check.yaml b/.github/workflows/check.yaml index 29d11d9..9745564 100644 --- a/.github/workflows/check.yaml +++ b/.github/workflows/check.yaml @@ -25,7 +25,7 @@ jobs: fail-fast: false matrix: platform: ["ubuntu-latest","macos-13"] - python-version: ["3.8", "3.9", "3.10", "3.11"] + python-version: ["3.9", "3.10", "3.11", "3.12"] name: Python ${{ matrix.python-version }} on ${{ matrix.platform }} runs-on: ${{ matrix.platform }} diff --git a/aviso-server/auth/aviso_auth/__init__.py b/aviso-server/auth/aviso_auth/__init__.py index e58107a..516a1e4 100644 --- a/aviso-server/auth/aviso_auth/__init__.py +++ b/aviso-server/auth/aviso_auth/__init__.py @@ -10,7 +10,7 @@ # version number for the application. -__version__ = "0.4.0" +__version__ = "0.5.0" # setting application logger logger = logging.getLogger("aviso-auth") diff --git a/aviso-server/auth/aviso_auth/authentication.py b/aviso-server/auth/aviso_auth/authentication.py index 73369cc..46e6f90 100644 --- a/aviso-server/auth/aviso_auth/authentication.py +++ b/aviso-server/auth/aviso_auth/authentication.py @@ -6,161 +6,303 @@ # granted to it by virtue of its status as an intergovernmental organisation # nor does it submit to any jurisdiction. +import logging import random import time +import jwt import requests +from aviso_auth.custom_exceptions import InternalSystemError, TokenNotValidException from aviso_monitoring.collector.time_collector import TimeCollector from aviso_monitoring.reporter.aviso_auth_reporter import AvisoAuthMetricType -from . import logger -from .custom_exceptions import ( - AuthenticationUnavailableException, - InternalSystemError, - TokenNotValidException, -) - +logger = logging.getLogger("aviso-auth") MAX_N_TRIES = 25 class Authenticator: - UNAUTHORISED_RESPONSE_HEADER = { - "WWW-Authenticate": "EmailKey realm='ecmwf',info='Authenticate with ECMWF API credentials :'" - } + # Following is the fallback response header in case of an unauthorized response + # from the authentication server does not provide a WWW-Authenticate header. + UNAUTHORISED_RESPONSE_HEADER = {"WWW-Authenticate": 'Bearer realm="auth-o-tron"'} def __init__(self, config, cache=None): - self.url = config.authentication_server["url"] - self.req_timeout = config.authentication_server["req_timeout"] + """ + Initialize the Authenticator. + - Loads the authentication server URL and request timeout from the configuration. + - If a cache is provided, token validation is memoized to avoid repeated calls. + - If monitoring is enabled (authentication_server["monitor"] is True), + wraps the authenticate method with a TimeCollector. + """ + logger.debug("Initializing Authenticator with config: %s", config) + self.config = config + self.url = config.authentication_server.get("url", "") + self.req_timeout = config.authentication_server.get("req_timeout", 10) + logger.debug("Authentication server URL: %s, timeout: %s", self.url, self.req_timeout) - # assign explicitly a decorator to provide cache for _token_to_username - if cache: - self._token_to_username = cache.memoize(timeout=config.authentication_server["cache_timeout"])( - self._token_to_username_impl - ) - else: - self._token_to_username = self._token_to_username_impl + self.cache = cache - # assign explicitly a decorator to monitor the authentication - if config.authentication_server["monitor"]: + # Setup monitoring if enabled. + if config.authentication_server.get("monitor"): self.timer = TimeCollector( config.monitoring, tlm_type=AvisoAuthMetricType.auth_resp_time.name, tlm_name="att" ) + logger.debug("Monitoring enabled; using timed_authenticate") self.authenticate = self.timed_authenticate else: self.authenticate = self.authenticate_impl + # Wrap the token validation function with caching if available. + if self.cache: + logger.debug( + "Using memoized token validator with cache timeout = %s", + config.authentication_server.get("cache_timeout", 300), + ) + self.validate_token_cached = self.cache.memoize( + timeout=config.authentication_server.get("cache_timeout", 300) + )(self._validate_token_uncached) + else: + logger.debug("No cache provided; using uncached token validation") + self.validate_token_cached = self._validate_token_uncached + def timed_authenticate(self, request): """ - This method is an explicit decorator of the authenticate_impl method to provide time performance monitoring + Wraps the authenticate_impl method with a TimeCollector. """ - return self.timer(self.authenticate_impl, args=request) + logger.debug("timed_authenticate: Starting timed authentication") + return self.timer(self.authenticate_impl, args=(request,)) def authenticate_impl(self, request): """ - This method verifies the token in the request header corresponds to a valid user - :param request: - :return: - - the username if token is valid - - TokenNotValidException if the server returns 403 - - InternalSystemError for all the other cases + Main authentication flow: + 1. Extract the Authorization and X-Auth-Type headers. + 2. Extract the token from the Authorization header. + 3. Gather the client IP (for logging). + 4. Validate the token (cached). + 5. Decode the JWT from the validation response to extract username and realm. + 6. Return the username. """ - if request.environ is None or request.environ.get("HTTP_AUTHORIZATION") is None: - logger.debug(f"Authorization header absent {request.environ}") - raise TokenNotValidException("Authorization header not found") + logger.debug("authenticate_impl: Starting authentication process") - # validate the authorization header - auth_header = request.environ.get("HTTP_AUTHORIZATION") - try: - auth_type, credentials = auth_header.split(" ", 1) - auth_email, auth_token = credentials.split(":", 1) - except ValueError: - logger.debug(f"Authorization header not recognised {auth_header}") - raise TokenNotValidException("Could not read authorization header, expected 'Authorization: :'") + # Step 1: Extract headers. + auth_header, x_auth_type = self.extract_auth_headers(request) + + # Step 2: Extract the token from the Authorization header. + token = self.extract_token(auth_header, x_auth_type) - # validate the token - username, email = self._token_to_username(auth_token) + # Step 3: Get the client IP. + client_ip = request.headers.get("X-Forwarded-For", request.remote_addr) or "unknown" - # validate the email - if auth_email.casefold() != email.casefold(): - logger.debug(f"Emails not matching {auth_email.casefold()}, {email.casefold()}") - raise TokenNotValidException("Invalid email associate to the token.") + # Step 4: Validate the token (cached). + resp = self.validate_token_cached(token, x_auth_type, client_ip=client_ip) - logger.info(f"User {username} correctly authenticated, client IP: {request.headers.get('X-Forwarded-For')}") + # Step 5: Decode the JWT to extract user information. + username, realm = self._token_to_username_impl(resp) + + logger.debug("authenticate_impl: Returning username: %s", username) return username - def _token_to_username_impl(self, token): + def extract_auth_headers(self, request): """ - This method verifies the token corresponds to a valid user. - Access this method by self._token_to_username - :param token: - :return: - - the username and email if token is valid - - InternalSystemError for all the other cases + Extracts the HTTP_AUTHORIZATION header from request.environ and the custom X-Auth-Type header. + If Authorization header uses EmailKey scheme, X-Auth-Type is assumed to be "ecmwf" even if missing. """ - logger.debug(f"Request authentication for token {token}") + if not hasattr(request, "environ"): + logger.error("Request missing environ attribute") + raise TokenNotValidException("Invalid request: no environ attribute") - resp = self.wait_for_resp(token) + auth_header = request.environ.get("HTTP_AUTHORIZATION") + if not auth_header: + logger.error("Missing Authorization header") + raise TokenNotValidException("Missing Authorization header") + logger.debug("Extracted Authorization header: %s", auth_header) - # just in case requests does not always raise an error - if resp.status_code != 200: - message = ( - f"Not able to authenticate token {token} to {self.url}, status {resp.status_code}, " - f"{resp.reason}, {resp.content.decode()}" - ) - logger.error(message) - raise InternalSystemError(f"Error in authenticating token {token}, please contact the support team") + # Check if this is an EmailKey authorization before requiring X-Auth-Type + if auth_header.lower().startswith("emailkey "): + # For EmailKey, assume X-Auth-Type is "ecmwf" if not provided + x_auth_type = request.headers.get("X-Auth-Type", "ecmwf") + logger.debug("EmailKey detected: Using X-Auth-Type '%s'", x_auth_type) + else: + # For other auth schemes, X-Auth-Type is required + x_auth_type = request.headers.get("X-Auth-Type") + if not x_auth_type: + logger.error("Missing X-Auth-Type header") + raise TokenNotValidException("Missing X-Auth-Type header") + logger.debug("Extracted X-Auth-Type header: %s", x_auth_type) + + return auth_header, x_auth_type - # we got a 200, extract the username and email - resp_body = resp.json() - if resp_body.get("uid") is None: - logger.error(f"Not able to find username in: {resp_body}") - raise InternalSystemError(f"Error in authenticating token {token}, please contact the support team") - # get the username - username = resp_body.get("uid") + def extract_token(self, auth_header, x_auth_type): + """ + Parses the Authorization header to extract the token. + For "plain" auth, expects a Basic scheme; for all other auth types, expects Bearer. + Legacy clients sending "EmailKey" are automatically mapped to "Bearer" and + X-Auth-Type is assumed to be "ecmwf". + """ + try: + scheme, token = auth_header.split(" ", 1) + except Exception as e: + logger.error("Failed to parse Authorization header: %s", e, exc_info=True) + raise TokenNotValidException("Invalid Authorization header format") - if resp_body.get("email") is None: - logger.error(f"Not able to find email in: {resp_body}") - raise InternalSystemError(f"Error in authenticating token {token}, please contact the support team") - email = resp_body.get("email") + # Map legacy "EmailKey" scheme to "Bearer" and ensure X-Auth-Type is "ecmwf" + if scheme.lower() == "emailkey": + logger.debug("Mapping legacy 'EmailKey' scheme to 'Bearer'") + scheme = "Bearer" + token = token.split(":")[-1].strip() # Extract the token part + # Ensure X-Auth-Type is "ecmwf" when EmailKey is used + if x_auth_type.lower() != "ecmwf": + logger.debug("EmailKey detected but X-Auth-Type is '%s', overriding to 'ecmwf'", x_auth_type) + x_auth_type = "ecmwf" - logger.debug(f"Token correctly validated for user {username}, email {email}") - return username, email + expected_scheme = "basic" if x_auth_type.lower() == "plain" else "bearer" + if scheme.lower() != expected_scheme: + logger.error("Expected '%s' scheme, got: %s", expected_scheme.capitalize(), scheme) + raise TokenNotValidException("Unsupported authorization scheme") + logger.debug("extract_token: Extracted token: %s", token) + return token - def wait_for_resp(self, token): + def _validate_token_uncached(self, token, x_auth_type, client_ip="unknown"): """ - This methods helps in cases of 429, too many requests at the same time, by spacing in time the requests - :param token: - :return: response to token validation - - TokenNotValidException if the server returns 403 - - AuthenticationUnavailableException if unreachable + Implements the actual token validation by calling auth-o-tron /authenticate. + For "ecmwf" and "openid", a Bearer header is used. + For "plain", a Basic header is used. + Retries on temporary errors. """ + if x_auth_type.lower() in ["ecmwf", "openid"]: + auth_header = f"Bearer {token}" + elif x_auth_type.lower() == "plain": + auth_header = f"Basic {token}" + else: + logger.warning("Unknown auth type: %s", x_auth_type) + raise TokenNotValidException(f"Unknown auth type: {x_auth_type}") + + headers = {"Authorization": auth_header} + auth_url = f"{self.url}/authenticate" + logger.debug( + "Calling auth-o-tron /authenticate at %s [auth_type=%s, client_ip=%s]", auth_url, x_auth_type, client_ip + ) + n_tries = 0 while n_tries < MAX_N_TRIES: + logger.debug("validate_token: Attempt %d [auth_type=%s, ip=%s]", n_tries + 1, x_auth_type, client_ip) try: - resp = requests.get(self.url, headers={"X-ECMWF-Key": token}, timeout=self.req_timeout) - if resp.status_code == 429: # Too many request just retry in a bit + resp = requests.get(auth_url, headers=headers, timeout=self.req_timeout) + logger.debug("validate_token: Received response with status %d", resp.status_code) + if resp.status_code == 429: + logger.debug("validate_token: Rate limited (429), sleeping") time.sleep(random.uniform(1, 5)) n_tries += 1 + continue + + resp.raise_for_status() # Raises HTTPError for 4xx/5xx statuses + logger.debug( + "validate_token: Token validated successfully [auth_type=%s, ip=%s]", x_auth_type, client_ip + ) + return resp + + except requests.exceptions.HTTPError: + status_code = resp.status_code + # Use dynamic www-authenticate from the response header. + www_authenticate = resp.headers.get("www-authenticate", "Not provided") + if status_code in [401, 403]: + logger.warning( + "validate_token: %d Unauthorized [auth_type=%s, ip=%s, www-authenticate=%s, reason=%.200s]", + status_code, + x_auth_type, + client_ip, + www_authenticate, + resp.text, + ) + raise TokenNotValidException( + f"Invalid credentials or unauthorized token; www-authenticate: {www_authenticate}" + ) + if status_code == 408 or (500 <= status_code < 600): + logger.warning( + "validate_token: Temporary HTTP error %d [auth_type=%s, ip=%s], reason=%s, retrying", + status_code, + x_auth_type, + client_ip, + resp.reason, + ) + n_tries += 1 + time.sleep(random.uniform(1, 5)) else: - # raise an error for any other case - resp.raise_for_status() - # or just exit as we have a good result - break - except requests.exceptions.HTTPError as errh: - message = f"Not able to authenticate token {token} from {self.url}, {str(errh)}" - if resp.status_code == 403: - logger.debug(message) - raise TokenNotValidException(f"Token {token} not valid") - if resp.status_code == 408 or (resp.status_code >= 500 and resp.status_code < 600): - logger.warning(message) - raise AuthenticationUnavailableException(f"Error in authenticating token {token}") - else: - logger.error(message) - raise InternalSystemError(f"Error in authenticating token {token}, please contact the support team") + logger.error( + "validate_token: Unexpected HTTP error %d [auth_type=%s, ip=%s], reason=%s", + status_code, + x_auth_type, + client_ip, + resp.reason, + ) + raise InternalSystemError("Unexpected HTTP error during token validation") + except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as err: - logger.warning(f"Not able to authenticate token {token}, {str(err)}") - raise AuthenticationUnavailableException(f"Error in authenticating token {token}") + logger.warning( + "validate_token: Connection/Timeout error on attempt %d [auth_type=%s, ip=%s]: %s", + n_tries + 1, + x_auth_type, + client_ip, + err, + ) + n_tries += 1 + time.sleep(random.uniform(1, 5)) except Exception as e: - logger.exception(e) - raise InternalSystemError(f"Error in authenticating token {token}, please contact the support team") - return resp + logger.error( + "validate_token: Unexpected error on attempt %d [auth_type=%s, ip=%s]: %s", + n_tries + 1, + x_auth_type, + client_ip, + e, + ) + raise InternalSystemError("Unexpected error during token validation") + + logger.error( + "validate_token: Exceeded maximum attempts (%d) [auth_type=%s, ip=%s]", MAX_N_TRIES, x_auth_type, client_ip + ) + raise InternalSystemError("Exceeded maximum token validation attempts") + + def validate_token(self, token, x_auth_type, client_ip="unknown"): + """ + Public wrapper that calls the memoized token validation function (if caching is enabled), + or the uncached version otherwise. + """ + return self.validate_token_cached(token, x_auth_type, client_ip=client_ip) + + def _token_to_username_impl(self, resp): + """ + Extracts user info from the auth-o-tron /authenticate response. + Expects a JWT in the "authorization" header (format: "Bearer "), + decodes the JWT without signature verification, and extracts the "username" and "realm". + Logs an INFO-level message on successful authentication. + """ + auth_header = resp.headers.get("authorization") + if not auth_header: + logger.error("auth-o-tron response missing 'authorization' header") + raise InternalSystemError("Invalid response from auth-o-tron: missing authorization header") + + parts = auth_header.split(" ", 1) + if len(parts) != 2 or parts[0].lower() != "bearer": + logger.error("Invalid authorization header format: %s", auth_header) + raise InternalSystemError("Invalid response from auth-o-tron: incorrect authorization header format") + + jwt_token = parts[1].strip() + if not jwt_token: + logger.error("JWT token is empty in authorization header") + raise InternalSystemError("Invalid response from auth-o-tron: empty JWT token") + + logger.debug("Extracted JWT token from response header: %s", jwt_token) + try: + payload = jwt.decode(jwt_token, options={"verify_signature": False}) + logger.debug("Decoded JWT payload: %s", payload) + except Exception as e: + logger.error("Failed to decode JWT token. Raw token: '%s'. Error: %s", jwt_token, e) + raise InternalSystemError("Invalid JWT returned from auth-o-tron") + + username = payload.get("username") + if not username: + logger.error("JWT payload missing 'username': %s", payload) + raise InternalSystemError("Token validation error: username missing") + + realm = payload.get("realm", "unknown") + logger.info("User '%s' successfully authenticated with realm '%s'", username, realm) + return username, realm diff --git a/aviso-server/auth/aviso_auth/config.py b/aviso-server/auth/aviso_auth/config.py index 3f8ed6f..c362b04 100644 --- a/aviso-server/auth/aviso_auth/config.py +++ b/aviso-server/auth/aviso_auth/config.py @@ -86,14 +86,14 @@ def __init__( def _create_default_config() -> Dict[str, any]: # authentication_server authentication_server = {} - authentication_server["url"] = "https://api.ecmwf.int/v1/who-am-i" + authentication_server["url"] = "http://0.0.0.0:8080" authentication_server["req_timeout"] = 60 # seconds authentication_server["cache_timeout"] = 86400 # 1 day in seconds authentication_server["monitor"] = False # authorisation_server authorisation_server = {} - authorisation_server["url"] = "https://127.0..0.1:8080" + authorisation_server["url"] = "http://127.0.0.1:8080" authorisation_server["req_timeout"] = 60 # seconds authorisation_server["cache_timeout"] = 86400 # 1 day in seconds authorisation_server["open_keys"] = ["/ec/mars", "/ec/config/aviso"] diff --git a/aviso-server/auth/aviso_auth/frontend.py b/aviso-server/auth/aviso_auth/frontend.py index 3657bec..a02d2f1 100644 --- a/aviso-server/auth/aviso_auth/frontend.py +++ b/aviso-server/auth/aviso_auth/frontend.py @@ -7,7 +7,6 @@ # nor does it submit to any jurisdiction. import json -import logging import aviso_auth.custom_exceptions as custom import gunicorn.app.base @@ -22,8 +21,6 @@ from aviso_monitoring.reporter.aviso_auth_reporter import AvisoAuthMetricType from flask import Flask, Response, render_template, request from flask_caching import Cache -from gunicorn import glogging -from six import iteritems class Frontend: @@ -31,30 +28,42 @@ def __init__(self, config: Config): self.config = config self.handler = self.create_handler() self.handler.cache = Cache(self.handler, config=config.cache) - # we need to initialise our components and timer here if this app runs in Flask, - # if instead it runs in Gunicorn the hook post_worker_init will take over, and these components will not be used + + # For direct runs (e.g. Flask "server_type"): + # We'll initialize the app-level components now. self.init_components() def init_components(self): """ - This method initialise a set of components and timers that are valid globally at application level or per worker + Initializes the Authenticator, Authoriser, BackendAdapter, + and sets up time-collectors or counters as needed. """ + # Create the authenticator (with caching if provided) self.authenticator = Authenticator(self.config, self.handler.cache) self.authoriser = Authoriser(self.config, self.handler.cache) self.backend = BackendAdapter(self.config) - # this is a time collector for the whole request + + # A time collector for measuring entire request durations (via timed_process_request()). self.timer = TimeCollector(self.config.monitoring, tlm_type=AvisoAuthMetricType.auth_resp_time.name) + + # A UniqueCountCollector for counting user accesses. This is used in process_request(). self.user_counter = UniqueCountCollector( self.config.monitoring, tlm_type=AvisoAuthMetricType.auth_users_counter.name ) + logger.debug("All components initialized: Authenticator, Authoriser, BackendAdapter, timers, counters") + def create_handler(self) -> Flask: handler = Flask(__name__) handler.title = "aviso-auth" - # We need to bind the logger of aviso to the one of app + + # Bind aviso_auth logger to the Flask app logger. logger.handlers = handler.logger.handlers def json_response(m, code, header=None): + """ + Utility for building JSON response. + """ h = {"Content-Type": "application/json"} if header: h.update(header) @@ -67,8 +76,30 @@ def invalid_input(e): @handler.errorhandler(custom.TokenNotValidException) def token_not_valid(e): + """ + Return a 401 and attach a dynamic WWW-Authenticate header if present in the exception message. + If not, fall back to the default header from the Authenticator. + """ logger.debug(f"Authentication failed: {e}") - return json_response(e, 401, self.authenticator.UNAUTHORISED_RESPONSE_HEADER) + + # Try to extract a dynamic www-authenticate header from the exception message. + # We assume the exception message is formatted like: + # "Invalid credentials or unauthorized token; www-authenticate:
" + msg = str(e) + header = {} + if "www-authenticate:" in msg.lower(): + try: + # Split on "www-authenticate:" and take the remainder. + dynamic_value = msg.split("www-authenticate:")[1].strip() + header["WWW-Authenticate"] = dynamic_value + logger.debug("Using dynamic WWW-Authenticate header: %s", dynamic_value) + except Exception as parse_err: + logger.error("Failed to parse dynamic WWW-Authenticate header: %s", parse_err) + header = self.authenticator.UNAUTHORISED_RESPONSE_HEADER + else: + header = self.authenticator.UNAUTHORISED_RESPONSE_HEADER + + return json_response(e, 401, header) @handler.errorhandler(custom.ForbiddenDestinationException) def forbidden_destination(e): @@ -106,52 +137,66 @@ def default_error_handler(e): @handler.route("/", methods=["GET"]) def index(): + """ + Simple index route that renders an index.html template + (if shipping a front-end). + Otherwise, can return a basic message. + """ return render_template("index.html") @handler.route(self.config.backend["route"], methods=["POST"]) def root(): - logger.info(f"New request received from {request.headers.get('X-Forwarded-For')}, content: {request.data}") - + """ + The main route for your proxying or backend forwarding logic. + """ + logger.info( + f"New request received from {request.headers.get('X-Forwarded-For')}, " f"content: {request.data}" + ) resp_content = timed_process_request() - - # forward back the response return Response(resp_content) def process_request(): - # authenticate request and count the users + """ + The main request processing flow: + 1. Authenticate + 2. Authorise + 3. Forward to backend + """ + # (1) Authenticate request and increment user counter username = self.user_counter(self.authenticator.authenticate, args=request) logger.debug("Request successfully authenticated") - # authorise request + # (2) Authorise request valid = self.authoriser.is_authorised(username, request) if not valid: - raise custom.ForbiddenDestinationException("User not allowed to access to the resource") + raise custom.ForbiddenDestinationException("User not allowed to access the resource") logger.debug("Request successfully authorised") - # forward request to backend + # (3) Forward request to backend resp_content = self.backend.forward(request) logger.info("Request completed") - return resp_content def timed_process_request(): """ - This method allows time the process_request function + Wraps process_request in a time collector (self.timer). """ return self.timer(process_request) return handler def run_server(self): + """ + Launches the server using either Flask's built-in server or Gunicorn. + """ logger.info( - f"Running aviso-auth - version {__version__} on server {self.config.frontend['server_type']}, \ - aviso_monitoring module v.{monitoring_version}" + f"Running aviso-auth - version {__version__} on server {self.config.frontend['server_type']}, " + f"aviso_monitoring module v.{monitoring_version}" ) logger.info(f"Configuration loaded: {self.config}") if self.config.frontend["server_type"] == "flask": - # flask internal server for non-production environments - # should only be used for testing and debugging + # Not recommended for production, but good for dev/test self.handler.run( debug=self.config.debug, host=self.config.frontend["host"], @@ -171,58 +216,41 @@ def run_server(self): def post_worker_init(self, worker): """ - This method is called just after a worker has initialized the application. - It is a Gunicorn server hook. Gunicorn spawns this app over multiple workers as processes. - This method ensures that there is only one set of components and timer running per worker. Without this hook - the components and timers are created at application level but not at worker level and then at every request a - timers will be created detached from the main transmitter threads. - This would result in no telemetry collected. + Called just after a worker initializes the application in Gunicorn. + Re-initializes any components that need a separate instance per worker process. """ logger.debug("Initialising components per worker") self.init_components() -def main(): - # initialising the user configuration configuration - config = Config() - - # create the frontend class and run it - frontend = Frontend(config) - frontend.run_server() - - class GunicornServer(gunicorn.app.base.BaseApplication): + """ + Gunicorn server wrapper. + """ + def __init__(self, app, options=None): self.options = options or {} self.application = app - super(GunicornServer, self).__init__() + super().__init__() def load_config(self): + from six import iteritems + config = dict( [(key, value) for key, value in iteritems(self.options) if key in self.cfg.settings and value is not None] ) for key, value in iteritems(config): self.cfg.set(key.lower(), value) - # this approach does not support custom filters, therefore it's better to disable it - # self.cfg.set('logger_class', GunicornServer.CustomLogger) - def load(self): return self.application - class CustomLogger(glogging.Logger): - """Custom logger for Gunicorn log messages.""" - - def setup(self, cfg): - """Configure Gunicorn application logging configuration.""" - super().setup(cfg) - - formatter = logging.getLogger().handlers[0].formatter - # Override Gunicorn's `error_log` configuration. - self._set_handler(self.error_log, cfg.errorlog, formatter) +def main(): + config = Config() + frontend = Frontend(config) + frontend.run_server() -# when running directly from this file if __name__ == "__main__": main() diff --git a/aviso-server/auth/requirements.txt b/aviso-server/auth/requirements.txt index b845813..9d87343 100644 --- a/aviso-server/auth/requirements.txt +++ b/aviso-server/auth/requirements.txt @@ -1,8 +1,9 @@ PyYAML>=5.1.2 python-json-logger>=0.1.11 requests>=2.23.0 -gunicorn>=20.0.4 +gunicorn>=23.0.0 flask>=1.1.2 Flask-Caching>=1.8.0 six>=1.15.0 rfc5424-logging-handler>=1.4.3 +PyJWT>=2.10.1 \ No newline at end of file diff --git a/pyaviso/authentication/__init__.py b/pyaviso/authentication/__init__.py index 17f75ab..172ed8e 100644 --- a/pyaviso/authentication/__init__.py +++ b/pyaviso/authentication/__init__.py @@ -16,6 +16,8 @@ class AuthType(Enum): """ ECMWF = ("ecmwf_auth", "EcmwfAuth") + OPENID = ("openid_auth", "OpenidAuth") + PLAIN = ("plain_auth", "PlainAuth") ETCD = ("etcd_auth", "EtcdAuth") NONE = ("none_auth", "NoneAuth") diff --git a/pyaviso/authentication/ecmwf_auth.py b/pyaviso/authentication/ecmwf_auth.py index 650073e..c1616e7 100644 --- a/pyaviso/authentication/ecmwf_auth.py +++ b/pyaviso/authentication/ecmwf_auth.py @@ -21,5 +21,4 @@ def __init__(self, config: UserConfig): self._username = config.username def header(self): - header = {"Authorization": f"EmailKey {self.username}:{self.password}"} - return header + return {"Authorization": f"Bearer {self._password}", "X-Auth-Type": "ecmwf"} diff --git a/pyaviso/authentication/openid_auth.py b/pyaviso/authentication/openid_auth.py new file mode 100644 index 0000000..8074461 --- /dev/null +++ b/pyaviso/authentication/openid_auth.py @@ -0,0 +1,22 @@ +# (C) Copyright 1996- ECMWF. +# +# This software is licensed under the terms of the Apache Licence Version 2.0 +# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. +# In applying this licence, ECMWF does not waive the privileges and immunities +# granted to it by virtue of its status as an intergovernmental organisation +# nor does it submit to any jurisdiction. + + +class OpenidAuth: + """ + OpenidAuth implements an OpenID authentication flow. + + It returns a Bearer header (using the shared secret from config.password) and adds + an extra header "X-Auth-Type" with the value "openid". + """ + + def __init__(self, config): + self.config = config + + def header(self): + return {"Authorization": f"Bearer {self.config.password}", "X-Auth-Type": "openid"} diff --git a/pyaviso/authentication/plain_auth.py b/pyaviso/authentication/plain_auth.py new file mode 100644 index 0000000..be6ee75 --- /dev/null +++ b/pyaviso/authentication/plain_auth.py @@ -0,0 +1,23 @@ +# (C) Copyright 1996- ECMWF. +# +# This software is licensed under the terms of the Apache Licence Version 2.0 +# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. +# In applying this licence, ECMWF does not waive the privileges and immunities +# granted to it by virtue of its status as an intergovernmental organisation +# nor does it submit to any jurisdiction. + +import base64 + + +class PlainAuth: + """ + PlainAuth implements Basic authentication. + """ + + def __init__(self, config): + self.config = config + + def header(self): + credentials = f"{self.config.username}:{self.config.password}" + encoded = base64.b64encode(credentials.encode("utf-8")).decode("utf-8") + return {"Authorization": f"Basic {encoded}", "X-Auth-Type": "plain"} diff --git a/pyaviso/cli_aviso.py b/pyaviso/cli_aviso.py index 825d641..c176457 100644 --- a/pyaviso/cli_aviso.py +++ b/pyaviso/cli_aviso.py @@ -352,7 +352,7 @@ def notify(parameters: str, configuration: conf.UserConfig): cli.add_command(notify) if __name__ == "__main__": - listen() + cli() def _parse_inline_params(params: str) -> Dict[str, any]: diff --git a/pyaviso/engine/engine.py b/pyaviso/engine/engine.py index e6ab495..88fda4f 100644 --- a/pyaviso/engine/engine.py +++ b/pyaviso/engine/engine.py @@ -209,7 +209,7 @@ def push_with_status( """ # create the status payload status = { - "etcd_user": self.auth.username, + "etcd_user": getattr(self.auth, "username", None), "message": message, "unix_user": getpass.getuser(), "aviso_version": __version__, diff --git a/pyaviso/version.py b/pyaviso/version.py index 923280c..c4359c3 100644 --- a/pyaviso/version.py +++ b/pyaviso/version.py @@ -1,2 +1,2 @@ # version number for the application -__version__ = "1.0.0" +__version__ = "1.1.0"