Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion polytope_server/common/authotron.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import requests
from jose import jwt

from .caching import cache
from .exceptions import UnauthorizedRequest
from .user import User

Expand All @@ -37,11 +38,12 @@ def __init__(self, config: dict):
if not self.secret:
raise ValueError("Missing secret key")

@cache(lifetime=60)
def authenticate(self, auth_header: str) -> User:
"""Forwards the header to Auth-o-tron.
Returns authenticated User, or raises UnauthorizedRequest"""

logging.info("Authenticating user with header: {}".format(auth_header))
logging.debug("Authenticating user with header: {}".format(auth_header))

if auth_header.startswith("EmailKey "):
logging.debug("Converting EmailKey to Bearer token")
Expand Down
2 changes: 1 addition & 1 deletion polytope_server/common/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def create_collections(config) -> Dict[str, Collection]:
collections = {}
for k, v in config.items():
collections[k] = Collection(k, v)
logging.info(
logging.debug(
"Configured collections: {}".format(list(collections.keys())),
extra={"collections": [col._serialize() for col in collections.values()]},
)
Expand Down
2 changes: 1 addition & 1 deletion polytope_server/common/datasource/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ def create_datasource(config: dict) -> DataSource:
constructor = getattr(module, datasource_class)
datasource = constructor(config)

logging.info("Datasource {} initialized [{}].".format(config["name"], datasource_class))
logging.debug("Datasource {} initialized [{}].".format(config["name"], datasource_class))

return datasource

Expand Down
13 changes: 8 additions & 5 deletions polytope_server/common/datasource/mars.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import requests
import yaml

from ...telemetry.helpers import obfuscate_apikey
from ..io.fifo import FIFO
from ..subprocess import Subprocess
from . import datasource
Expand Down Expand Up @@ -102,7 +103,7 @@ def retrieve(self, request):
# Make a temporary file for the request
with tempfile.NamedTemporaryFile(delete=False) as tmp:
self.request_file = tmp.name
logging.info("Writing request to tempfile {}".format(self.request_file))
logging.debug("Writing request to tempfile {}".format(self.request_file))
tmp.write(convert_to_mars_request(r, "retrieve").encode())

# Call MARS
Expand All @@ -113,7 +114,7 @@ def retrieve(self, request):
env=self.make_env(request),
)

logging.info("MARS subprocess started with PID {}".format(self.subprocess.subprocess.pid))
logging.debug("MARS subprocess started with PID {}".format(self.subprocess.subprocess.pid))

if self.use_file_io:
while self.subprocess.running():
Expand All @@ -126,7 +127,7 @@ def retrieve(self, request):
while self.subprocess.running():
# logging.debug("Checking if MARS process has opened FIFO.") # this floods the logs
if self.fifo.ready():
logging.info("FIFO is ready for reading.")
logging.debug("FIFO is ready for reading.")
break

self.subprocess.read_output(request, self.mars_error_filter)
Expand Down Expand Up @@ -301,7 +302,7 @@ def make_env(self, request):
mars_user = request.user.attributes.get("ecmwf-email", "no-email")

if self.override_mars_apikey:
logging.info("Overriding MARS_USER_TOKEN with {}".format(self.override_mars_apikey))
logging.info("Overriding MARS_USER_TOKEN with {}".format(obfuscate_apikey(self.override_mars_apikey)))
mars_token = self.override_mars_apikey
else:
mars_token = request.user.attributes.get("ecmwf-apikey", "no-api-key")
Expand All @@ -320,7 +321,9 @@ def make_env(self, request):
if self.protocol == "dhs":
env.update(self._build_dhs_env())

logging.info("Accessing MARS on behalf of user {} with token {}".format(mars_user, mars_token))
logging.debug(
"Accessing MARS on behalf of user {} with token {}".format(mars_user, obfuscate_apikey(mars_token))
)

except Exception as e:
logging.error("MARS request aborted because user does not have associated ECMWF credentials")
Expand Down
18 changes: 1 addition & 17 deletions polytope_server/common/datasource/polytope.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,22 +69,6 @@ def get_type(self):
def archive(self, request):
raise NotImplementedError()

def check_extra_roles(self, request: PolytopeRequest) -> bool:
# if the user has any of the extra roles, they are allowed
realm = request.user.realm
req_extra_roles = self.extra_required_role.get(realm, [])

if len(req_extra_roles) == 0:
return True

logging.info(f"Checking for user roles in required extra roles: {req_extra_roles}")
logging.info(f"User roles: {request.user.roles}")

if any(role in req_extra_roles for role in request.user.roles):
return True
else:
return False

def retrieve(self, request):
r = copy.deepcopy(request.coerced_request)

Expand Down Expand Up @@ -132,7 +116,7 @@ def retrieve(self, request):
return True

