33 changes: 33 additions & 0 deletions database/logging_system/__init__.py
@@ -0,0 +1,33 @@
from .logger import (
PipelineLogger,
get_access_logger,
get_api_logger,
get_error_logger,
get_logger,
)
from .pipeline_logger import PipelineStageLogger
from .script_logger import ScriptLogger

__all__ = [
"PipelineLogger",
"PipelineStageLogger",
"ScriptLogger",
"get_logger",
"get_api_logger",
"get_access_logger",
"get_error_logger",
]

try:
from .exception_handler import global_exception_handler
from .request_middleware import RequestLoggingMiddleware
except ImportError: # FastAPI/Starlette not installed yet.
global_exception_handler = None
RequestLoggingMiddleware = None
else:
__all__.extend(
[
"RequestLoggingMiddleware",
"global_exception_handler",
]
)
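
Not part of the diff, but a short sketch of how a caller could consume these exports either way; the import path assumes the repository root is on sys.path, and the None check mirrors the ImportError fallback above.

# Sketch only: PipelineLogger always resolves; the FastAPI-backed names fall back to None.
from database.logging_system import PipelineLogger, RequestLoggingMiddleware

log = PipelineLogger("ingest")
log.info("logging package imported")

if RequestLoggingMiddleware is not None:
    # FastAPI/Starlette is installed, so the middleware can be attached to an app.
    log.info("request middleware available")
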
41 changes: 41 additions & 0 deletions database/logging_system/exception_handler.py
@@ -0,0 +1,41 @@
"""
Global exception handler helpers for FastAPI apps.
"""

from __future__ import annotations

from .logger import get_error_logger

try:
from fastapi import Request
from fastapi.responses import JSONResponse
except ImportError as exc: # pragma: no cover
raise ImportError(
"exception_handler.py requires FastAPI to be installed."
) from exc


error_logger = get_error_logger("exceptions")


async def global_exception_handler(request: Request, exc: Exception) -> JSONResponse:
"""
Log unexpected exceptions and return a safe response to the client.
"""
request_id = getattr(request.state, "request_id", "unknown")

error_logger.exception(
"event=unhandled_exception request_id=%s method=%s path=%s error=%s",
request_id,
request.method,
request.url.path,
str(exc),
)

return JSONResponse(
status_code=500,
content={
"message": "Internal server error",
"request_id": request_id,
},
)
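
For context (not part of this diff), wiring the handler into an application could look roughly like this; add_exception_handler is the standard FastAPI/Starlette hook, and the app shown is illustrative.

# Sketch only: register the catch-all handler on an illustrative FastAPI app.
from fastapi import FastAPI

from database.logging_system import global_exception_handler

app = FastAPI()
app.add_exception_handler(Exception, global_exception_handler)
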
34 changes: 34 additions & 0 deletions database/logging_system/log_config.py
@@ -0,0 +1,34 @@
"""
Shared logging configuration for API, access, error, and pipeline logs.
"""

from __future__ import annotations

import os
from datetime import datetime

LOG_DIR = os.path.join("database", "logs")
os.makedirs(LOG_DIR, exist_ok=True)

DEFAULT_LOG_LEVEL = "INFO"
DEFAULT_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
DEFAULT_LOG_FORMAT = (
"%(asctime)s | %(levelname)s | %(name)s | %(message)s"
)

TODAY = datetime.now().strftime("%Y_%m_%d")

LOG_FILE_NAMES = {
"pipeline": f"pipeline_{TODAY}.log",
"api": f"api_{TODAY}.log",
"access": f"access_{TODAY}.log",
"error": f"error_{TODAY}.log",
}


def get_log_file_path(channel: str) -> str:
"""
Return the full path for a given logging channel.
"""
filename = LOG_FILE_NAMES.get(channel, f"{channel}_{TODAY}.log")
return os.path.join(LOG_DIR, filename)
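
As a quick illustration (assuming a run date of 2025_01_15; not part of the module), known channels resolve to their configured file names and unknown channels fall back to the generic pattern.

# Sketch only: resolving log paths for a known and an unknown channel.
from database.logging_system.log_config import get_log_file_path

print(get_log_file_path("api"))     # database/logs/api_2025_01_15.log
print(get_log_file_path("ingest"))  # database/logs/ingest_2025_01_15.log (fallback pattern)
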
88 changes: 58 additions & 30 deletions database/logging_system/logger.py
@@ -1,54 +1,82 @@
"""
Central logging configuration for the Food Remedy database pipeline.
Provides consistent formatting + levels (INFO, WARNING, ERROR).
Central logging configuration for Food Remedy.
Supports pipeline, API, access, and error logging channels.
"""

from __future__ import annotations

import logging
import os
from datetime import datetime
from logging.handlers import RotatingFileHandler

from .log_config import (
DEFAULT_DATE_FORMAT,
DEFAULT_LOG_FORMAT,
DEFAULT_LOG_LEVEL,
get_log_file_path,
)


def _build_formatter() -> logging.Formatter:
return logging.Formatter(DEFAULT_LOG_FORMAT, DEFAULT_DATE_FORMAT)


# Create logs folder if missing
LOG_DIR = "database/logs"
os.makedirs(LOG_DIR, exist_ok=True)
def _build_file_handler(channel: str) -> RotatingFileHandler:
handler = RotatingFileHandler(
get_log_file_path(channel),
maxBytes=5 * 1024 * 1024,
backupCount=5,
encoding="utf-8",
)
handler.setFormatter(_build_formatter())
return handler

# Log file name with date
LOG_FILE = os.path.join(LOG_DIR, f"pipeline_{datetime.now().strftime('%Y_%m_%d')}.log")

def get_logger(name: str):
def _build_stream_handler() -> logging.StreamHandler:
handler = logging.StreamHandler()
handler.setFormatter(_build_formatter())
return handler


def get_logger(name: str, channel: str = "pipeline") -> logging.Logger:
"""
Returns a logger with consistent formatting across the pipeline.
Return a logger configured for the given logging channel.
"""
logger = logging.getLogger(name)
logger_name = f"{channel}.{name}"
logger = logging.getLogger(logger_name)

if not logger.handlers:
logger.setLevel(logging.INFO)
if logger.handlers:
return logger

formatter = logging.Formatter(
"%(asctime)s | %(levelname)s | %(name)s | %(message)s",
"%Y-%m-%d %H:%M:%S"
)
logger.setLevel(getattr(logging, DEFAULT_LOG_LEVEL, logging.INFO))
logger.propagate = False

file_handler = logging.FileHandler(LOG_FILE)
file_handler.setFormatter(formatter)
logger.addHandler(_build_file_handler(channel))
logger.addHandler(_build_stream_handler())

stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
return logger

logger.addHandler(file_handler)
logger.addHandler(stream_handler)

return logger
def get_api_logger(name: str = "api") -> logging.Logger:
return get_logger(name=name, channel="api")


def get_access_logger(name: str = "access") -> logging.Logger:
return get_logger(name=name, channel="access")


def get_error_logger(name: str = "error") -> logging.Logger:
return get_logger(name=name, channel="error")


# Optional: PipelineLogger class wrapper
class PipelineLogger:
def __init__(self, name: str):
self.logger = get_logger(name)
self.logger = get_logger(name=name, channel="pipeline")

def info(self, msg: str):
self.logger.info(msg)

def warning(self, msg: str):
self.logger.warning(msg)

def error(self, msg: str):
self.logger.error(msg)
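
A usage sketch for the new channel-aware API (logger names and log messages are illustrative, not part of the diff).

# Sketch only: channel picks the rotating log file, name scopes the logger.
from database.logging_system.logger import get_api_logger, get_logger

pipeline_log = get_logger("nutrient_loader")  # writes to pipeline_<date>.log
api_log = get_api_logger("remedies")          # writes to api_<date>.log

pipeline_log.info("event=load_start source=usda")
api_log.warning("event=slow_response path=/remedies duration_ms=1200")
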
102 changes: 102 additions & 0 deletions database/logging_system/pipeline_logger.py
@@ -0,0 +1,102 @@
"""
Helpers for logging pipeline stages and run summaries.
"""

from __future__ import annotations

from typing import Any

from .logger import get_error_logger, get_logger


class PipelineStageLogger:
"""
Convenience wrapper for logging pipeline stage lifecycle events.
"""

def __init__(self, pipeline_name: str = "DataPipeline"):
self.pipeline_name = pipeline_name
self.logger = get_logger(pipeline_name, channel="pipeline")
self.error_logger = get_error_logger(pipeline_name)

def log_pipeline_start(self, input_source: str | None = None) -> None:
message = f"event=pipeline_start pipeline={self.pipeline_name}"
if input_source:
message += f" input_source={input_source}"
self.logger.info(message)

def log_pipeline_end(
self,
total_duration_ms: float | None = None,
total_records: int | None = None,
) -> None:
message = f"event=pipeline_end pipeline={self.pipeline_name}"
if total_duration_ms is not None:
message += f" total_duration_ms={total_duration_ms}"
if total_records is not None:
message += f" total_records={total_records}"
self.logger.info(message)

def log_stage_start(
self,
stage_name: str,
input_file: str | None = None,
batch_id: str | None = None,
) -> None:
message = f"event=stage_start pipeline={self.pipeline_name} stage={stage_name}"
if input_file:
message += f" input_file={input_file}"
if batch_id:
message += f" batch_id={batch_id}"
self.logger.info(message)

def log_stage_end(
self,
stage_name: str,
duration_ms: float | None = None,
input_records: int | None = None,
output_records: int | None = None,
output_file: str | None = None,
) -> None:
message = f"event=stage_end pipeline={self.pipeline_name} stage={stage_name}"
if duration_ms is not None:
message += f" duration_ms={duration_ms}"
if input_records is not None:
message += f" input_records={input_records}"
if output_records is not None:
message += f" output_records={output_records}"
if output_file:
message += f" output_file={output_file}"
self.logger.info(message)

def log_stage_warning(self, stage_name: str, warning_message: str) -> None:
self.logger.warning(
"event=stage_warning pipeline=%s stage=%s warning=%s",
self.pipeline_name,
stage_name,
warning_message,
)

def log_stage_error(
self,
stage_name: str,
error: Exception | str,
input_file: str | None = None,
batch_id: str | None = None,
) -> None:
details = f"event=stage_error pipeline={self.pipeline_name} stage={stage_name}"
if input_file:
details += f" input_file={input_file}"
if batch_id:
details += f" batch_id={batch_id}"

self.error_logger.exception("%s error=%s", details, error)

def log_metric(self, stage_name: str, metric_name: str, metric_value: Any) -> None:
self.logger.info(
"event=metric pipeline=%s stage=%s metric=%s value=%s",
self.pipeline_name,
stage_name,
metric_name,
metric_value,
)
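
A sketch of the intended call sequence for one stage of a run; the pipeline name, file names, and counts are illustrative.

# Sketch only: logging one stage of a run with PipelineStageLogger.
from database.logging_system import PipelineStageLogger

stages = PipelineStageLogger("NutrientPipeline")
stages.log_pipeline_start(input_source="usda_export.csv")

stages.log_stage_start("normalize", input_file="usda_export.csv")
try:
    # ... stage work would run here ...
    stages.log_metric("normalize", "rows_seen", 1200)
    stages.log_stage_end("normalize", duration_ms=842.0, output_records=1187)
except Exception as exc:
    stages.log_stage_error("normalize", exc, input_file="usda_export.csv")

stages.log_pipeline_end(total_duration_ms=900.0, total_records=1187)
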
53 changes: 0 additions & 53 deletions database/logging_system/pipeline_logger_demo.py

This file was deleted.
