diff --git a/database/logging_system/__init__.py b/database/logging_system/__init__.py
index e69de29..58f4c4d 100644
--- a/database/logging_system/__init__.py
+++ b/database/logging_system/__init__.py
@@ -0,0 +1,33 @@
+from .logger import (
+    PipelineLogger,
+    get_access_logger,
+    get_api_logger,
+    get_error_logger,
+    get_logger,
+)
+from .pipeline_logger import PipelineStageLogger
+from .script_logger import ScriptLogger
+
+__all__ = [
+    "PipelineLogger",
+    "PipelineStageLogger",
+    "ScriptLogger",
+    "get_logger",
+    "get_api_logger",
+    "get_access_logger",
+    "get_error_logger",
+]
+
+try:
+    from .exception_handler import global_exception_handler
+    from .request_middleware import RequestLoggingMiddleware
+except ImportError:  # FastAPI/Starlette not installed yet.
+    global_exception_handler = None
+    RequestLoggingMiddleware = None
+else:
+    __all__.extend(
+        [
+            "RequestLoggingMiddleware",
+            "global_exception_handler",
+        ]
+    )
diff --git a/database/logging_system/exception_handler.py b/database/logging_system/exception_handler.py
new file mode 100644
index 0000000..e12359c
--- /dev/null
+++ b/database/logging_system/exception_handler.py
@@ -0,0 +1,41 @@
+"""
+Global exception handler helpers for FastAPI apps.
+"""
+
+from __future__ import annotations
+
+from .logger import get_error_logger
+
+try:
+    from fastapi import Request
+    from fastapi.responses import JSONResponse
+except ImportError as exc:  # pragma: no cover
+    raise ImportError(
+        "exception_handler.py requires FastAPI to be installed."
+    ) from exc
+
+
+error_logger = get_error_logger("exceptions")
+
+
+async def global_exception_handler(request: Request, exc: Exception) -> JSONResponse:
+    """
+    Log unexpected exceptions and return a safe response to the client.
+    """
+    request_id = getattr(request.state, "request_id", "unknown")
+
+    error_logger.exception(
+        "event=unhandled_exception request_id=%s method=%s path=%s error=%s",
+        request_id,
+        request.method,
+        request.url.path,
+        str(exc),
+    )
+
+    return JSONResponse(
+        status_code=500,
+        content={
+            "message": "Internal server error",
+            "request_id": request_id,
+        },
+    )
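For context, a minimal sketch of how the handler above would be wired into an application. The `app` object and the `database.logging_system` import path are assumptions inferred from the repo layout, not part of this patch; `add_exception_handler` itself is standard FastAPI/Starlette API.

```python
# Sketch only -- `app` and the import path are assumptions.
from fastapi import FastAPI

from database.logging_system import global_exception_handler

app = FastAPI()

# Route any uncaught Exception through the logging handler above.
app.add_exception_handler(Exception, global_exception_handler)
```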
diff --git a/database/logging_system/log_config.py b/database/logging_system/log_config.py
new file mode 100644
index 0000000..919ed76
--- /dev/null
+++ b/database/logging_system/log_config.py
@@ -0,0 +1,34 @@
+"""
+Shared logging configuration for API, access, error, and pipeline logs.
+"""
+
+from __future__ import annotations
+
+import os
+from datetime import datetime
+
+LOG_DIR = os.path.join("database", "logs")
+os.makedirs(LOG_DIR, exist_ok=True)
+
+DEFAULT_LOG_LEVEL = "INFO"
+DEFAULT_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
+DEFAULT_LOG_FORMAT = (
+    "%(asctime)s | %(levelname)s | %(name)s | %(message)s"
+)
+
+TODAY = datetime.now().strftime("%Y_%m_%d")
+
+LOG_FILE_NAMES = {
+    "pipeline": f"pipeline_{TODAY}.log",
+    "api": f"api_{TODAY}.log",
+    "access": f"access_{TODAY}.log",
+    "error": f"error_{TODAY}.log",
+}
+
+
+def get_log_file_path(channel: str) -> str:
+    """
+    Return the full path for a given logging channel.
+    """
+    filename = LOG_FILE_NAMES.get(channel, f"{channel}_{TODAY}.log")
+    return os.path.join(LOG_DIR, filename)
diff --git a/database/logging_system/logger.py b/database/logging_system/logger.py
index 6483529..5d25c93 100644
--- a/database/logging_system/logger.py
+++ b/database/logging_system/logger.py
@@ -1,54 +1,82 @@
 """
-Central logging configuration for the Food Remedy database pipeline.
-Provides consistent formatting + levels (INFO, WARNING, ERROR).
+Central logging configuration for Food Remedy.
+Supports pipeline, API, access, and error logging channels.
 """
+from __future__ import annotations
+
 import logging
-import os
-from datetime import datetime
+from logging.handlers import RotatingFileHandler
+
+from .log_config import (
+    DEFAULT_DATE_FORMAT,
+    DEFAULT_LOG_FORMAT,
+    DEFAULT_LOG_LEVEL,
+    get_log_file_path,
+)
+
+
+def _build_formatter() -> logging.Formatter:
+    return logging.Formatter(DEFAULT_LOG_FORMAT, DEFAULT_DATE_FORMAT)
+
 
-# Create logs folder if missing
-LOG_DIR = "database/logs"
-os.makedirs(LOG_DIR, exist_ok=True)
+def _build_file_handler(channel: str) -> RotatingFileHandler:
+    handler = RotatingFileHandler(
+        get_log_file_path(channel),
+        maxBytes=5 * 1024 * 1024,
+        backupCount=5,
+        encoding="utf-8",
+    )
+    handler.setFormatter(_build_formatter())
+    return handler
 
-# Log file name with date
-LOG_FILE = os.path.join(LOG_DIR, f"pipeline_{datetime.now().strftime('%Y_%m_%d')}.log")
 
-def get_logger(name: str):
+def _build_stream_handler() -> logging.StreamHandler:
+    handler = logging.StreamHandler()
+    handler.setFormatter(_build_formatter())
+    return handler
+
+
+def get_logger(name: str, channel: str = "pipeline") -> logging.Logger:
     """
-    Returns a logger with consistent formatting across the pipeline.
+    Return a logger configured for the given logging channel.
     """
-    logger = logging.getLogger(name)
+    logger_name = f"{channel}.{name}"
+    logger = logging.getLogger(logger_name)
 
-    if not logger.handlers:
-        logger.setLevel(logging.INFO)
+    if logger.handlers:
+        return logger
 
-        formatter = logging.Formatter(
-            "%(asctime)s | %(levelname)s | %(name)s | %(message)s",
-            "%Y-%m-%d %H:%M:%S"
-        )
+    logger.setLevel(getattr(logging, DEFAULT_LOG_LEVEL, logging.INFO))
+    logger.propagate = False
 
-        file_handler = logging.FileHandler(LOG_FILE)
-        file_handler.setFormatter(formatter)
+    logger.addHandler(_build_file_handler(channel))
+    logger.addHandler(_build_stream_handler())
 
-        stream_handler = logging.StreamHandler()
-        stream_handler.setFormatter(formatter)
+    return logger
 
-        logger.addHandler(file_handler)
-        logger.addHandler(stream_handler)
 
-    return logger
+def get_api_logger(name: str = "api") -> logging.Logger:
+    return get_logger(name=name, channel="api")
+
+
+def get_access_logger(name: str = "access") -> logging.Logger:
+    return get_logger(name=name, channel="access")
+
+
+def get_error_logger(name: str = "error") -> logging.Logger:
+    return get_logger(name=name, channel="error")
+
 
-# Optional: PipelineLogger class wrapper
 class PipelineLogger:
     def __init__(self, name: str):
-        self.logger = get_logger(name)
-
+        self.logger = get_logger(name=name, channel="pipeline")
+
     def info(self, msg: str):
        self.logger.info(msg)
-
+
    def warning(self, msg: str):
        self.logger.warning(msg)
-
+
    def error(self, msg: str):
        self.logger.error(msg)
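A sketch of how the channel argument is expected to behave, based on the code above and the defaults in `log_config.py`: each channel writes to its own dated, rotating file under `database/logs/` and echoes to the console, and logger names are namespaced as `<channel>.<name>`. Names and messages here are illustrative only.

```python
from database.logging_system import get_api_logger, get_logger

# Default channel is "pipeline" -> logger "pipeline.off_import",
# writing to database/logs/pipeline_<date>.log.
etl_log = get_logger("off_import")

# Channel "api" -> logger "api.products" -> api_<date>.log.
api_log = get_api_logger("products")

etl_log.info("event=demo message=imported")
api_log.warning("event=demo message=slow_endpoint")
```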
+ """ + + def __init__(self, pipeline_name: str = "DataPipeline"): + self.pipeline_name = pipeline_name + self.logger = get_logger(pipeline_name, channel="pipeline") + self.error_logger = get_error_logger(pipeline_name) + + def log_pipeline_start(self, input_source: str | None = None) -> None: + message = f"event=pipeline_start pipeline={self.pipeline_name}" + if input_source: + message += f" input_source={input_source}" + self.logger.info(message) + + def log_pipeline_end( + self, + total_duration_ms: float | None = None, + total_records: int | None = None, + ) -> None: + message = f"event=pipeline_end pipeline={self.pipeline_name}" + if total_duration_ms is not None: + message += f" total_duration_ms={total_duration_ms}" + if total_records is not None: + message += f" total_records={total_records}" + self.logger.info(message) + + def log_stage_start( + self, + stage_name: str, + input_file: str | None = None, + batch_id: str | None = None, + ) -> None: + message = f"event=stage_start pipeline={self.pipeline_name} stage={stage_name}" + if input_file: + message += f" input_file={input_file}" + if batch_id: + message += f" batch_id={batch_id}" + self.logger.info(message) + + def log_stage_end( + self, + stage_name: str, + duration_ms: float | None = None, + input_records: int | None = None, + output_records: int | None = None, + output_file: str | None = None, + ) -> None: + message = f"event=stage_end pipeline={self.pipeline_name} stage={stage_name}" + if duration_ms is not None: + message += f" duration_ms={duration_ms}" + if input_records is not None: + message += f" input_records={input_records}" + if output_records is not None: + message += f" output_records={output_records}" + if output_file: + message += f" output_file={output_file}" + self.logger.info(message) + + def log_stage_warning(self, stage_name: str, warning_message: str) -> None: + self.logger.warning( + "event=stage_warning pipeline=%s stage=%s warning=%s", + self.pipeline_name, + stage_name, + warning_message, + ) + + def log_stage_error( + self, + stage_name: str, + error: Exception | str, + input_file: str | None = None, + batch_id: str | None = None, + ) -> None: + details = f"event=stage_error pipeline={self.pipeline_name} stage={stage_name}" + if input_file: + details += f" input_file={input_file}" + if batch_id: + details += f" batch_id={batch_id}" + + self.error_logger.exception("%s error=%s", details, error) + + def log_metric(self, stage_name: str, metric_name: str, metric_value: Any) -> None: + self.logger.info( + "event=metric pipeline=%s stage=%s metric=%s value=%s", + self.pipeline_name, + stage_name, + metric_name, + metric_value, + ) diff --git a/database/logging_system/pipeline_logger_demo.py b/database/logging_system/pipeline_logger_demo.py deleted file mode 100644 index b7d043d..0000000 --- a/database/logging_system/pipeline_logger_demo.py +++ /dev/null @@ -1,53 +0,0 @@ -""" -Demonstration of DB005 Basic Logging System. -Logs each stage of the example database pipeline. 
-""" - -from logger import get_logger -import time - -logger = get_logger("DatabasePipeline") - -def load_data_stage(): - logger.info("Starting data loading stage...") - time.sleep(0.3) - logger.info("Data loaded successfully.") - -def validate_data_stage(): - logger.info("Validating dataset structure...") - time.sleep(0.2) - logger.warning("Some rows contain missing fields but continue processing.") - -def clean_data_stage(): - logger.info("Cleaning data fields...") - time.sleep(0.3) - logger.info("Data cleaning completed.") - -def transform_data_stage(): - logger.info("Transforming data for database ingestion...") - time.sleep(0.25) - logger.info("Transformation completed.") - -def save_data_stage(): - logger.info("Saving cleaned dataset to database...") - time.sleep(0.2) - - try: - # simulate failure - raise ValueError("Database connection failed.") - except Exception as e: - logger.error(f"Failed to save data: {e}") - -def main(): - logger.info("=== Pipeline started ===") - - load_data_stage() - validate_data_stage() - clean_data_stage() - transform_data_stage() - save_data_stage() - - logger.info("=== Pipeline finished ===") - -if __name__ == "__main__": - main() diff --git a/database/logging_system/request_middleware.py b/database/logging_system/request_middleware.py new file mode 100644 index 0000000..0869330 --- /dev/null +++ b/database/logging_system/request_middleware.py @@ -0,0 +1,81 @@ +""" +Request/response logging middleware for FastAPI or Starlette apps. +""" + +from __future__ import annotations + +import time +import uuid +from typing import Callable + +from .logger import get_access_logger, get_api_logger, get_error_logger + +try: + from starlette.middleware.base import BaseHTTPMiddleware + from starlette.requests import Request +except ImportError as exc: # pragma: no cover + raise ImportError( + "request_middleware.py requires FastAPI/Starlette to be installed." + ) from exc + + +access_logger = get_access_logger("requests") +api_logger = get_api_logger("requests") +error_logger = get_error_logger("requests") + + +class RequestLoggingMiddleware(BaseHTTPMiddleware): + """ + Logs request start, request end, and unexpected request failures. 
+ """ + + async def dispatch(self, request: Request, call_next: Callable): + request_id = str(uuid.uuid4()) + start_time = time.perf_counter() + + request.state.request_id = request_id + client_ip = request.client.host if request.client else "unknown" + + access_logger.info( + "event=request_start request_id=%s method=%s path=%s client_ip=%s", + request_id, + request.method, + request.url.path, + client_ip, + ) + + try: + response = await call_next(request) + duration_ms = round((time.perf_counter() - start_time) * 1000, 2) + + response.headers["X-Request-ID"] = request_id + + access_logger.info( + "event=request_end request_id=%s method=%s path=%s status_code=%s duration_ms=%s", + request_id, + request.method, + request.url.path, + response.status_code, + duration_ms, + ) + + api_logger.info( + "request_id=%s method=%s path=%s status_code=%s duration_ms=%s", + request_id, + request.method, + request.url.path, + response.status_code, + duration_ms, + ) + + return response + except Exception: + duration_ms = round((time.perf_counter() - start_time) * 1000, 2) + error_logger.exception( + "event=request_failed request_id=%s method=%s path=%s duration_ms=%s", + request_id, + request.method, + request.url.path, + duration_ms, + ) + raise diff --git a/database/logging_system/script_logger.py b/database/logging_system/script_logger.py new file mode 100644 index 0000000..1d22378 --- /dev/null +++ b/database/logging_system/script_logger.py @@ -0,0 +1,55 @@ +""" +Helpers for standalone backend scripts such as scraping, cleaning, and seeding. +""" + +from __future__ import annotations + +from .logger import get_error_logger, get_logger + + +class ScriptLogger: + """ + Small helper for logging the lifecycle of standalone scripts. + """ + + def __init__(self, script_name: str, channel: str = "pipeline"): + self.script_name = script_name + self.logger = get_logger(script_name, channel=channel) + self.error_logger = get_error_logger(script_name) + + def log_start(self, input_source: str | None = None) -> None: + message = f"event=script_start script={self.script_name}" + if input_source: + message += f" input_source={input_source}" + self.logger.info(message) + + def log_progress(self, step: str, records_processed: int | None = None) -> None: + message = f"event=script_progress script={self.script_name} step={step}" + if records_processed is not None: + message += f" records_processed={records_processed}" + self.logger.info(message) + + def log_success( + self, + output_target: str | None = None, + total_records: int | None = None, + ) -> None: + message = f"event=script_success script={self.script_name}" + if output_target: + message += f" output_target={output_target}" + if total_records is not None: + message += f" total_records={total_records}" + self.logger.info(message) + + def log_warning(self, warning_message: str) -> None: + self.logger.warning( + "event=script_warning script=%s warning=%s", + self.script_name, + warning_message, + ) + + def log_error(self, error: Exception | str, step: str | None = None) -> None: + details = f"event=script_error script={self.script_name}" + if step: + details += f" step={step}" + self.error_logger.exception("%s error=%s", details, error) diff --git a/database/pipeline/stages/clean_stage.py b/database/pipeline/stages/clean_stage.py index 5b2c9c7..7745f7f 100644 --- a/database/pipeline/stages/clean_stage.py +++ b/database/pipeline/stages/clean_stage.py @@ -1,7 +1,7 @@ import os import json -def run_clean_stage(input_path: str, output_path: str): +def 
diff --git a/database/pipeline/stages/clean_stage.py b/database/pipeline/stages/clean_stage.py
index 5b2c9c7..7745f7f 100644
--- a/database/pipeline/stages/clean_stage.py
+++ b/database/pipeline/stages/clean_stage.py
@@ -1,7 +1,7 @@
 import os
 import json
 
-def run_clean_stage(input_path: str, output_path: str):
+def run_clean_stage(input_path: str, output_path: str, config=None):
     """
     Robust clean stage that never crashes on nested OFF data.
     """
@@ -9,8 +9,10 @@ def run_clean_stage(input_path: str, output_path: str):
     with open(input_path, "r", encoding="utf-8") as f:
         data = json.load(f)
 
-    if not isinstance(data, list):
-        raise ValueError("Expected list of records")
+    if isinstance(data, dict):
+        data = [data]
+    elif not isinstance(data, list):
+        raise ValueError("Expected list or dict")
 
     cleaned = []
 
@@ -33,6 +35,13 @@ def run_clean_stage(input_path: str, output_path: str):
 
     print(f"[DB018] Cleaning complete: {output_path}")
 
+    return {
+        "status": "completed",
+        "processed": len(cleaned),
+        "failures": 0,
+        "output": output_path
+    }
+
diff --git a/database/pipeline/stages/enrich_stage.py b/database/pipeline/stages/enrich_stage.py
index f7bd2c9..c4d4850 100644
--- a/database/pipeline/stages/enrich_stage.py
+++ b/database/pipeline/stages/enrich_stage.py
@@ -13,7 +13,7 @@ def import_module_from_path(path: str) -> types.ModuleType:
     return module
 
 
-def run_enrich_stage(input_path: str, output_path: str, config: dict) -> dict:
+def run_enrich_stage(input_path: str, output_path: str, config=None) -> dict:
     """Run configured enrichment modules in sequence.
 
     Config format:
diff --git a/database/pipeline/stages/seed_stage.py b/database/pipeline/stages/seed_stage.py
index c66d46b..581cdf4 100644
--- a/database/pipeline/stages/seed_stage.py
+++ b/database/pipeline/stages/seed_stage.py
@@ -10,7 +10,7 @@ def import_module_from_path(path: str) -> types.ModuleType:
     return module
 
 
-def run_seed_stage(input_path: str, config: dict) -> dict:
+def run_seed_stage(input_path: str, output_path: str, config=None) -> dict:
     """
     Run the seeding module/script.
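The point of aligning all three stages on `(input_path, output_path, config=None)` is that a runner can now chain them without per-stage special cases. A sketch under that assumption; the stage order, working directory, and file names here are invented for illustration.

```python
from database.pipeline.stages.clean_stage import run_clean_stage
from database.pipeline.stages.enrich_stage import run_enrich_stage
from database.pipeline.stages.seed_stage import run_seed_stage

STAGES = [run_clean_stage, run_enrich_stage, run_seed_stage]


def run_all(first_input: str, workdir: str = "database/tmp") -> None:
    """Chain the stages, feeding each stage's output file to the next."""
    current_input = first_input
    for i, stage in enumerate(STAGES):
        output_path = f"{workdir}/stage_{i}.json"
        result = stage(current_input, output_path, config=None)  # uniform call
        print(result)
        current_input = output_path
```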