def result(self, request):
logging.info("Getting result")
logging.debug("Getting result")
yield self.output

def destroy(self, request) -> None:
Expand Down
2 changes: 1 addition & 1 deletion polytope_server/common/io/fifo.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def __init__(self, name, dir=None):

os.mkfifo(self.path, 0o600)
self.fifo = os.open(self.path, os.O_RDONLY | os.O_NONBLOCK)
logging.info("FIFO created")
logging.debug("FIFO created")

def ready(self):
"""Wait until FIFO is ready for reading -- i.e. opened by the writing process (man select)"""
Expand Down
2 changes: 1 addition & 1 deletion polytope_server/common/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def setup(config, source_name):
logger.addHandler(handler)
logger.setLevel(level)

logger.info("Logging Initialized")
logger.debug("Logging Initialized")


def optional_json_dumps(mode="json"):
Expand Down
10 changes: 4 additions & 6 deletions polytope_server/common/metric_calculator/mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ def __init__(self, collection: Collection, metric_collection: Optional[Collectio
self.collection = collection
self.metric_collection = metric_collection
self.histogram_builder = HistogramBuilder()
logger.info("Initialized MongoMetricCalculator for collection %s", collection.name)
logger.debug("Initialized MongoMetricCalculator for collection %s", collection.name)
if metric_collection is not None:
logger.info(" with metric_collection %s", metric_collection.name)
logger.debug(" with metric_collection %s", metric_collection.name)

def ensure_indexes(self) -> None:
"""Ensure all indexes needed for metric queries exist."""
Expand Down Expand Up @@ -185,7 +185,7 @@ def ensure_indexes(self) -> None:
name="ix_status_history_processing",
)

logger.info("Metric aggregation indexes ensured successfully")
logger.debug("Metric aggregation indexes ensured successfully")

def ensure_metric_indexes(self) -> None:
"""
Expand All @@ -197,8 +197,6 @@ def ensure_metric_indexes(self) -> None:
logger.warning("No metric_collection provided, skipping metric indexes")
return

logger.info("Ensuring metric store indexes for collection: %s", self.metric_collection.name)

# Index for type + status + timestamp queries
safe_create_index(
self.metric_collection,
Expand All @@ -224,7 +222,7 @@ def ensure_metric_indexes(self) -> None:
},
)

logger.info("Metric store indexes ensured successfully")
logger.debug("Metric store indexes ensured successfully")

def get_usage_metrics_aggregated(self, cutoff_timestamps: Dict[str, float]) -> Dict[str, Any]:
"""
Expand Down
2 changes: 1 addition & 1 deletion polytope_server/common/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def set_status(self, value: Status) -> None:
if self.status_history is None:
self.status_history = {}
self.status_history.setdefault(value.value, now_ts)
logging.info("Request %s status set to %s.", self.id, value.value)
logging.debug("Request %s status set to %s.", self.id, value.value)

@classmethod
def serialize_slot(cls, key, value):
Expand Down
4 changes: 2 additions & 2 deletions polytope_server/common/staging/s3_staging.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def __init__(self, config):
try:
self.s3_client.create_bucket(Bucket=self.bucket)
except self.s3_client.exceptions.BucketAlreadyExists:
logging.info(f"Bucket {self.bucket} already exists.")
logging.debug(f"Bucket {self.bucket} already exists.")
except self.s3_client.exceptions.BucketAlreadyOwnedByYou:
logging.info(f"Bucket {self.bucket} already exists and owned by you.")
except ClientError as e:
Expand All @@ -110,7 +110,7 @@ def __init__(self, config):
if self.should_set_policy:
self.set_bucket_policy()

logging.info(f"Opened data staging at {self.host}:{self.port} with bucket {self.bucket}")
logging.debug(f"Opened data staging at {self.host}:{self.port} with bucket {self.bucket}")

def create(self, name, data, content_type):

Expand Down
72 changes: 53 additions & 19 deletions polytope_server/common/subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,27 @@
import subprocess
from subprocess import CalledProcessError

from ..telemetry.helpers import obfuscate_apikey


class Subprocess:
def __init__(self):
    """Create an idle wrapper; no child process is started until run()."""
    # Handle to the Popen object; populated by run().
    self.subprocess = None
    # Captured stdout/stderr lines, emitted in batches by _flush_buffers().
    self._stdout_buffer = []
    self._stderr_buffer = []

def run(self, cmd, cwd=None, env=None):
env = {**os.environ, **(env or None)}
logging.info("Calling {} in directory {} with env {}".format(cmd, cwd, env))
env = {**os.environ, **(env or {})}
obfuscated_env = {}
for k, v in env.items():
if "KEY" in k or "TOKEN" in k:
if v is None:
obfuscated_env[k] = None
else:
obfuscated_env[k] = obfuscate_apikey(str(v))
else:
obfuscated_env[k] = v
logging.info("Calling {} in directory {}".format(cmd, cwd), extra={"env": obfuscated_env})
self.subprocess = subprocess.Popen(
cmd,
env=env,
Expand All @@ -50,15 +63,11 @@ def read_output(self, request, err_filter=None):
line = fd.readline()
if line:
line = line.decode().strip()
if fd == self.subprocess.stdout:
logging.info(line)
elif fd == self.subprocess.stderr:
logging.error(line)
if err_filter and err_filter in line:
request.user_message += line + "\n"
self._handle_line(fd, line, request, err_filter)
if not self.running():
break
ret = select.select(reads, [], [], 0)
self._flush_buffers(request, err_filter)

def running(self):
return self.subprocess.poll() is None
Expand All @@ -68,7 +77,6 @@ def returncode(self):

def finalize(self, request, err_filter):
"""Close subprocess and decode output"""
logging.info("Finalizing subprocess")
# fifo has been closed so this process should finish, but sometimes hangs so we set a timeout
try:
returncode = self.subprocess.wait(60)
Expand All @@ -77,20 +85,46 @@ def finalize(self, request, err_filter):
self.subprocess.kill()
returncode = self.subprocess.returncode
logging.info("Subprocess finished with return code: {}".format(returncode))
logging.info("Subprocess stdout:")
for line in self.subprocess.stdout:
line = line.decode().strip()
if err_filter and err_filter in line:
request.user_message += line + "\n"
logging.error(line)
else:
logging.info(line)
logging.info("Subprocess stderr:")
self._handle_line(self.subprocess.stdout, line, request, err_filter)
for line in self.subprocess.stderr:
line = line.decode().strip()
if err_filter and err_filter in line:
request.user_message += line + "\n"
logging.error(line)
self._handle_line(self.subprocess.stderr, line, request, err_filter)

self._flush_buffers(request, err_filter)

if returncode != 0:
raise CalledProcessError(returncode, self.subprocess.args)

def _handle_line(self, fd, line, request, err_filter):
buffer, log_func = self._get_buffer_and_logger(fd)
buffer.append(line)

def _flush_buffers(self, request, err_filter):
for buffer, log_func in [
(self._stdout_buffer, logging.info),
(self._stderr_buffer, logging.error),
]:
self._flush_buffer(buffer, log_func, request, err_filter)

def _flush_buffer(self, buffer, log_func, request, err_filter):
if not buffer:
return
message = "\n".join(buffer)
if err_filter:
matching_lines = [line for line in buffer if err_filter in line]
has_error = bool(matching_lines)
else:
matching_lines = []
has_error = False
log_method = logging.error if has_error else log_func
log_method(message)
if matching_lines:
request.user_message += "\n".join(matching_lines) + "\n"
buffer.clear()

def _get_buffer_and_logger(self, fd):
if fd == self.subprocess.stdout:
return self._stdout_buffer, logging.info
return self._stderr_buffer, logging.error
2 changes: 1 addition & 1 deletion polytope_server/common/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def has_access(self, roles: list | set | dict | str) -> bool:

for required_role in roles:
if required_role in self.roles:
logging.info(f"User {self.username} is authorized with role {required_role}")
logging.debug(f"User {self.username} is authorized with role {required_role}")
return True

return False
2 changes: 1 addition & 1 deletion polytope_server/frontend/common/data_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def query_request(self, user: User, id: str) -> Response:
return RequestSucceeded(response)

response = self.construct_response(request)
return RequestAccepted(response)
return RequestAccepted(response, info_log=False)

def upload(self, id: str, http_request: Request) -> Response:
"""
Expand Down
7 changes: 5 additions & 2 deletions polytope_server/frontend/common/flask_decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def RequestSucceeded(response: collections.abc.Mapping | str) -> Response:
return Response(response=json.dumps(response), status=status, mimetype="application/json")


def RequestAccepted(response: collections.abc.Mapping | str) -> Response:
def RequestAccepted(response: collections.abc.Mapping | str, info_log: bool = True) -> Response:
if not isinstance(response, collections.abc.Mapping):
response = {"message": response}
if response["message"] == "":
Expand All @@ -48,7 +48,10 @@ def RequestAccepted(response: collections.abc.Mapping | str) -> Response:
headers = {"Location": response["location"], "Retry-After": 5}
response.pop("location")
status = 202
logging.info(response["message"], extra={"response": response, "http.status": status})
if info_log:
logging.info(response["message"], extra={"response": response, "http.status": status})
else:
logging.debug(response["message"], extra={"response": response, "http.status": status})
return Response(
response=json.dumps(response),
status=status,
Expand Down
Loading