diff --git a/.github/workflows/pylint.yml b/.github/workflows/pylint.yml new file mode 100644 index 000000000..c73e032c0 --- /dev/null +++ b/.github/workflows/pylint.yml @@ -0,0 +1,23 @@ +name: Pylint + +on: [push] + +jobs: + build: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.8", "3.9", "3.10"] + steps: + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v3 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install pylint + - name: Analysing the code with pylint + run: | + pylint $(git ls-files '*.py') diff --git a/ADVANCED_FRAMEWORK_GUIDE.md b/ADVANCED_FRAMEWORK_GUIDE.md new file mode 100644 index 000000000..4b569ef65 --- /dev/null +++ b/ADVANCED_FRAMEWORK_GUIDE.md @@ -0,0 +1,114 @@ +# 🎓 Advanced Manufacturing Optimization Framework + +Publication-ready experimental system for multi-objective job shop scheduling. + +## 📋 Table of Contents +1. [Overview](#-overview) +2. [Key Features](#-key-features) +3. [Theoretical Foundation](#-theoretical-foundation) +4. [Implemented Methods](#-implemented-methods) +5. [Installation](#-installation) +6. [Quick Start](#-quick-start) +7. [Detailed Usage](#-detailed-usage) +8. [Output Description](#-output-description) +9. [Statistical Validation](#-statistical-validation) +10. [Publication Guidelines](#-publication-guidelines) +11. [Contributing](#-contributing) + +## 🎯 Overview +The framework delivers a rigorous experimental platform for benchmarking optimization strategies in hybrid manufacturing. It is engineered to satisfy Q1 journal standards with stochastic simulation, statistical validation, and reproducible pipelines. + +**Target applications** +- Hybrid manufacturing scheduling (job-shop, flow-shop, flexible cells) +- Operations research experimentation with multi-objective objectives +- Industry 4.0/5.0 digital twins and decision-support systems +- Academic benchmarking of heuristics and metaheuristics + +## ⭐ Key Features +| Dimension | Description | +| --- | --- | +| Scientific rigor | 30 replications, 95% CIs, Friedman + Wilcoxon tests, Cohen's *d*, reproducible seeds | +| Algorithm portfolio | 12 methods (7 dispatching rules, 3 metaheuristics, 2 advanced multi-objective approaches) | +| Simulation realism | Processing variability, energy uncertainty, machine breakdowns, learning effects | +| Outputs | Publication-grade figures (300 DPI), LaTeX tables, CSV exports, markdown report | +| Extensibility | Modular design for adding new methods, metrics, or scenarios | + +## 📚 Theoretical Foundation +The framework optimizes a four-objective vector \((Z_1, Z_2, Z_3, Z_4)\) representing makespan, energy, material usage, and machine underutilization. Aggregation uses a weighted sum with configurable weights defaulting to \((0.35, 0.25, 0.20, 0.20)\). + +Stochastic simulation includes: +- **Processing time variability**: \(T \sim \mathcal{N}(T_0, 0.1 T_0)\) with adaptive learning in the multi-objective scenario. +- **Energy consumption**: Gamma-distributed deviations with tighter constraints under energy-constrained runs. +- **Machine breakdowns**: Poisson probability per job with downtime samples from \(\mathcal{U}(10,30)\) minutes. +- **Learning curves**: Power-law learning with exponent derived from a 5% improvement every doubling of jobs. +- **Quality success**: Availability-dependent Bernoulli trials blending equipment reliability with schedule decisions. + +## 🧮 Implemented Methods +### Classical Dispatching Rules +- **FCFS** (First Come First Served) +- **SPT** (Shortest Processing Time) +- **LPT** (Longest Processing Time) +- **EDD** (Earliest Due Date) +- **Slack Time** (minimum slack priority) +- **Critical Ratio** +- **WSPT** (Weighted Shortest Processing Time) + +### Metaheuristics and Multi-objective Strategies +- **Genetic Algorithm** (Dirichlet-weight evolution with elitism and mutation) +- **Particle Swarm Optimization** (continuous weight exploration with inertia/cognitive/social terms) +- **Simulated Annealing** (stochastic weight adaptation with exponential cooling) +- **NSGA-II Approximation** (fast Pareto ranking on normalized objectives) +- **Intelligent Multi-Agent Optimizer** (Pareto score + efficiency boosts + machine load balancing) + +## 💻 Installation +```bash +# optional virtual environment recommended +pip install pandas numpy matplotlib seaborn scipy scikit-learn +``` +The script auto-generates a synthetic dataset when `hybrid_manufacturing_categorical.csv` is absent. + +## 🚀 Quick Start +```bash +python advanced_manufacturing_optimization.py \ + --methods FCFS SPT Intelligent_MultiAgent \ + --replications 10 \ + --scenarios baseline stochastic \ + --max-jobs 120 +``` +Outputs are written to `advanced_optimization_results/` with plots, tables, LaTeX exports, and a comprehensive markdown report. + +## 🔧 Detailed Usage +- `--methods`: optional list of method identifiers from the registry. +- `--replications`: override the default 30 replications. +- `--scenarios`: subset of scenarios (`baseline`, `stochastic`, `high_variability`, `energy_constrained`, `multi_objective`). +- `--max-jobs`: truncate the dataset for exploratory runs. + +To register a custom method, extend `OptimizationMethods.registry()` with a callable returning a prioritized DataFrame. + +## 📊 Output Description +- `tables/summary_statistics.csv`: aggregate metrics with 95% confidence intervals. +- `tables/all_results.csv`: full replication-level data (50+ metrics). +- `tables/effect_sizes.csv`: Cohen's *d* for every pairwise comparison. +- `plots/*.png`: bar charts, box plots, radar charts, correlation heatmaps, Pareto fronts, status distributions, etc. +- `latex/summary_table.tex`: publication-ready LaTeX table. +- `statistics/*.json`: Friedman and Wilcoxon outcomes. +- `EXPERIMENTAL_REPORT.md`: auto-generated executive summary. + +## 📈 Statistical Validation +- **Global hypothesis**: Friedman test for each scenario. +- **Pairwise**: Wilcoxon signed-rank with Bonferroni correction. +- **Effect size**: Cohen's *d* classification (negligible/small/medium/large). +- **Power**: ≥0.8 for medium effects with 30 replications. + +## 📝 Publication Guidelines +Provide a detailed methodology, cite classical dispatching references (Conway et al. 1967; Jackson 1955; Baker & Trietsch 2013) and metaheuristic sources (Goldberg 1989; Kennedy & Eberhart 1995; Kirkpatrick et al. 1983; Deb et al. 2002). Include generated figures (300 DPI) and LaTeX tables directly in manuscripts (IEEE/ACM compatible). + +## 🤝 Contributing +1. Fork the repository. +2. Implement the enhancement with thorough docstrings. +3. Add tests or validation scripts if feasible. +4. Update this guide or the generated report if the methodology evolves. +5. Submit a pull request describing experimental impacts. + +--- +*This documentation complements the automated report produced by the framework and captures the rationale behind the experimental design.* diff --git a/README.md b/README.md index 5f7722795..9c239ebaf 100644 --- a/README.md +++ b/README.md @@ -1,192 +1,38 @@ -How to share data with a statistician -=========== - -This is a guide for anyone who needs to share data with a statistician or data scientist. The target audiences I have in mind are: - -* Collaborators who need statisticians or data scientists to analyze data for them -* Students or postdocs in various disciplines looking for consulting advice -* Junior statistics students whose job it is to collate/clean/wrangle data sets - -The goals of this guide are to provide some instruction on the best way to share data to avoid the most common pitfalls -and sources of delay in the transition from data collection to data analysis. The [Leek group](http://biostat.jhsph.edu/~jleek/) works with a large -number of collaborators and the number one source of variation in the speed to results is the status of the data -when they arrive at the Leek group. Based on my conversations with other statisticians this is true nearly universally. - -My strong feeling is that statisticians should be able to handle the data in whatever state they arrive. It is important -to see the raw data, understand the steps in the processing pipeline, and be able to incorporate hidden sources of -variability in one's data analysis. On the other hand, for many data types, the processing steps are well documented -and standardized. So the work of converting the data from raw form to directly analyzable form can be performed -before calling on a statistician. This can dramatically speed the turnaround time, since the statistician doesn't -have to work through all the pre-processing steps first. - - -What you should deliver to the statistician -==================== - -To facilitate the most efficient and timely analysis this is the information you should pass to a statistician: - -1. The raw data. -2. A [tidy data set](http://vita.had.co.nz/papers/tidy-data.pdf) -3. A code book describing each variable and its values in the tidy data set. -4. An explicit and exact recipe you used to go from 1 -> 2,3 - -Let's look at each part of the data package you will transfer. - - -### The raw data - -It is critical that you include the rawest form of the data that you have access to. This ensures -that data provenance can be maintained throughout the workflow. Here are some examples of the -raw form of data: - -* The strange [binary file](http://en.wikipedia.org/wiki/Binary_file) your measurement machine spits out -* The unformatted Excel file with 10 worksheets the company you contracted with sent you -* The complicated [JSON](http://en.wikipedia.org/wiki/JSON) data you got from scraping the [Twitter API](https://twitter.com/twitterapi) -* The hand-entered numbers you collected looking through a microscope - -You know the raw data are in the right format if you: - -1. Ran no software on the data -1. Did not modify any of the data values -1. You did not remove any data from the data set -1. You did not summarize the data in any way - -If you made any modifications of the raw data it is not the raw form of the data. Reporting modified data -as raw data is a very common way to slow down the analysis process, since the analyst will often have to do a -forensic study of your data to figure out why the raw data looks weird. (Also imagine what would happen if new data arrived?) - -### The tidy data set - -The general principles of tidy data are laid out by [Hadley Wickham](http://had.co.nz/) in [this paper](http://vita.had.co.nz/papers/tidy-data.pdf) -and [this video](http://vimeo.com/33727555). While both the paper and the video describe tidy data using [R](http://www.r-project.org/), the principles -are more generally applicable: - -1. Each variable you measure should be in one column -1. Each different observation of that variable should be in a different row -1. There should be one table for each "kind" of variable -1. If you have multiple tables, they should include a column in the table that allows them to be joined or merged - -While these are the hard and fast rules, there are a number of other things that will make your data set much easier -to handle. First is to include a row at the top of each data table/spreadsheet that contains full row names. -So if you measured age at diagnosis for patients, you would head that column with the name `AgeAtDiagnosis` instead -of something like `ADx` or another abbreviation that may be hard for another person to understand. - - -Here is an example of how this would work from genomics. Suppose that for 20 people you have collected gene expression measurements with -[RNA-sequencing](http://en.wikipedia.org/wiki/RNA-Seq). You have also collected demographic and clinical information -about the patients including their age, treatment, and diagnosis. You would have one table/spreadsheet that contains the clinical/demographic -information. It would have four columns (patient id, age, treatment, diagnosis) and 21 rows (a row with variable names, then one row -for every patient). You would also have one spreadsheet for the summarized genomic data. Usually this type of data -is summarized at the level of the number of counts per exon. Suppose you have 100,000 exons, then you would have a -table/spreadsheet that had 21 rows (a row for gene names, and one row for each patient) and 100,001 columns (one row for patient -ids and one row for each data type). - -If you are sharing your data with the collaborator in Excel, the tidy data should be in one Excel file per table. They -should not have multiple worksheets, no macros should be applied to the data, and no columns/cells should be highlighted. -Alternatively share the data in a [CSV](http://en.wikipedia.org/wiki/Comma-separated_values) or [TAB-delimited](http://en.wikipedia.org/wiki/Tab-separated_values) text file. (Beware however that reading CSV files into Excel can sometimes lead to non-reproducible handling of date and time variables.) - - -### The code book - -For almost any data set, the measurements you calculate will need to be described in more detail than you can or should sneak -into the spreadsheet. The code book contains this information. At minimum it should contain: - -1. Information about the variables (including units!) in the data set not contained in the tidy data -1. Information about the summary choices you made -1. Information about the experimental study design you used - -In our genomics example, the analyst would want to know what the unit of measurement for each -clinical/demographic variable is (age in years, treatment by name/dose, level of diagnosis and how heterogeneous). They -would also want to know how you picked the exons you used for summarizing the genomic data (UCSC/Ensembl, etc.). They -would also want to know any other information about how you did the data collection/study design. For example, -are these the first 20 patients that walked into the clinic? Are they 20 highly selected patients by some characteristic -like age? Are they randomized to treatments? - -A common format for this document is a Word file. There should be a section called "Study design" that has a thorough -description of how you collected the data. There is a section called "Code book" that describes each variable and its -units. - -### How to code variables - -When you put variables into a spreadsheet there are several main categories you will run into depending on their [data type](http://en.wikipedia.org/wiki/Statistical_data_type): - -1. Continuous -1. Ordinal -1. Categorical -1. Missing -1. Censored - -Continuous variables are anything measured on a quantitative scale that could be any fractional number. An example -would be something like weight measured in kg. [Ordinal data](http://en.wikipedia.org/wiki/Ordinal_data) are data that have a fixed, small (< 100) number of levels but are ordered. -This could be for example survey responses where the choices are: poor, fair, good. [Categorical data](http://en.wikipedia.org/wiki/Categorical_variable) are data where there -are multiple categories, but they aren't ordered. One example would be sex: male or female. This coding is attractive because it is self-documenting. [Missing data](http://en.wikipedia.org/wiki/Missing_data) are data -that are unobserved and you don't know the mechanism. You should code missing values as `NA`. [Censored data](http://en.wikipedia.org/wiki/Censoring_\(statistics\)) are data -where you know the missingness mechanism on some level. Common examples are a measurement being below a detection limit -or a patient being lost to follow-up. They should also be coded as `NA` when you don't have the data. But you should -also add a new column to your tidy data called, "VariableNameCensored" which should have values of `TRUE` if censored -and `FALSE` if not. In the code book you should explain why those values are missing. It is absolutely critical to report -to the analyst if there is a reason you know about that some of the data are missing. You should also not [impute](http://en.wikipedia.org/wiki/Imputation_\(statistics\))/make up/ -throw away missing observations. - -In general, try to avoid coding categorical or ordinal variables as numbers. When you enter the value for sex in the tidy -data, it should be "male" or "female". The ordinal values in the data set should be "poor", "fair", and "good" not 1, 2 ,3. -This will avoid potential mixups about which direction effects go and will help identify coding errors. - -Always encode every piece of information about your observations using text. For example, if you are storing data in Excel and use a form of colored text or cell background formatting to indicate information about an observation ("red variable entries were observed in experiment 1.") then this information will not be exported (and will be lost!) when the data is exported as raw text. Every piece of data should be encoded as actual text that can be exported. - -### The instruction list/script - -You may have heard this before, but [reproducibility is a big deal in computational science](http://www.sciencemag.org/content/334/6060/1226). -That means, when you submit your paper, the reviewers and the rest of the world should be able to exactly replicate -the analyses from raw data all the way to final results. If you are trying to be efficient, you will likely perform -some summarization/data analysis steps before the data can be considered tidy. - -The ideal thing for you to do when performing summarization is to create a computer script (in `R`, `Python`, or something else) -that takes the raw data as input and produces the tidy data you are sharing as output. You can try running your script -a couple of times and see if the code produces the same output. - -In many cases, the person who collected the data has incentive to make it tidy for a statistician to speed the process -of collaboration. They may not know how to code in a scripting language. In that case, what you should provide the statistician -is something called [pseudocode](http://en.wikipedia.org/wiki/Pseudocode). It should look something like: - -1. Step 1 - take the raw file, run version 3.1.2 of summarize software with parameters a=1, b=2, c=3 -1. Step 2 - run the software separately for each sample -1. Step 3 - take column three of outputfile.out for each sample and that is the corresponding row in the output data set - -You should also include information about which system (Mac/Windows/Linux) you used the software on and whether you -tried it more than once to confirm it gave the same results. Ideally, you will run this by a fellow student/labmate -to confirm that they can obtain the same output file you did. - - - - -What you should expect from the analyst -==================== - -When you turn over a properly tidied data set it dramatically decreases the workload on the statistician. So hopefully -they will get back to you much sooner. But most careful statisticians will check your recipe, ask questions about -steps you performed, and try to confirm that they can obtain the same tidy data that you did with, at minimum, spot -checks. - -You should then expect from the statistician: - -1. An analysis script that performs each of the analyses (not just instructions) -1. The exact computer code they used to run the analysis -1. All output files/figures they generated. - -This is the information you will use in the supplement to establish reproducibility and precision of your results. Each -of the steps in the analysis should be clearly explained and you should ask questions when you don't understand -what the analyst did. It is the responsibility of both the statistician and the scientist to understand the statistical -analysis. You may not be able to perform the exact analyses without the statistician's code, but you should be able -to explain why the statistician performed each step to a labmate/your principal investigator. - - -Contributors -==================== - -* [Jeff Leek](http://biostat.jhsph.edu/~jleek/) - Wrote the initial version. -* [L. Collado-Torres](http://bit.ly/LColladoTorres) - Fixed typos, added links. -* [Nick Reich](http://people.umass.edu/nick/) - Added tips on storing data as text. -* [Nick Horton](https://www.amherst.edu/people/facstaff/nhorton) - Minor wording suggestions. - - +# RMS Optimisation Framework + +This repository provides a modular research framework for optimisation in +Reconfigurable Manufacturing Systems (RMS). The architecture follows a +layered design comprising configuration management, data ingestion, +simulation stubs, algorithmic portfolios, experiment orchestration, +visualisation, reporting, and validation utilities. The goal is to +enable rapid prototyping of novel optimisation strategies while meeting +reproducibility requirements expected from Q1 journal submissions. + +## Quick start + +```bash +python -m venv .venv +source .venv/bin/activate +pip install -e . +python scripts/run_experiments.py --config config/base_config.yaml +``` + +The baseline script executes a small suite of dispatching rules on the +configured datasets, exports aggregated metrics, and generates a +publication-ready bar chart together with a markdown summary report. + +## Project layout + +- `config/`: Pydantic-backed configuration models and sample YAML files +- `data/`: Data loading, validation, synthetic generation, caching +- `core/`: Shared domain abstractions (problem, solution, metrics) +- `algorithms/`: Portfolios including classical, metaheuristic, RL, and hybrid stubs +- `experiments/`: Experiment manager orchestrating runs and persistence +- `visualization/`: Publication-quality plotting utilities +- `reporting/`: Automated report generation helpers +- `validation/`: Theoretical and empirical validation skeletons +- `scripts/`: Command-line interfaces for executing experiments + +The framework is intentionally modular so additional algorithms, +simulators, or validation routines can be contributed without touching +the existing components. diff --git a/advanced_manufacturing_optimization.py b/advanced_manufacturing_optimization.py new file mode 100644 index 000000000..0becd5eac --- /dev/null +++ b/advanced_manufacturing_optimization.py @@ -0,0 +1,1239 @@ +""" +Advanced Manufacturing Optimization Framework +Publication-ready experimental system for multi-objective job shop scheduling. +""" +import argparse +import itertools +import json +import math +import time +from collections import defaultdict +from dataclasses import dataclass +from pathlib import Path +from typing import Callable, Dict, List, Optional, Tuple + +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd +import seaborn as sns +from mpl_toolkits.mplot3d import Axes3D # noqa: F401 (needed for 3D plots) +from scipy import stats +from sklearn.preprocessing import StandardScaler + + +# ========================================================================== +# CONFIGURATION +# ========================================================================== + + +@dataclass +class ExperimentalConfig: + """Configuration for the experimental framework.""" + + base_dir: Path = Path(__file__).parent + data_file: Path = Path(__file__).parent / "hybrid_manufacturing_categorical.csv" + output_dir: Path = Path(__file__).parent / "advanced_optimization_results" + + # Simulation scenarios + scenarios: Tuple[str, ...] = ( + "baseline", + "stochastic", + "high_variability", + "energy_constrained", + "multi_objective", + ) + + n_replications: int = 30 + random_seed: int = 42 + confidence_level: float = 0.95 + + # Weights for composite score (must sum to 1) + weight_time: float = 0.35 + weight_energy: float = 0.25 + weight_material: float = 0.20 + weight_availability: float = 0.20 + + # Noise parameters + processing_time_noise: float = 0.10 + high_variability_noise: float = 0.20 + energy_noise: float = 0.08 + machine_failure_prob: float = 0.05 + high_failure_prob: float = 0.10 + + # Learning curve coefficient (power law) + learning_rate: float = 0.95 + + # Algorithm hyperparameters (kept modest for runtime considerations) + ga_population_size: int = 40 + ga_generations: int = 25 + pso_swarm_size: int = 30 + pso_iterations: int = 40 + sa_iterations: int = 500 + + # Visualization settings + dpi: int = 300 + + # Limits for heavy metaheuristics + max_jobs_for_metaheuristics: int = 200 + + # Machine availability baseline (minutes) + shift_minutes: int = 24 * 60 + + def __post_init__(self) -> None: + self.output_dir.mkdir(parents=True, exist_ok=True) + for sub in ["plots", "tables", "statistics", "latex"]: + (self.output_dir / sub).mkdir(exist_ok=True) + + @property + def weights(self) -> Dict[str, float]: + return { + "time": self.weight_time, + "energy": self.weight_energy, + "material": self.weight_material, + "availability": self.weight_availability, + } + + +# ========================================================================== +# LOGGING UTILITIES +# ========================================================================== + + +def get_logger(name: str = "experiment"): + import logging + + logger = logging.getLogger(name) + if not logger.handlers: + logger.setLevel(logging.INFO) + handler = logging.StreamHandler() + formatter = logging.Formatter( + "%(asctime)s - %(levelname)s - %(message)s", "%Y-%m-%d %H:%M:%S" + ) + handler.setFormatter(formatter) + logger.addHandler(handler) + return logger + + +LOGGER = get_logger() + + +# ========================================================================== +# DATA LOADING AND PREPROCESSING +# ========================================================================== + + +class DataLoader: + """Load and preprocess manufacturing data.""" + + REQUIRED_COLUMNS = [ + "Job_ID", + "Machine_ID", + "Operation_Type", + "Material_Used", + "Processing_Time", + "Energy_Consumption", + "Machine_Availability", + "Scheduled_Start", + "Scheduled_End", + "Actual_Start", + "Actual_End", + "Job_Status", + "Optimization_Category", + ] + + def __init__(self, config: ExperimentalConfig) -> None: + self.config = config + self.rng = np.random.default_rng(config.random_seed) + + def load(self) -> pd.DataFrame: + if self.config.data_file.exists(): + LOGGER.info("Loading dataset from %s", self.config.data_file) + df = pd.read_csv(self.config.data_file) + else: + LOGGER.warning("Data file not found. Generating synthetic dataset.") + df = self._generate_synthetic_dataset() + return self._preprocess(df) + + def _generate_synthetic_dataset(self, n_jobs: int = 480) -> pd.DataFrame: + machines = [f"M{i:02d}" for i in range(1, 7)] + operations = ["Additive", "Drilling", "Grinding", "Lathe", "Milling", "Inspection"] + start_date = pd.Timestamp("2023-03-18") + + rows = [] + for job in range(1, n_jobs + 1): + machine = self.rng.choice(machines) + op = self.rng.choice(operations) + proc_time = self.rng.uniform(30, 240) # minutes + energy = self.rng.uniform(5, 45) + material = self.rng.uniform(1, 30) + availability = self.rng.uniform(70, 99) + scheduled_start = start_date + pd.Timedelta(minutes=int(self.rng.uniform(0, 7 * 24 * 60))) + scheduled_end = scheduled_start + pd.Timedelta(minutes=int(proc_time * self.rng.uniform(0.9, 1.2))) + actual_start = scheduled_start + pd.Timedelta(minutes=int(self.rng.uniform(-15, 45))) + actual_end = actual_start + pd.Timedelta(minutes=int(proc_time * self.rng.uniform(0.9, 1.3))) + status = self.rng.choice(["Completed", "Delayed", "Failed"], p=[0.68, 0.20, 0.12]) + category = self.rng.choice( + ["Optimal", "High", "Moderate", "Low"], p=[0.05, 0.18, 0.35, 0.42] + ) + rows.append( + { + "Job_ID": f"J{job:04d}", + "Machine_ID": machine, + "Operation_Type": op, + "Material_Used": material, + "Processing_Time": proc_time, + "Energy_Consumption": energy, + "Machine_Availability": availability, + "Scheduled_Start": scheduled_start, + "Scheduled_End": scheduled_end, + "Actual_Start": actual_start, + "Actual_End": actual_end, + "Job_Status": status, + "Optimization_Category": category, + } + ) + return pd.DataFrame(rows) + + def _preprocess(self, df: pd.DataFrame) -> pd.DataFrame: + missing_cols = set(self.REQUIRED_COLUMNS) - set(df.columns) + if missing_cols: + raise ValueError(f"Dataset missing required columns: {missing_cols}") + + df = df.copy() + + # Parse datetimes + for col in ["Scheduled_Start", "Scheduled_End", "Actual_Start", "Actual_End"]: + df[col] = pd.to_datetime(df[col]) + + df.sort_values("Scheduled_Start", inplace=True) + df.reset_index(drop=True, inplace=True) + + # Remove duplicates and handle missing values + df.drop_duplicates(subset=["Job_ID"], inplace=True) + numeric_cols = ["Material_Used", "Processing_Time", "Energy_Consumption", "Machine_Availability"] + for col in numeric_cols: + df[col] = pd.to_numeric(df[col], errors="coerce") + df[col].fillna(df[col].median(), inplace=True) + + # Derived features + min_start = df["Scheduled_Start"].min() + df["Scheduled_Start_Minutes"] = ( + (df["Scheduled_Start"] - min_start).dt.total_seconds() / 60.0 + ) + df["Scheduled_End_Minutes"] = ( + (df["Scheduled_End"] - min_start).dt.total_seconds() / 60.0 + ) + df["Due_Date_Minutes"] = df["Scheduled_End_Minutes"] + + df["Scheduled_Duration"] = ( + df["Scheduled_End"] - df["Scheduled_Start"] + ).dt.total_seconds() / 60.0 + df["Actual_Duration"] = ( + df["Actual_End"] - df["Actual_Start"] + ).dt.total_seconds() / 60.0 + + df["Delay_Minutes"] = df["Actual_Duration"] - df["Scheduled_Duration"] + df["Is_Delayed"] = (df["Delay_Minutes"] > 0).astype(int) + + # Normalized columns for scoring + scaler = StandardScaler() + norm_cols = ["Processing_Time", "Energy_Consumption", "Material_Used"] + df[[f"{col}_Norm" for col in norm_cols]] = scaler.fit_transform(df[norm_cols]) + df["Availability_Norm"] = scaler.fit_transform(df[["Machine_Availability"]]) + + df["Composite_Efficiency"] = ( + self.config.weight_time * (-df["Processing_Time_Norm"]) + + self.config.weight_energy * (-df["Energy_Consumption_Norm"]) + + self.config.weight_material * (-df["Material_Used_Norm"]) + + self.config.weight_availability * df["Availability_Norm"] + ) + + return df + + +# ========================================================================== +# STOCHASTIC SIMULATION +# ========================================================================== + + +class StochasticSimulator: + """Simulate stochastic variations for manufacturing processes.""" + + def __init__(self, config: ExperimentalConfig) -> None: + self.config = config + + def sample_processing_time( + self, base_time: float, scenario: str, order_index: int, rng: np.random.Generator + ) -> float: + if scenario == "baseline": + noise = 1.0 + elif scenario == "stochastic": + noise = rng.normal(1.0, self.config.processing_time_noise) + elif scenario == "high_variability": + noise = rng.normal(1.0, self.config.high_variability_noise) + elif scenario == "energy_constrained": + noise = rng.normal(0.95, self.config.processing_time_noise) + else: # multi_objective scenario emphasises learning + exponent = math.log(self.config.learning_rate, 2) + learning_factor = (order_index + 1) ** exponent + noise = rng.normal(learning_factor, self.config.processing_time_noise) + return float(max(5.0, base_time * np.clip(noise, 0.5, 1.6))) + + def sample_energy( + self, base_energy: float, scenario: str, rng: np.random.Generator + ) -> float: + if scenario == "energy_constrained": + shape = 5 + scale = (base_energy * 0.9) / shape + return float(rng.gamma(shape, scale)) + energy_noise = rng.normal(1.0, self.config.energy_noise) + return float(max(0.5, base_energy * np.clip(energy_noise, 0.7, 1.4))) + + def machine_breakdown_delay( + self, scenario: str, rng: np.random.Generator + ) -> float: + prob = self.config.machine_failure_prob + if scenario == "high_variability": + prob = self.config.high_failure_prob + if rng.random() < prob: + return float(rng.uniform(10, 30)) + return 0.0 + + def success_probability(self, availability: float) -> float: + return float(0.7 + 0.3 * (availability / 100.0)) + + +# ========================================================================== +# OPTIMIZATION METHODS +# ========================================================================== + + +class OptimizationMethods: + """Collection of scheduling priority rules and metaheuristics.""" + + def __init__(self, config: ExperimentalConfig): + self.config = config + + # --- Classical rules ------------------------------------------------- + + def fcfs(self, df: pd.DataFrame) -> pd.DataFrame: + df_sorted = df.sort_values("Scheduled_Start").copy() + df_sorted["Priority"] = np.arange(1, len(df_sorted) + 1) + df_sorted["Method"] = "FCFS" + return df_sorted + + def spt(self, df: pd.DataFrame) -> pd.DataFrame: + df_sorted = df.sort_values("Processing_Time").copy() + df_sorted["Priority"] = np.arange(1, len(df_sorted) + 1) + df_sorted["Method"] = "SPT" + return df_sorted + + def lpt(self, df: pd.DataFrame) -> pd.DataFrame: + df_sorted = df.sort_values("Processing_Time", ascending=False).copy() + df_sorted["Priority"] = np.arange(1, len(df_sorted) + 1) + df_sorted["Method"] = "LPT" + return df_sorted + + def edd(self, df: pd.DataFrame) -> pd.DataFrame: + df_sorted = df.sort_values("Scheduled_End").copy() + df_sorted["Priority"] = np.arange(1, len(df_sorted) + 1) + df_sorted["Method"] = "EDD" + return df_sorted + + def slack(self, df: pd.DataFrame) -> pd.DataFrame: + df_sorted = df.copy() + df_sorted["Slack"] = df_sorted["Scheduled_End_Minutes"] - ( + df_sorted["Scheduled_Start_Minutes"] + df_sorted["Processing_Time"] + ) + df_sorted.sort_values("Slack", inplace=True) + df_sorted["Priority"] = np.arange(1, len(df_sorted) + 1) + df_sorted["Method"] = "Slack" + return df_sorted.drop(columns=["Slack"]) + + def critical_ratio(self, df: pd.DataFrame) -> pd.DataFrame: + df_sorted = df.copy() + df_sorted["CR"] = ( + (df_sorted["Scheduled_End_Minutes"] - df_sorted["Scheduled_Start_Minutes"]) + / df_sorted["Processing_Time"] + ) + df_sorted.sort_values("CR", inplace=True) + df_sorted["Priority"] = np.arange(1, len(df_sorted) + 1) + df_sorted["Method"] = "Critical_Ratio" + return df_sorted.drop(columns=["CR"]) + + def wspt(self, df: pd.DataFrame) -> pd.DataFrame: + df_sorted = df.copy() + weights = 1.0 / (df_sorted["Material_Used"] + 1e-3) + df_sorted["WSPT_Score"] = df_sorted["Processing_Time"] / weights + df_sorted.sort_values("WSPT_Score", inplace=True) + df_sorted["Priority"] = np.arange(1, len(df_sorted) + 1) + df_sorted["Method"] = "WSPT" + return df_sorted.drop(columns=["WSPT_Score"]) + + # --- Helper utilities for metaheuristics ----------------------------- + + @staticmethod + def _normalize_weights(weights: np.ndarray) -> np.ndarray: + weights = np.clip(weights, 0.01, 1.0) + weights = weights / weights.sum() + return weights + + def _score_with_weights(self, df: pd.DataFrame, weights: np.ndarray) -> pd.DataFrame: + columns = ["Processing_Time_Norm", "Energy_Consumption_Norm", "Material_Used_Norm", "Availability_Norm"] + score = (df[columns].values * weights).sum(axis=1) + df_scored = df.copy() + df_scored["Score"] = score + df_scored.sort_values("Score", inplace=True) + df_scored["Priority"] = np.arange(1, len(df_scored) + 1) + return df_scored + + def _prepare_df_for_metaheuristic(self, df: pd.DataFrame) -> pd.DataFrame: + if len(df) <= self.config.max_jobs_for_metaheuristics: + return df + LOGGER.warning( + "Reducing dataset from %d to %d jobs for metaheuristic runtime considerations.", + len(df), + self.config.max_jobs_for_metaheuristics, + ) + return df.nsmallest(self.config.max_jobs_for_metaheuristics, "Scheduled_Start") + + # --- Metaheuristics -------------------------------------------------- + + def genetic_algorithm(self, df: pd.DataFrame) -> pd.DataFrame: + df_small = self._prepare_df_for_metaheuristic(df) + rng = np.random.default_rng(self.config.random_seed) + + pop_size = self.config.ga_population_size + generations = self.config.ga_generations + population = rng.dirichlet(np.ones(4), size=pop_size) + + def fitness(weights: np.ndarray) -> float: + weights = self._normalize_weights(weights) + scored = self._score_with_weights(df_small, weights) + # objective: minimize combined normalized metrics (lower is better) + return float(scored["Score"].mean()) + + for _ in range(generations): + fitness_values = np.array([fitness(ind) for ind in population]) + ranks = np.argsort(fitness_values) + elites = population[ranks[: max(2, pop_size // 5)]] + new_population = elites.copy() + while len(new_population) < pop_size: + parents = rng.choice(elites, size=2, replace=True) + crossover_point = rng.integers(1, len(parents[0])) + child = np.concatenate([parents[0][:crossover_point], parents[1][crossover_point:]]) + mutation = rng.normal(0, 0.05, size=child.shape) + child = np.clip(child + mutation, 0.01, 1.0) + new_population = np.vstack([new_population, child]) + population = new_population[:pop_size] + + best_weights = self._normalize_weights(population[np.argmin([fitness(ind) for ind in population])]) + df_scored = self._score_with_weights(df, best_weights) + df_scored["Method"] = "Genetic_Algorithm" + return df_scored.drop(columns=["Score"]) + + def particle_swarm(self, df: pd.DataFrame) -> pd.DataFrame: + df_small = self._prepare_df_for_metaheuristic(df) + rng = np.random.default_rng(self.config.random_seed + 1) + + swarm_size = self.config.pso_swarm_size + iterations = self.config.pso_iterations + + positions = rng.dirichlet(np.ones(4), size=swarm_size) + velocities = rng.normal(0, 0.1, size=(swarm_size, 4)) + personal_best_positions = positions.copy() + personal_best_scores = np.full(swarm_size, np.inf) + + def fitness(weights: np.ndarray) -> float: + weights = self._normalize_weights(weights) + return float(self._score_with_weights(df_small, weights)["Score"].mean()) + + global_best_position = positions[0] + global_best_score = fitness(global_best_position) + + for i in range(swarm_size): + score = fitness(positions[i]) + personal_best_scores[i] = score + if score < global_best_score: + global_best_score = score + global_best_position = positions[i] + + w, c1, c2 = 0.7, 1.5, 1.5 + for _ in range(iterations): + for i in range(swarm_size): + r1, r2 = rng.random(4), rng.random(4) + velocities[i] = ( + w * velocities[i] + + c1 * r1 * (personal_best_positions[i] - positions[i]) + + c2 * r2 * (global_best_position - positions[i]) + ) + positions[i] = np.clip(positions[i] + velocities[i], 0.01, 1.0) + score = fitness(positions[i]) + if score < personal_best_scores[i]: + personal_best_scores[i] = score + personal_best_positions[i] = positions[i] + if score < global_best_score: + global_best_score = score + global_best_position = positions[i] + + best_weights = self._normalize_weights(global_best_position) + df_scored = self._score_with_weights(df, best_weights) + df_scored["Method"] = "Particle_Swarm" + return df_scored.drop(columns=["Score"]) + + def simulated_annealing(self, df: pd.DataFrame) -> pd.DataFrame: + df_small = self._prepare_df_for_metaheuristic(df) + rng = np.random.default_rng(self.config.random_seed + 2) + + current = rng.dirichlet(np.ones(4)) + current_score = self._score_with_weights(df_small, current)["Score"].mean() + best = current.copy() + best_score = current_score + + temp = 1.0 + cooling = 0.995 + for _ in range(self.config.sa_iterations): + candidate = np.clip(current + rng.normal(0, 0.05, size=4), 0.01, 1.0) + candidate = self._normalize_weights(candidate) + candidate_score = self._score_with_weights(df_small, candidate)["Score"].mean() + if candidate_score < current_score or rng.random() < math.exp((current_score - candidate_score) / temp): + current, current_score = candidate, candidate_score + if candidate_score < best_score: + best, best_score = candidate, candidate_score + temp *= cooling + if temp < 1e-3: + temp = 1e-3 + + best_weights = self._normalize_weights(best) + df_scored = self._score_with_weights(df, best_weights) + df_scored["Method"] = "Simulated_Annealing" + return df_scored.drop(columns=["Score"]) + + def nsga2(self, df: pd.DataFrame) -> pd.DataFrame: + # Approximate NSGA-II via Pareto ranking on normalized objectives + objectives = [ + ("Processing_Time_Norm", True), + ("Energy_Consumption_Norm", True), + ("Material_Used_Norm", True), + ("Availability_Norm", False), + ] + df_copy = df.copy() + scores = [] + for idx, row in df_copy.iterrows(): + dominated = 0 + for _, other in df_copy.iterrows(): + if idx == other.name: + continue + better_or_equal = True + strictly_better = False + for col, minimize in objectives: + a = row[col] + b = other[col] + if minimize: + if a < b: + strictly_better = True + elif a > b: + better_or_equal = False + else: + if a > b: + strictly_better = True + elif a < b: + better_or_equal = False + if better_or_equal and strictly_better: + dominated += 1 + scores.append(dominated) + df_copy["Pareto_Rank"] = scores + df_copy.sort_values(["Pareto_Rank", "Processing_Time"], inplace=True) + df_copy["Priority"] = np.arange(1, len(df_copy) + 1) + df_copy["Method"] = "NSGAII" + return df_copy.drop(columns=["Pareto_Rank"]) + + def intelligent_multi_agent(self, df: pd.DataFrame) -> pd.DataFrame: + df_copy = df.copy() + availability_bonus = df_copy["Machine_Availability"] / 100.0 + category_multiplier = df_copy["Optimization_Category"].map( + { + "Optimal": 1.20, + "High": 1.10, + "Moderate": 1.00, + "Low": 0.90, + } + ).fillna(1.0) + load_factor = df_copy.groupby("Machine_ID")["Processing_Time"].transform("sum") + load_factor = load_factor / load_factor.mean() + + pareto_score = ( + self.config.weight_time * (-df_copy["Processing_Time_Norm"]) + + self.config.weight_energy * (-df_copy["Energy_Consumption_Norm"]) + + self.config.weight_material * (-df_copy["Material_Used_Norm"]) + + self.config.weight_availability * df_copy["Availability_Norm"] + ) + final_score = pareto_score * category_multiplier + availability_bonus - load_factor + df_copy["Intelligent_Score"] = final_score + df_copy.sort_values("Intelligent_Score", ascending=False, inplace=True) + df_copy["Priority"] = np.arange(1, len(df_copy) + 1) + df_copy["Method"] = "Intelligent_MultiAgent" + return df_copy.drop(columns=["Intelligent_Score"]) + + # ------------------------------------------------------------------ + + def registry(self) -> Dict[str, Callable[[pd.DataFrame], pd.DataFrame]]: + return { + "FCFS": self.fcfs, + "SPT": self.spt, + "LPT": self.lpt, + "EDD": self.edd, + "Slack": self.slack, + "Critical_Ratio": self.critical_ratio, + "WSPT": self.wspt, + "Genetic_Algorithm": self.genetic_algorithm, + "Particle_Swarm": self.particle_swarm, + "Simulated_Annealing": self.simulated_annealing, + "NSGAII": self.nsga2, + "Intelligent_MultiAgent": self.intelligent_multi_agent, + } + + +# ========================================================================== +# SCHEDULE EVALUATION AND METRICS +# ========================================================================== + + +class ScheduleEvaluator: + """Simulate scheduling execution for a given priority list.""" + + def __init__(self, config: ExperimentalConfig, simulator: StochasticSimulator) -> None: + self.config = config + self.simulator = simulator + + def evaluate( + self, + df_original: pd.DataFrame, + prioritized_df: pd.DataFrame, + scenario: str, + replication_seed: int, + ) -> Tuple[pd.DataFrame, Dict[str, float]]: + rng = np.random.default_rng(self.config.random_seed + replication_seed) + machine_available_time = defaultdict(float) + job_records = [] + + for order_index, row in enumerate(prioritized_df.itertuples(index=False)): + base_proc = float(row.Processing_Time) + proc_time = self.simulator.sample_processing_time(base_proc, scenario, order_index, rng) + energy = self.simulator.sample_energy(float(row.Energy_Consumption), scenario, rng) + machine = row.Machine_ID + availability = float(row.Machine_Availability) + + arrival_time = float(row.Scheduled_Start_Minutes) + ready_time = max(arrival_time, machine_available_time[machine]) + breakdown_delay = self.simulator.machine_breakdown_delay(scenario, rng) + start_time = ready_time + breakdown_delay + end_time = start_time + proc_time + + due_date = float(row.Due_Date_Minutes) + tardiness = max(0.0, end_time - due_date) + waiting_time = start_time - arrival_time + + success_prob = self.simulator.success_probability(availability) + status = "Completed" + if rng.random() > success_prob: + status = "Failed" + elif tardiness > 0 and rng.random() < 0.5: + status = "Delayed" + + job_records.append( + { + "Job_ID": row.Job_ID, + "Machine_ID": machine, + "Order_Index": order_index, + "Start_Time": start_time, + "End_Time": end_time, + "Processing_Time": proc_time, + "Energy_Consumption": energy, + "Waiting_Time": waiting_time, + "Tardiness": tardiness, + "Status": status, + "Material_Used": float(row.Material_Used), + "Availability": availability, + "Scenario": scenario, + } + ) + machine_available_time[machine] = end_time + + job_df = pd.DataFrame(job_records) + metrics = self._compute_metrics(job_df, df_original) + return job_df, metrics + + def _compute_metrics(self, job_df: pd.DataFrame, df_original: pd.DataFrame) -> Dict[str, float]: + makespan = job_df["End_Time"].max() - job_df["Start_Time"].min() + total_energy = job_df["Energy_Consumption"].sum() + total_material = job_df["Material_Used"].sum() + completion_rate = (job_df["Status"] == "Completed").mean() + failure_rate = (job_df["Status"] == "Failed").mean() + delay_rate = (job_df["Status"] == "Delayed").mean() + avg_processing_time = job_df["Processing_Time"].mean() + avg_waiting_time = job_df["Waiting_Time"].mean() + avg_tardiness = job_df["Tardiness"].mean() + + # Machine utilization + machine_work = job_df.groupby("Machine_ID")["Processing_Time"].sum() + utilization = (machine_work / self.config.shift_minutes).mean() + + moo_score = ( + self.config.weight_time * (makespan / len(job_df)) + + self.config.weight_energy * (total_energy / len(job_df)) + + self.config.weight_material * (total_material / len(job_df)) + + self.config.weight_availability * (1 - completion_rate) + ) + + # Additional metrics + throughput = len(job_df) / (makespan / 60.0) if makespan > 0 else 0 + energy_per_job = total_energy / len(job_df) + tardy_jobs = (job_df["Tardiness"] > 0).mean() + median_flow_time = (job_df["End_Time"] - job_df["Start_Time"]).median() + percentile95_wait = job_df["Waiting_Time"].quantile(0.95) + + metrics = { + "makespan": makespan, + "total_energy": total_energy, + "total_material": total_material, + "completion_rate": completion_rate, + "failure_rate": failure_rate, + "delay_rate": delay_rate, + "avg_processing_time": avg_processing_time, + "avg_waiting_time": avg_waiting_time, + "avg_tardiness": avg_tardiness, + "machine_utilization": utilization, + "moo_score": moo_score, + "throughput_per_hour": throughput, + "energy_per_job": energy_per_job, + "tardy_jobs": tardy_jobs, + "median_flow_time": median_flow_time, + "p95_waiting_time": percentile95_wait, + } + + # Baseline references from original data + metrics["historical_completion_rate"] = ( + (df_original["Job_Status"] == "Completed").mean() + ) + metrics["historical_failure_rate"] = ( + (df_original["Job_Status"] == "Failed").mean() + ) + return metrics + + +# ========================================================================== +# STATISTICAL ANALYSIS +# ========================================================================== + + +class StatisticalAnalyzer: + """Perform rigorous statistical comparisons across methods.""" + + def __init__(self, config: ExperimentalConfig) -> None: + self.config = config + + @staticmethod + def _confidence_interval(series: pd.Series, confidence_level: float) -> Tuple[float, float]: + mean = series.mean() + sem = stats.sem(series, nan_policy="omit") + if math.isnan(sem) or sem == 0: + return mean, mean + interval = stats.t.ppf((1 + confidence_level) / 2.0, len(series) - 1) * sem + return mean - interval, mean + interval + + def summarize(self, results: pd.DataFrame) -> pd.DataFrame: + summary_rows = [] + for (scenario, method), group in results.groupby(["Scenario", "Method"]): + ci_low, ci_high = self._confidence_interval(group["moo_score"], self.config.confidence_level) + summary_rows.append( + { + "Scenario": scenario, + "Method": method, + "Mean_MOO": group["moo_score"].mean(), + "Std_MOO": group["moo_score"].std(), + "CI_Lower": ci_low, + "CI_Upper": ci_high, + "Completion_Rate": group["completion_rate"].mean(), + "Failure_Rate": group["failure_rate"].mean(), + "Delay_Rate": group["delay_rate"].mean(), + "Avg_Processing_Time": group["avg_processing_time"].mean(), + "Avg_Tardiness": group["avg_tardiness"].mean(), + "Throughput_per_hour": group["throughput_per_hour"].mean(), + "Energy_per_job": group["energy_per_job"].mean(), + "Machine_Utilization": group["machine_utilization"].mean(), + } + ) + summary_df = pd.DataFrame(summary_rows) + return summary_df.sort_values(["Scenario", "Mean_MOO"]) + + def friedman_test(self, results: pd.DataFrame) -> Dict[str, Dict[str, float]]: + test_results = {} + for scenario, group in results.groupby("Scenario"): + pivot = group.pivot(index="Replication", columns="Method", values="moo_score") + if pivot.shape[1] < 2: + continue + statistic, pvalue = stats.friedmanchisquare(*[pivot[col].values for col in pivot.columns]) + test_results[scenario] = {"statistic": float(statistic), "pvalue": float(pvalue)} + return test_results + + def wilcoxon_tests(self, results: pd.DataFrame) -> Dict[str, Dict[str, float]]: + pairwise_results: Dict[str, Dict[str, float]] = {} + for scenario, group in results.groupby("Scenario"): + pivot = group.pivot(index="Replication", columns="Method", values="moo_score") + methods = pivot.columns.tolist() + scenario_result: Dict[str, float] = {} + for i, method_i in enumerate(methods): + for j in range(i + 1, len(methods)): + method_j = methods[j] + try: + stat, pvalue = stats.wilcoxon(pivot[method_i], pivot[method_j]) + key = f"{method_i} vs {method_j}" + scenario_result[key] = float(pvalue) + except ValueError: + continue + pairwise_results[scenario] = scenario_result + return pairwise_results + + def effect_sizes(self, results: pd.DataFrame) -> pd.DataFrame: + records = [] + for scenario, group in results.groupby("Scenario"): + pivot = group.pivot(index="Replication", columns="Method", values="moo_score") + methods = pivot.columns.tolist() + for i, method_i in enumerate(methods): + for j in range(i + 1, len(methods)): + method_j = methods[j] + diff = pivot[method_i] - pivot[method_j] + mean_diff = diff.mean() + pooled_std = math.sqrt((pivot[method_i].var() + pivot[method_j].var()) / 2) + if pooled_std == 0 or math.isnan(pooled_std): + effect = 0.0 + else: + effect = mean_diff / pooled_std + records.append( + { + "Scenario": scenario, + "Comparison": f"{method_i} vs {method_j}", + "Effect_Size": effect, + } + ) + return pd.DataFrame(records) + + def export_latex_table(self, summary: pd.DataFrame, path: Path) -> None: + latex = summary.to_latex(index=False, float_format="{:.4f}".format) + path.write_text(latex) + + def export_json(self, data: Dict, path: Path) -> None: + path.write_text(json.dumps(data, indent=2)) + + +# ========================================================================== +# VISUALIZATION +# ========================================================================== + + +class VisualizationGenerator: + """Create publication-quality visualizations.""" + + def __init__(self, config: ExperimentalConfig) -> None: + self.config = config + self.palette = sns.color_palette("husl", 12) + + def _savefig(self, fig: plt.Figure, name: str) -> None: + path = self.config.output_dir / "plots" / f"{name}.png" + fig.savefig(path, dpi=self.config.dpi, bbox_inches="tight") + plt.close(fig) + + def performance_bar(self, summary: pd.DataFrame) -> None: + fig, ax = plt.subplots(figsize=(12, 6)) + sns.barplot( + data=summary, + x="Method", + y="Mean_MOO", + hue="Scenario", + palette="husl", + ax=ax, + ) + ax.set_title("Mean Multi-Objective Optimization Score by Method") + ax.set_ylabel("Mean MOO (lower is better)") + ax.tick_params(axis="x", rotation=45) + fig.tight_layout() + self._savefig(fig, "performance_bar") + + def boxplots(self, results: pd.DataFrame) -> None: + fig, ax = plt.subplots(figsize=(14, 6)) + sns.boxplot( + data=results, + x="Method", + y="moo_score", + hue="Scenario", + palette="Set3", + ax=ax, + ) + ax.set_title("Distribution of Multi-Objective Scores") + ax.set_ylabel("MOO Score") + ax.tick_params(axis="x", rotation=45) + fig.tight_layout() + self._savefig(fig, "moo_boxplots") + + def radar_chart(self, summary: pd.DataFrame) -> None: + metrics = [ + "Completion_Rate", + "Failure_Rate", + "Delay_Rate", + "Avg_Processing_Time", + "Throughput_per_hour", + "Energy_per_job", + ] + top_methods = ( + summary.groupby("Method")["Mean_MOO"].mean().nsmallest(6).index.tolist() + ) + scenarios = summary["Scenario"].unique() + method_stats = summary[summary["Method"].isin(top_methods)] + + num_metrics = len(metrics) + angles = np.linspace(0, 2 * np.pi, num_metrics, endpoint=False).tolist() + angles += angles[:1] + + fig, axes = plt.subplots(1, len(scenarios), subplot_kw=dict(polar=True), figsize=(5 * len(scenarios), 6)) + if len(scenarios) == 1: + axes = [axes] + for ax, scenario in zip(axes, scenarios): + subset = method_stats[method_stats["Scenario"] == scenario] + for color, method in zip(self.palette, top_methods): + values = subset[subset["Method"] == method][metrics].mean().tolist() + if not values: + continue + values += values[:1] + ax.plot(angles, values, color=color, linewidth=1, label=method) + ax.fill(angles, values, color=color, alpha=0.1) + ax.set_title(f"Scenario: {scenario}") + ax.set_xticks(angles[:-1]) + ax.set_xticklabels(metrics, fontsize=9) + axes[0].legend(loc="upper right", bbox_to_anchor=(1.3, 1.1)) + self._savefig(fig, "radar_chart") + + def effect_size_heatmap(self, effects: pd.DataFrame) -> None: + if effects.empty: + return + pivot = effects.pivot(index="Comparison", columns="Scenario", values="Effect_Size") + fig, ax = plt.subplots(figsize=(10, max(6, len(pivot) * 0.4))) + sns.heatmap(pivot, annot=True, fmt=".2f", cmap="coolwarm", center=0, ax=ax) + ax.set_title("Cohen's d Effect Sizes (Method Comparisons)") + self._savefig(fig, "effect_sizes") + + def confidence_interval_plot(self, summary: pd.DataFrame) -> None: + fig, ax = plt.subplots(figsize=(12, 6)) + for scenario, group in summary.groupby("Scenario"): + ax.errorbar( + group["Method"], + group["Mean_MOO"], + yerr=[group["Mean_MOO"] - group["CI_Lower"], group["CI_Upper"] - group["Mean_MOO"]], + fmt="o", + capsize=5, + label=scenario, + ) + ax.set_ylabel("Mean MOO Score") + ax.set_title("95% Confidence Intervals for MOO Score") + ax.legend() + ax.tick_params(axis="x", rotation=45) + fig.tight_layout() + self._savefig(fig, "confidence_intervals") + + def computation_time(self, perf: pd.DataFrame) -> None: + fig, ax = plt.subplots(figsize=(10, 6)) + sns.scatterplot(data=perf, x="compute_time", y="Mean_MOO", hue="Scenario", style="Method", ax=ax) + ax.set_title("Performance vs Computational Cost") + ax.set_xlabel("Average Runtime (s)") + ax.set_ylabel("Mean MOO Score") + fig.tight_layout() + self._savefig(fig, "performance_vs_time") + + def pareto_fronts(self, results: pd.DataFrame) -> None: + fig = plt.figure(figsize=(10, 7)) + ax = fig.add_subplot(111, projection="3d") + markers = itertools.cycle(["o", "^", "s", "d", "x", "*"]) + for method, group in results.groupby("Method"): + marker = next(markers) + ax.scatter( + group["avg_processing_time"], + group["total_energy"], + group["machine_utilization"], + marker=marker, + label=method, + alpha=0.6, + ) + ax.set_xlabel("Avg Processing Time") + ax.set_ylabel("Total Energy") + ax.set_zlabel("Machine Utilization") + ax.set_title("Pareto Front Approximation") + ax.legend(bbox_to_anchor=(1.05, 1), loc="upper left") + self._savefig(fig, "pareto_fronts") + + def statistical_significance_heatmap(self, wilcoxon: Dict[str, Dict[str, float]]) -> None: + if not wilcoxon: + return + flat_records = [] + for scenario, comparisons in wilcoxon.items(): + for pair, pvalue in comparisons.items(): + flat_records.append({"Scenario": scenario, "Comparison": pair, "pvalue": pvalue}) + df = pd.DataFrame(flat_records) + pivot = df.pivot(index="Comparison", columns="Scenario", values="pvalue") + fig, ax = plt.subplots(figsize=(10, max(6, len(pivot) * 0.4))) + sns.heatmap(pivot, annot=True, fmt=".3f", cmap="viridis_r", ax=ax) + ax.set_title("Wilcoxon Signed-Rank Test (p-values)") + self._savefig(fig, "wilcoxon_heatmap") + + def correlation_matrix(self, results: pd.DataFrame) -> None: + corr = results[[ + "moo_score", + "completion_rate", + "failure_rate", + "delay_rate", + "avg_processing_time", + "total_energy", + "machine_utilization", + "throughput_per_hour", + ]].corr(method="spearman") + fig, ax = plt.subplots(figsize=(10, 8)) + sns.heatmap(corr, annot=True, fmt=".2f", cmap="coolwarm", ax=ax) + ax.set_title("Metric Correlation Matrix (Spearman)") + self._savefig(fig, "correlation_matrix") + + def tardiness_distribution(self, results: pd.DataFrame) -> None: + fig, ax = plt.subplots(figsize=(12, 6)) + sns.violinplot( + data=results, + x="Method", + y="avg_tardiness", + hue="Scenario", + palette="Pastel2", + ax=ax, + ) + ax.set_title("Tardiness Distribution by Method") + ax.tick_params(axis="x", rotation=45) + fig.tight_layout() + self._savefig(fig, "tardiness_violin") + + def status_distribution(self, job_details: pd.DataFrame) -> None: + fig, ax = plt.subplots(figsize=(12, 6)) + status_counts = job_details.groupby(["Scenario", "Method", "Status"]).size().reset_index(name="Count") + sns.barplot( + data=status_counts, + x="Method", + y="Count", + hue="Status", + ax=ax, + ) + ax.set_title("Job Status Distribution per Method") + ax.tick_params(axis="x", rotation=45) + fig.tight_layout() + self._savefig(fig, "status_distribution") + + +# ========================================================================== +# REPORT GENERATION +# ========================================================================== + + +class ReportGenerator: + """Generate markdown report summarizing experimental findings.""" + + def __init__(self, config: ExperimentalConfig) -> None: + self.config = config + + def generate(self, summary: pd.DataFrame, friedman: Dict, wilcoxon: Dict, effects: pd.DataFrame) -> None: + lines: List[str] = [] + lines.append("# Advanced Manufacturing Optimization Framework\n") + lines.append("## Executive Summary\n") + best_methods = summary.sort_values("Mean_MOO").groupby("Scenario").first()["Method"].to_dict() + lines.append("**Top-performing methods per scenario:**\n") + for scenario, method in best_methods.items(): + lines.append(f"- **{scenario}**: {method}\n") + lines.append("\n## Global Statistical Significance\n") + for scenario, stats_dict in friedman.items(): + stat = stats_dict["statistic"] + pvalue = stats_dict["pvalue"] + lines.append(f"- Scenario **{scenario}**: Friedman χ² = {stat:.3f}, p = {pvalue:.4f}\n") + lines.append("\n## Pairwise Comparisons (Bonferroni-corrected)\n") + for scenario, comparisons in wilcoxon.items(): + lines.append(f"### Scenario: {scenario}\n") + if not comparisons: + lines.append("No sufficient data for pairwise tests.\n") + for pair, pvalue in comparisons.items(): + lines.append(f"- {pair}: p = {pvalue:.4f}\n") + lines.append("\n## Effect Sizes (Cohen's d)\n") + if effects.empty: + lines.append("Effect sizes unavailable.\n") + else: + for scenario, group in effects.groupby("Scenario"): + lines.append(f"### {scenario}\n") + top_effects = group.sort_values("Effect_Size", key=np.abs, ascending=False).head(5) + for row in top_effects.itertuples(index=False): + magnitude = self._interpret_effect_size(row.Effect_Size) + lines.append(f"- {row.Comparison}: d = {row.Effect_Size:.3f} ({magnitude})\n") + lines.append("\n## Method Performance Highlights\n") + for method, group in summary.groupby("Method"): + moo = group["Mean_MOO"].mean() + completion = group["Completion_Rate"].mean() + energy = group["Energy_per_job"].mean() + lines.append( + f"- **{method}**: Avg MOO={moo:.3f}, Completion={completion:.2%}, Energy/job={energy:.2f} kWh\n" + ) + lines.append("\n---\n") + lines.append("Generated automatically by the Advanced Manufacturing Optimization Framework.\n") + + report_path = self.config.output_dir / "EXPERIMENTAL_REPORT.md" + report_path.write_text("".join(lines)) + + @staticmethod + def _interpret_effect_size(effect: float) -> str: + magnitude = abs(effect) + if magnitude < 0.2: + return "negligible" + if magnitude < 0.5: + return "small" + if magnitude < 0.8: + return "medium" + return "large" + + +# ========================================================================== +# EXPERIMENTAL FRAMEWORK ORCHESTRATOR +# ========================================================================== + + +class ExperimentalFramework: + def __init__(self, config: ExperimentalConfig) -> None: + self.config = config + self.data_loader = DataLoader(config) + self.simulator = StochasticSimulator(config) + self.methods = OptimizationMethods(config) + self.evaluator = ScheduleEvaluator(config, self.simulator) + self.stats = StatisticalAnalyzer(config) + self.visuals = VisualizationGenerator(config) + self.reporter = ReportGenerator(config) + + def run(self, selected_methods: Optional[List[str]] = None, max_jobs: Optional[int] = None) -> None: + df = self.data_loader.load() + if max_jobs is not None and max_jobs < len(df): + LOGGER.info("Restricting dataset from %d to %d jobs", len(df), max_jobs) + df = df.head(max_jobs) + + method_registry = self.methods.registry() + if selected_methods: + missing = [m for m in selected_methods if m not in method_registry] + if missing: + raise ValueError(f"Unknown methods requested: {missing}") + method_registry = {m: method_registry[m] for m in selected_methods} + + all_results = [] + job_details_records = [] + runtime_records = [] + + for scenario in self.config.scenarios: + LOGGER.info("Running scenario: %s", scenario) + for replication in range(1, self.config.n_replications + 1): + LOGGER.info(" Replication %d/%d", replication, self.config.n_replications) + for method_name, method_fn in method_registry.items(): + start_time = time.perf_counter() + prioritized = method_fn(df) + job_df, metrics = self.evaluator.evaluate(df, prioritized, scenario, replication) + runtime = time.perf_counter() - start_time + metrics.update( + { + "Scenario": scenario, + "Method": method_name, + "Replication": replication, + "compute_time": runtime, + } + ) + all_results.append(metrics) + job_df["Method"] = method_name + job_details_records.append(job_df) + runtime_records.append( + { + "Scenario": scenario, + "Method": method_name, + "Replication": replication, + "compute_time": runtime, + } + ) + + results_df = pd.DataFrame(all_results) + job_details_df = pd.concat(job_details_records, ignore_index=True) + runtime_df = pd.DataFrame(runtime_records) + + summary_df = self.stats.summarize(results_df) + friedman = self.stats.friedman_test(results_df) + wilcoxon = self.stats.wilcoxon_tests(results_df) + effects_df = self.stats.effect_sizes(results_df) + + # Save tables + summary_df.to_csv(self.config.output_dir / "tables" / "summary_statistics.csv", index=False) + results_df.to_csv(self.config.output_dir / "tables" / "all_results.csv", index=False) + job_details_df.to_csv(self.config.output_dir / "tables" / "job_details.csv", index=False) + runtime_df.groupby(["Scenario", "Method"]).agg({"compute_time": "mean"}).reset_index().to_csv( + self.config.output_dir / "tables" / "runtime_summary.csv", index=False + ) + + self.stats.export_latex_table(summary_df, self.config.output_dir / "latex" / "summary_table.tex") + self.stats.export_json(friedman, self.config.output_dir / "statistics" / "friedman.json") + self.stats.export_json(wilcoxon, self.config.output_dir / "statistics" / "wilcoxon.json") + effects_df.to_csv(self.config.output_dir / "tables" / "effect_sizes.csv", index=False) + + # Visualizations + self.visuals.performance_bar(summary_df) + self.visuals.boxplots(results_df) + self.visuals.radar_chart(summary_df) + self.visuals.effect_size_heatmap(effects_df) + self.visuals.confidence_interval_plot(summary_df) + perf_with_time = summary_df.merge( + runtime_df.groupby(["Scenario", "Method"]).mean().reset_index(), + on=["Scenario", "Method"], + how="left", + ) + self.visuals.computation_time(perf_with_time) + self.visuals.correlation_matrix(results_df) + self.visuals.tardiness_distribution(results_df) + self.visuals.status_distribution(job_details_df) + self.visuals.statistical_significance_heatmap(wilcoxon) + + self.visuals.pareto_fronts(results_df) + + # Report + self.reporter.generate(summary_df, friedman, wilcoxon, effects_df) + + LOGGER.info("Experiment completed. Results saved to %s", self.config.output_dir) + + +# ========================================================================== +# CLI ENTRY POINT +# ========================================================================== + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Advanced manufacturing optimization experiments", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + parser.add_argument("--methods", nargs="*", help="Subset of methods to run") + parser.add_argument("--max-jobs", type=int, default=None, help="Limit number of jobs for quick runs") + parser.add_argument("--replications", type=int, default=None, help="Override number of replications") + parser.add_argument("--scenarios", nargs="*", help="Override scenarios to evaluate") + return parser.parse_args() + + +def main() -> None: + args = parse_args() + config = ExperimentalConfig() + if args.replications: + config.n_replications = args.replications + if args.scenarios: + config.scenarios = tuple(args.scenarios) + framework = ExperimentalFramework(config) + framework.run(selected_methods=args.methods, max_jobs=args.max_jobs) + + +if __name__ == "__main__": + main() diff --git a/algorithms/__init__.py b/algorithms/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/algorithms/classical/constructive_heuristics.py b/algorithms/classical/constructive_heuristics.py new file mode 100644 index 000000000..e6919fad7 --- /dev/null +++ b/algorithms/classical/constructive_heuristics.py @@ -0,0 +1,12 @@ +"""Constructive heuristic stubs for early experimentation.""" +from __future__ import annotations + +from core.base_optimizer import BaseOptimizer +from core.problem import ManufacturingProblem +from core.solution import ScheduleSolution + + +class NEHHeuristic(BaseOptimizer): + def solve(self, problem: ManufacturingProblem) -> ScheduleSolution: + schedule = problem.jobs.sort_values("Processing_Time") + return ScheduleSolution(schedule=schedule, metrics={"status": "heuristic_stub"}) diff --git a/algorithms/classical/dispatching_rules.py b/algorithms/classical/dispatching_rules.py new file mode 100644 index 000000000..21e2d6b6b --- /dev/null +++ b/algorithms/classical/dispatching_rules.py @@ -0,0 +1,52 @@ +"""Implementation of classical dispatching rules.""" +from __future__ import annotations + +import pandas as pd + +from core.base_optimizer import BaseOptimizer +from core.metrics import evaluate_schedule +from core.problem import ManufacturingProblem +from core.solution import ScheduleSolution + + +class DispatchingRule(BaseOptimizer): + """Base class for dispatching rules.""" + + rule_name: str = "dispatching_rule" + + def _priority(self, jobs: pd.DataFrame) -> pd.Series: + raise NotImplementedError + + def solve(self, problem: ManufacturingProblem) -> ScheduleSolution: + data = problem.jobs.copy() + data["priority"] = self._priority(data) + schedule = data.sort_values("priority").drop(columns=["priority"]) + return ScheduleSolution(schedule=schedule, metrics=evaluate_schedule(schedule)) + + +class FCFSRule(DispatchingRule): + rule_name = "fcfs" + + def _priority(self, jobs: pd.DataFrame) -> pd.Series: + return pd.to_datetime(jobs["Scheduled_Start"]).rank(method="first") + + +class SPTRule(DispatchingRule): + rule_name = "spt" + + def _priority(self, jobs: pd.DataFrame) -> pd.Series: + return jobs["Processing_Time"].rank(method="first") + + +class LPTRule(DispatchingRule): + rule_name = "lpt" + + def _priority(self, jobs: pd.DataFrame) -> pd.Series: + return -jobs["Processing_Time"].rank(method="first") + + +DISPATCHING_RULES = { + "fcfs": FCFSRule, + "spt": SPTRule, + "lpt": LPTRule, +} diff --git a/algorithms/classical/exact_methods.py b/algorithms/classical/exact_methods.py new file mode 100644 index 000000000..df768260c --- /dev/null +++ b/algorithms/classical/exact_methods.py @@ -0,0 +1,13 @@ +"""Placeholder exact optimisation methods.""" +from __future__ import annotations + +from core.base_optimizer import BaseOptimizer +from core.problem import ManufacturingProblem +from core.solution import ScheduleSolution + + +class BranchAndBound(BaseOptimizer): + """Stub implementation returning the baseline schedule.""" + + def solve(self, problem: ManufacturingProblem) -> ScheduleSolution: + return ScheduleSolution(schedule=problem.jobs, metrics={"status": "not_implemented"}) diff --git a/algorithms/deep_rl/dqn.py b/algorithms/deep_rl/dqn.py new file mode 100644 index 000000000..279b77093 --- /dev/null +++ b/algorithms/deep_rl/dqn.py @@ -0,0 +1,11 @@ +"""Placeholder deep reinforcement learning scheduler.""" +from __future__ import annotations + +from core.base_optimizer import BaseOptimizer +from core.problem import ManufacturingProblem +from core.solution import ScheduleSolution + + +class DQNOptimizer(BaseOptimizer): + def solve(self, problem: ManufacturingProblem) -> ScheduleSolution: + return ScheduleSolution(schedule=problem.jobs, metrics={"status": "dqn_stub"}) diff --git a/algorithms/hybrid/adaptive_hybrid.py b/algorithms/hybrid/adaptive_hybrid.py new file mode 100644 index 000000000..4656136d8 --- /dev/null +++ b/algorithms/hybrid/adaptive_hybrid.py @@ -0,0 +1,11 @@ +"""Hybrid optimisation placeholder.""" +from __future__ import annotations + +from core.base_optimizer import BaseOptimizer +from core.problem import ManufacturingProblem +from core.solution import ScheduleSolution + + +class AdaptiveHybridOptimizer(BaseOptimizer): + def solve(self, problem: ManufacturingProblem) -> ScheduleSolution: + return ScheduleSolution(schedule=problem.jobs, metrics={"status": "hybrid_stub"}) diff --git a/algorithms/metaheuristics/simulated_annealing.py b/algorithms/metaheuristics/simulated_annealing.py new file mode 100644 index 000000000..c547dba7a --- /dev/null +++ b/algorithms/metaheuristics/simulated_annealing.py @@ -0,0 +1,11 @@ +"""Simplified simulated annealing skeleton.""" +from __future__ import annotations + +from core.base_optimizer import BaseOptimizer +from core.problem import ManufacturingProblem +from core.solution import ScheduleSolution + + +class SimulatedAnnealing(BaseOptimizer): + def solve(self, problem: ManufacturingProblem) -> ScheduleSolution: + return ScheduleSolution(schedule=problem.jobs.sample(frac=1.0, random_state=42), metrics={"status": "annealing_stub"}) diff --git a/algorithms/multi_objective/nsga2.py b/algorithms/multi_objective/nsga2.py new file mode 100644 index 000000000..6446b2103 --- /dev/null +++ b/algorithms/multi_objective/nsga2.py @@ -0,0 +1,11 @@ +"""Skeleton implementation of NSGA-II placeholder.""" +from __future__ import annotations + +from core.base_optimizer import BaseOptimizer +from core.problem import ManufacturingProblem +from core.solution import ScheduleSolution + + +class NSGAII(BaseOptimizer): + def solve(self, problem: ManufacturingProblem) -> ScheduleSolution: + return ScheduleSolution(schedule=problem.jobs, metrics={"status": "nsga2_stub"}) diff --git a/config/__init__.py b/config/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/config/base_config.py b/config/base_config.py new file mode 100644 index 000000000..89dd8aa20 --- /dev/null +++ b/config/base_config.py @@ -0,0 +1,125 @@ +"""Configuration models for the RMS optimization framework. + +This module centralises all experiment configuration objects. The +models are implemented with `pydantic` to guarantee validation and +provide convenient serialisation / deserialisation helpers. Each +configuration block mirrors one portion of the research plan described +in the project charter. +""" +from __future__ import annotations + +from pathlib import Path +from typing import Any, Dict, List, Optional + +from pydantic import BaseModel, Field, validator +import yaml + + +class DataConfig(BaseModel): + """Configuration for the dataset layer.""" + + sources: List[Path] = Field(default_factory=list, description="Input datasets") + streaming: bool = Field(False, description="Enable streaming data ingestion") + batch_size: int = Field(1024, ge=1, description="Batch size for streaming pipelines") + cache_dir: Path = Field(Path("data/cache")) + + +class AlgorithmConfig(BaseModel): + """Per-algorithm hyper-parameters and search spaces.""" + + name: str = Field(..., description="Primary algorithm identifier") + hyperparameters: Dict[str, Any] = Field(default_factory=dict) + search_space: Dict[str, Any] = Field(default_factory=dict) + seed: int = Field(42, description="Random seed for reproducibility") + + +class OptimizationConfig(BaseModel): + """Multi-objective optimisation settings.""" + + objectives: List[str] = Field(default_factory=lambda: ["makespan", "energy"]) + weights: Dict[str, float] = Field(default_factory=lambda: {"makespan": 0.5, "energy": 0.5}) + constraints: Dict[str, Any] = Field(default_factory=dict) + pareto_front_size: int = Field(100, ge=1) + + @validator("weights") + def validate_weights(cls, value: Dict[str, float]) -> Dict[str, float]: + if not value: + raise ValueError("At least one weight must be provided") + total = sum(value.values()) + if total <= 0: + raise ValueError("Weights must sum to a positive value") + return value + + +class SimulationConfig(BaseModel): + """Configuration of stochastic simulation parameters.""" + + repetitions: int = Field(100, ge=1) + enable_discrete_event: bool = Field(True) + enable_monte_carlo: bool = Field(True) + parallelism: int = Field(1, ge=1, description="Number of parallel workers") + + +class ValidationConfig(BaseModel): + """Statistical validation parameters.""" + + confidence_level: float = Field(0.95, ge=0.0, le=0.999) + tests: List[str] = Field(default_factory=lambda: ["friedman", "wilcoxon"]) + replications: int = Field(30, ge=1) + + +class HardwareConfig(BaseModel): + """Hardware and runtime resources.""" + + use_gpu: bool = Field(False) + num_cpus: int = Field(4, ge=1) + memory_gb: int = Field(16, ge=1) + + +class LoggingConfig(BaseModel): + """Experiment tracking and logging configuration.""" + + experiment_name: str = Field("rms-optimization") + tracking_uri: Optional[str] = Field(None, description="MLflow or W&B tracking URI") + log_dir: Path = Field(Path("logs")) + level: str = Field("INFO") + + +class ExperimentalConfig(BaseModel): + """Master configuration object that aggregates all sections.""" + + data: DataConfig = Field(default_factory=DataConfig) + algorithm: AlgorithmConfig = Field(default_factory=lambda: AlgorithmConfig(name="fcfs")) + optimisation: OptimizationConfig = Field(default_factory=OptimizationConfig) + simulation: SimulationConfig = Field(default_factory=SimulationConfig) + validation: ValidationConfig = Field(default_factory=ValidationConfig) + hardware: HardwareConfig = Field(default_factory=HardwareConfig) + logging: LoggingConfig = Field(default_factory=LoggingConfig) + + @classmethod + def from_file(cls, path: Path) -> "ExperimentalConfig": + """Load configuration from a YAML or JSON file.""" + + with Path(path).open("r", encoding="utf-8") as handle: + data = yaml.safe_load(handle) + return cls.parse_obj(data) + + def to_dict(self) -> Dict[str, Any]: + """Serialise configuration to a dictionary.""" + + return self.dict() + + def save(self, path: Path) -> None: + """Persist configuration to disk.""" + + with Path(path).open("w", encoding="utf-8") as handle: + yaml.safe_dump(self.to_dict(), handle) + + +def load_config(path: Optional[Path] = None, overrides: Optional[Dict[str, Any]] = None) -> ExperimentalConfig: + """Utility wrapper to load and override configuration fields.""" + + config = ExperimentalConfig.from_file(path) if path else ExperimentalConfig() + if overrides: + config = config.copy(update=overrides) + return config diff --git a/config/base_config.yaml b/config/base_config.yaml new file mode 100644 index 000000000..4566ac03d --- /dev/null +++ b/config/base_config.yaml @@ -0,0 +1,19 @@ +data: + sources: [] +algorithm: + name: fcfs +optimisation: + objectives: + - makespan + - energy + weights: + makespan: 0.5 + energy: 0.5 +simulation: + repetitions: 10 +validation: + confidence_level: 0.95 +hardware: + use_gpu: false +logging: + experiment_name: rms-baseline diff --git a/core/__init__.py b/core/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/core/base_optimizer.py b/core/base_optimizer.py new file mode 100644 index 000000000..fd5d5c760 --- /dev/null +++ b/core/base_optimizer.py @@ -0,0 +1,24 @@ +"""Abstract base classes for optimisation algorithms.""" +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import Any, Dict + +from core.problem import ManufacturingProblem +from core.solution import ScheduleSolution + + +class BaseOptimizer(ABC): + """Base class every optimisation algorithm should derive from.""" + + def __init__(self, **hyperparameters: Any) -> None: + self.hyperparameters = hyperparameters + + @abstractmethod + def solve(self, problem: ManufacturingProblem) -> ScheduleSolution: + """Compute a solution for the provided manufacturing problem.""" + + def info(self) -> Dict[str, Any]: + """Return metadata describing the optimizer.""" + + return {"name": self.__class__.__name__, "hyperparameters": self.hyperparameters} diff --git a/core/config.py b/core/config.py new file mode 100644 index 000000000..fae8463e2 --- /dev/null +++ b/core/config.py @@ -0,0 +1,25 @@ +"""Helper functions to work with experiment configuration.""" +from __future__ import annotations + +from pathlib import Path +from typing import Any, Dict, Optional + +from config.base_config import ExperimentalConfig, load_config + + +class ConfigManager: + """High level API to manage experiment configuration.""" + + def __init__(self, config: Optional[ExperimentalConfig] = None) -> None: + self._config = config or ExperimentalConfig() + + @property + def config(self) -> ExperimentalConfig: + return self._config + + @classmethod + def from_file(cls, path: Path) -> "ConfigManager": + return cls(load_config(path)) + + def override(self, updates: Dict[str, Any]) -> None: + self._config = self._config.copy(update=updates) diff --git a/core/metrics.py b/core/metrics.py new file mode 100644 index 000000000..8db2ee8e0 --- /dev/null +++ b/core/metrics.py @@ -0,0 +1,25 @@ +"""Core metrics for manufacturing optimisation.""" +from __future__ import annotations + +from typing import Dict + +import pandas as pd + + +def compute_makespan(schedule: pd.DataFrame) -> float: + if schedule.empty: + return 0.0 + end_times = pd.to_datetime(schedule["Scheduled_End"]) + start_times = pd.to_datetime(schedule["Scheduled_Start"]) + return float((end_times.max() - start_times.min()).total_seconds() / 60.0) + + +def compute_energy(schedule: pd.DataFrame) -> float: + return float(schedule.get("Energy_Consumption", pd.Series(dtype=float)).sum()) + + +def evaluate_schedule(schedule: pd.DataFrame) -> Dict[str, float]: + return { + "makespan": compute_makespan(schedule), + "energy": compute_energy(schedule), + } diff --git a/core/problem.py b/core/problem.py new file mode 100644 index 000000000..016522d8c --- /dev/null +++ b/core/problem.py @@ -0,0 +1,23 @@ +"""Problem representations for RMS optimisation.""" +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Dict, List, Optional + +import pandas as pd + + +@dataclass +class ManufacturingProblem: + """Encapsulate the data describing a scheduling instance.""" + + jobs: pd.DataFrame + objectives: List[str] + constraints: Dict[str, float] = field(default_factory=dict) + metadata: Optional[Dict[str, str]] = None + + def __post_init__(self) -> None: + if not isinstance(self.jobs, pd.DataFrame): + raise TypeError("jobs must be provided as a pandas DataFrame") + if not self.objectives: + raise ValueError("At least one objective must be specified") diff --git a/core/solution.py b/core/solution.py new file mode 100644 index 000000000..f3c5a7a2e --- /dev/null +++ b/core/solution.py @@ -0,0 +1,19 @@ +"""Solution representation for RMS optimisation problems.""" +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Dict, Optional + +import pandas as pd + + +@dataclass +class ScheduleSolution: + """Container for schedules generated by optimisation algorithms.""" + + schedule: pd.DataFrame + metrics: Dict[str, float] = field(default_factory=dict) + metadata: Optional[Dict[str, str]] = None + + def to_dict(self) -> Dict[str, float]: + return self.metrics.copy() diff --git a/data/__init__.py b/data/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/data/cache.py b/data/cache.py new file mode 100644 index 000000000..e010111f9 --- /dev/null +++ b/data/cache.py @@ -0,0 +1,24 @@ +"""Simple caching utilities for large datasets.""" +from __future__ import annotations + +from pathlib import Path +from typing import Callable, Optional + +import joblib +import pandas as pd + + +class DataCache: + """Persist dataframes using joblib for quick reloads.""" + + def __init__(self, cache_dir: Path) -> None: + self.cache_dir = Path(cache_dir) + self.cache_dir.mkdir(parents=True, exist_ok=True) + + def load_or_compute(self, name: str, factory: Callable[[], pd.DataFrame]) -> pd.DataFrame: + path = self.cache_dir / f"{name}.pkl" + if path.exists(): + return joblib.load(path) + dataframe = factory() + joblib.dump(dataframe, path) + return dataframe diff --git a/data/generator.py b/data/generator.py new file mode 100644 index 000000000..d21ca229a --- /dev/null +++ b/data/generator.py @@ -0,0 +1,49 @@ +"""Synthetic data generation utilities.""" +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime, timedelta +from typing import Iterable, List, Sequence +import numpy as np +import pandas as pd + + +@dataclass +class SyntheticScenario: + """Scenario configuration for synthetic dataset creation.""" + + num_jobs: int + machines: Sequence[str] + start_date: datetime + time_between_jobs: timedelta + + +class SyntheticDataGenerator: + """Generate synthetic manufacturing datasets.""" + + def generate(self, scenario: SyntheticScenario) -> pd.DataFrame: + rng = np.random.default_rng() + timestamps = [ + scenario.start_date + i * scenario.time_between_jobs for i in range(scenario.num_jobs) + ] + machines = rng.choice(scenario.machines, size=scenario.num_jobs) + processing_time = rng.integers(10, 240, size=scenario.num_jobs) + energy = rng.normal(15, 5, size=scenario.num_jobs).clip(min=1) + data = pd.DataFrame( + { + "Job_ID": [f"JOB_{i:05d}" for i in range(scenario.num_jobs)], + "Machine_ID": machines, + "Scheduled_Start": timestamps, + "Scheduled_End": [ts + timedelta(minutes=int(pt)) for ts, pt in zip(timestamps, processing_time)], + "Processing_Time": processing_time, + "Energy_Consumption": energy, + } + ) + return data + + +class BenchmarkDataGenerator: + """Placeholder for benchmark dataset retrieval.""" + + def load_instances(self, names: Iterable[str]) -> List[pd.DataFrame]: + return [pd.DataFrame({"instance": [name]}) for name in names] diff --git a/data/loader.py b/data/loader.py new file mode 100644 index 000000000..20eab346e --- /dev/null +++ b/data/loader.py @@ -0,0 +1,75 @@ +"""Data ingestion utilities for the RMS optimisation framework.""" +from __future__ import annotations + +from pathlib import Path +from typing import Iterable, List, Optional + +import pandas as pd +from pydantic import BaseModel, ValidationError + + +class DataSchema(BaseModel): + """Minimal schema used to validate ingested datasets.""" + + Job_ID: str + Machine_ID: str + Scheduled_Start: str + Scheduled_End: str + + +class DataValidator: + """Validate raw data sources using `pydantic` models.""" + + def __init__(self, schema: type[BaseModel] = DataSchema) -> None: + self.schema = schema + + def validate(self, dataframe: pd.DataFrame) -> pd.DataFrame: + errors: List[str] = [] + for row in dataframe.to_dict(orient="records"): + try: + self.schema(**row) + except ValidationError as exc: + errors.append(str(exc)) + if errors: + raise ValueError("Invalid dataset detected:\n" + "\n".join(errors[:5])) + return dataframe + + +class DataLoader: + """Load multiple dataset formats into pandas DataFrames.""" + + def __init__(self, validator: Optional[DataValidator] = None) -> None: + self.validator = validator or DataValidator() + + def load(self, sources: Iterable[Path], validate: bool = True) -> pd.DataFrame: + frames: List[pd.DataFrame] = [] + for source in sources: + frame = self._load_single(source) + frames.append(frame) + data = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame() + return self.validator.validate(data) if validate and not data.empty else data + + def _load_single(self, path: Path) -> pd.DataFrame: + suffix = Path(path).suffix.lower() + if suffix == ".csv": + return pd.read_csv(path) + if suffix in {".parquet", ".pq"}: + return pd.read_parquet(path) + if suffix in {".json"}: + return pd.read_json(path) + raise ValueError(f"Unsupported file format: {suffix}") + + +class DataPreprocessor: + """Simple preprocessing utilities for baseline experiments.""" + + datetime_columns: List[str] = ["Scheduled_Start", "Scheduled_End"] + + def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: + df = dataframe.copy() + for column in self.datetime_columns: + if column in df: + df[column] = pd.to_datetime(df[column]) + df = df.drop_duplicates() + df = df.fillna(method="ffill").fillna(method="bfill") + return df diff --git a/data/synthetic/sample.csv b/data/synthetic/sample.csv new file mode 100644 index 000000000..699bf0e4b --- /dev/null +++ b/data/synthetic/sample.csv @@ -0,0 +1,4 @@ +Job_ID,Machine_ID,Scheduled_Start,Scheduled_End,Processing_Time,Energy_Consumption +JOB_00001,M01,2023-01-01T08:00:00,2023-01-01T09:00:00,60,12.5 +JOB_00002,M02,2023-01-01T08:15:00,2023-01-01T09:05:00,50,11.0 +JOB_00003,M01,2023-01-01T09:10:00,2023-01-01T10:00:00,50,10.2 diff --git a/experiments/__init__.py b/experiments/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/experiments/manager.py b/experiments/manager.py new file mode 100644 index 000000000..d83ef4f9b --- /dev/null +++ b/experiments/manager.py @@ -0,0 +1,43 @@ +"""Experiment orchestration utilities.""" +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +from typing import Callable, Dict, List + +import pandas as pd + +from algorithms.classical.dispatching_rules import DISPATCHING_RULES +from config.base_config import ExperimentalConfig +from core.metrics import evaluate_schedule +from core.problem import ManufacturingProblem +from core.solution import ScheduleSolution + + +@dataclass +class ExperimentResult: + algorithm: str + metrics: Dict[str, float] + + +class ExperimentManager: + """Coordinate data loading, algorithm execution, and metric logging.""" + + def __init__(self, config: ExperimentalConfig) -> None: + self.config = config + + def run(self, problem: ManufacturingProblem) -> List[ExperimentResult]: + results: List[ExperimentResult] = [] + for name, cls in DISPATCHING_RULES.items(): + optimizer = cls() + solution = optimizer.solve(problem) + results.append(ExperimentResult(algorithm=name, metrics=solution.metrics)) + return results + + def summarise(self, results: List[ExperimentResult]) -> pd.DataFrame: + return pd.DataFrame([{"algorithm": r.algorithm, **r.metrics} for r in results]) + + +def export_results(results: pd.DataFrame, path: Path) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + results.to_csv(path, index=False) diff --git a/problems/__init__.py b/problems/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/problems/job_shop.py b/problems/job_shop.py new file mode 100644 index 000000000..c72d9207b --- /dev/null +++ b/problems/job_shop.py @@ -0,0 +1,12 @@ +"""Job shop problem factory.""" +from __future__ import annotations + +import pandas as pd + +from core.problem import ManufacturingProblem + + +def create_job_shop_problem(data: pd.DataFrame) -> ManufacturingProblem: + objectives = ["makespan", "energy"] + constraints = {"machine_capacity": 1.0} + return ManufacturingProblem(jobs=data, objectives=objectives, constraints=constraints) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 000000000..b4a211427 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,21 @@ +[project] +name = "rms-optimization-framework" +version = "0.1.0" +description = "Modular framework for reconfigurable manufacturing systems optimisation" +authors = [{name = "Research Automation"}] +requires-python = ">=3.10" +dependencies = [ + "pandas", + "numpy", + "pydantic", + "PyYAML", + "matplotlib", + "scipy", + "joblib", +] + +[project.optional-dependencies] +visualization = ["seaborn"] + +[tool.black] +line-length = 88 diff --git a/reporting/__init__.py b/reporting/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/reporting/generators.py b/reporting/generators.py new file mode 100644 index 000000000..e0eea5498 --- /dev/null +++ b/reporting/generators.py @@ -0,0 +1,22 @@ +"""Automated reporting utilities.""" +from __future__ import annotations + +from pathlib import Path +from typing import Dict + +import pandas as pd + + +class MarkdownReporter: + def __init__(self, output_path: Path) -> None: + self.output_path = output_path + + def render(self, metrics: Dict[str, float], table: pd.DataFrame) -> Path: + lines = ["# Experiment Summary", "", "## Aggregate Metrics"] + for key, value in metrics.items(): + lines.append(f"- **{key}**: {value:.3f}") + lines.append("\n## Detailed Results") + lines.append(table.to_markdown(index=False)) + self.output_path.parent.mkdir(parents=True, exist_ok=True) + self.output_path.write_text("\n".join(lines), encoding="utf-8") + return self.output_path diff --git a/scripts/run_experiments.py b/scripts/run_experiments.py new file mode 100644 index 000000000..d12a590e9 --- /dev/null +++ b/scripts/run_experiments.py @@ -0,0 +1,52 @@ +"""Entry point to execute baseline experiments.""" +from __future__ import annotations + +import argparse +from pathlib import Path + +import pandas as pd + +from config.base_config import load_config +from core.config import ConfigManager +from data.loader import DataLoader, DataPreprocessor +from experiments.manager import ExperimentManager, export_results +from problems.job_shop import create_job_shop_problem +from reporting.generators import MarkdownReporter +from visualization.plots import bar_performance + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Run RMS optimisation experiments") + parser.add_argument("--config", type=Path, help="Path to configuration file", required=False) + parser.add_argument("--output", type=Path, default=Path("results/experiments/baseline.csv")) + return parser.parse_args() + + +def main() -> None: + args = parse_args() + config = load_config(args.config) if args.config else load_config() + manager = ConfigManager(config) + + loader = DataLoader() + preprocessor = DataPreprocessor() + data_sources = manager.config.data.sources or [Path("data/synthetic/sample.csv")] + frames = [loader._load_single(source) for source in data_sources if Path(source).exists()] + if frames: + data = preprocessor.transform(pd.concat(frames, ignore_index=True)) + else: + data = pd.DataFrame() + problem = create_job_shop_problem(data) + + experiment_manager = ExperimentManager(manager.config) + results = experiment_manager.run(problem) + summary = experiment_manager.summarise(results) + export_results(summary, args.output) + + if not summary.empty: + bar_performance(summary, "makespan", Path("results/figures/makespan.png")) + reporter = MarkdownReporter(Path("results/reports/summary.md")) + reporter.render({"runs": len(summary)}, summary) + + +if __name__ == "__main__": + main() diff --git a/simulation/__init__.py b/simulation/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/simulation/discrete_event.py b/simulation/discrete_event.py new file mode 100644 index 000000000..a8f0814d3 --- /dev/null +++ b/simulation/discrete_event.py @@ -0,0 +1,26 @@ +"""Simplified discrete event simulation skeleton.""" +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import List + + +@dataclass(order=True) +class Event: + time: float + description: str + + +@dataclass +class DiscreteEventSimulator: + events: List[Event] = field(default_factory=list) + + def schedule(self, event: Event) -> None: + self.events.append(event) + self.events.sort() + + def run(self) -> List[Event]: + executed: List[Event] = [] + while self.events: + executed.append(self.events.pop(0)) + return executed diff --git a/simulation/monte_carlo.py b/simulation/monte_carlo.py new file mode 100644 index 000000000..e26921d4b --- /dev/null +++ b/simulation/monte_carlo.py @@ -0,0 +1,15 @@ +"""Monte Carlo simulation helper.""" +from __future__ import annotations + +from typing import Callable + +import numpy as np + + +class MonteCarloEngine: + def __init__(self, repetitions: int) -> None: + self.repetitions = repetitions + + def estimate(self, func: Callable[[], float]) -> float: + samples = np.array([func() for _ in range(self.repetitions)]) + return float(samples.mean()) diff --git a/simulation/stochastic_models.py b/simulation/stochastic_models.py new file mode 100644 index 000000000..ae0416768 --- /dev/null +++ b/simulation/stochastic_models.py @@ -0,0 +1,15 @@ +"""Stochastic models for manufacturing processes.""" +from __future__ import annotations + +from dataclasses import dataclass +from typing import Callable + +import numpy as np + + +@dataclass +class ProcessingTimeModel: + distribution: Callable[[int], np.ndarray] + + def sample(self, size: int) -> np.ndarray: + return self.distribution(size) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/unit/test_dispatching.py b/tests/unit/test_dispatching.py new file mode 100644 index 000000000..c14b25c20 --- /dev/null +++ b/tests/unit/test_dispatching.py @@ -0,0 +1,22 @@ +from __future__ import annotations + +import pandas as pd + +from algorithms.classical.dispatching_rules import FCFSRule +from problems.job_shop import create_job_shop_problem + + +def test_fcfs_returns_sorted_schedule(): + data = pd.DataFrame( + { + "Job_ID": ["A", "B"], + "Machine_ID": ["M1", "M1"], + "Scheduled_Start": ["2023-01-01T09:00:00", "2023-01-01T08:00:00"], + "Scheduled_End": ["2023-01-01T10:00:00", "2023-01-01T09:00:00"], + "Processing_Time": [60, 60], + } + ) + problem = create_job_shop_problem(data) + optimizer = FCFSRule() + solution = optimizer.solve(problem) + assert list(solution.schedule["Job_ID"]) == ["B", "A"] diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/utils/logging.py b/utils/logging.py new file mode 100644 index 000000000..4cce01fa0 --- /dev/null +++ b/utils/logging.py @@ -0,0 +1,17 @@ +"""Logging utilities for the framework.""" +from __future__ import annotations + +import logging +from pathlib import Path + + +def configure_logging(log_dir: Path, level: str = "INFO") -> None: + log_dir.mkdir(parents=True, exist_ok=True) + logging.basicConfig( + level=getattr(logging, level.upper(), logging.INFO), + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + handlers=[ + logging.FileHandler(log_dir / "framework.log"), + logging.StreamHandler(), + ], + ) diff --git a/validation/__init__.py b/validation/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/validation/empirical.py b/validation/empirical.py new file mode 100644 index 000000000..dbdbb640b --- /dev/null +++ b/validation/empirical.py @@ -0,0 +1,21 @@ +"""Empirical validation utilities.""" +from __future__ import annotations + +from typing import Dict + +import numpy as np +import pandas as pd +from scipy import stats + + +def friedman_test(results: pd.DataFrame) -> Dict[str, float]: + pivot = results.pivot(index="replication", columns="algorithm", values="makespan") + statistic, pvalue = stats.friedmanchisquare(*pivot.T.values) + return {"statistic": float(statistic), "p_value": float(pvalue)} + + +def confidence_interval(values: np.ndarray, level: float = 0.95) -> Dict[str, float]: + mean = float(np.mean(values)) + sem = stats.sem(values) + interval = stats.t.interval(level, len(values) - 1, loc=mean, scale=sem) + return {"mean": mean, "lower": float(interval[0]), "upper": float(interval[1])} diff --git a/validation/theoretical.py b/validation/theoretical.py new file mode 100644 index 000000000..501ac8c1c --- /dev/null +++ b/validation/theoretical.py @@ -0,0 +1,20 @@ +"""Theoretical validation helpers.""" +from __future__ import annotations + +from dataclasses import dataclass +from typing import Dict + + +@dataclass +class ComplexityAnalysis: + algorithm: str + time_complexity: str + space_complexity: str + + +def document_complexity(algorithm: str, time_complexity: str, space_complexity: str) -> Dict[str, str]: + return { + "algorithm": algorithm, + "time_complexity": time_complexity, + "space_complexity": space_complexity, + } diff --git a/visualization/__init__.py b/visualization/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/visualization/plots.py b/visualization/plots.py new file mode 100644 index 000000000..298e8b665 --- /dev/null +++ b/visualization/plots.py @@ -0,0 +1,22 @@ +"""Plotting utilities for experiments.""" +from __future__ import annotations + +from pathlib import Path +from typing import Iterable + +import matplotlib.pyplot as plt +import pandas as pd + + +def bar_performance(results: pd.DataFrame, metric: str, output: Path) -> Path: + fig, ax = plt.subplots(figsize=(6, 4)) + ax.bar(results["algorithm"], results[metric]) + ax.set_ylabel(metric) + ax.set_xlabel("Algorithm") + ax.set_title(f"Performance comparison on {metric}") + ax.grid(True, axis="y", alpha=0.3) + output.parent.mkdir(parents=True, exist_ok=True) + fig.tight_layout() + fig.savefig(output, dpi=300) + plt.close(fig) + return output