175 changes: 61 additions & 114 deletions py_stats_toolkit/stats/correlation/CorrelationModule.py
@@ -1,8 +1,8 @@
-'''
+"""
 =====================================================================
 File : CorrelationModule.py
 =====================================================================
-version : 1.0.0
+version : 2.0.0
 release : 15/06/2025
 author : Phoenix Project
 contact : contact@phonxproject.onmicrosoft.fr
@@ -11,155 +11,102 @@
 Copyright (c) 2025, Phoenix Project
 All rights reserved.
 
-Description du module CorrelationModule.py
+Refactored module for correlation analysis.
+Follows SOLID principles with separation of business logic and algorithms.
 
-tags : module, stats
+tags : module, stats, refactored
 =====================================================================
-Ce module Description du module CorrelationModule.py
+"""

-tags : module, stats
-=====================================================================
-'''
 
-# Imports spécifiques au module
-from typing import Any, Dict, List, Optional, Tuple, Union
-import numpy as np
+from typing import Any, Dict, Union
 import pandas as pd
 
-# Imports de la base
-from capsules.BaseCapsule import BaseCapsule
+# Import base class and utilities
+from py_stats_toolkit.core.base import StatisticalModule
+from py_stats_toolkit.core.validators import DataValidator
+from py_stats_toolkit.utils.parallel import ParallelProcessor, get_optimal_chunk_size
+from py_stats_toolkit.algorithms import correlation as correlation_algos
 
-class CorrelationModule(BaseCapsule):
-    """
-    Classe CorrelationModule
-
-    Attributes:
-        data, parameters, results
-
+class CorrelationModule(StatisticalModule):
+    """
+    Module for correlation analysis (Business Logic Layer).
 
-    def __init__(self):
-        """
-        Initialise CorrelationModule.
-        """
-        super().__init__()
-        pass
+    Responsibilities:
+    - Orchestrate correlation analysis workflow
+    - Manage results and state
+    - Provide user-facing API
 
-    def configure(self, **kwargs) -> None:
-        """
-        Configure les paramètres de CorrelationModule.
-
-        Args:
-            **kwargs: Paramètres de configuration
-        """
-        pass
+    Delegates to:
+    - DataValidator for validation
+    - correlation_algos for computations
+    - ParallelProcessor for parallel processing
+    """
 
-    def process(self, data: Union[pd.DataFrame, pd.Series], **kwargs) -> Dict[str, Any]:
+    def __init__(self, n_jobs: int = -1):
         """
-        Exécute le flux de travail d'analyse.
+        Initialize correlation module.
 
         Args:
-            data (Union[pd.DataFrame, pd.Series]): Données à analyser
-            **kwargs: Arguments additionnels
-
-        Returns:
-            Dict[str, Any]: Résultats de l'analyse
+            n_jobs: Number of parallel jobs (-1 for all CPUs)
         """
-        pass
-
-import numpy as np
-import pandas as pd
-from scipy import stats
-from ..core.AbstractClassBase import StatisticalModule
-from ...utils.parallel import ParallelProcessor, get_optimal_chunk_size
-
-class CorrelationModule(StatisticalModule):
-    """Module pour l'analyse de corrélation."""
-
-    def __init__(self, n_jobs: int = -1):
         super().__init__()
         self.method = None
         self.parallel_processor = ParallelProcessor(n_jobs=n_jobs)
 
-    def _compute_correlation_chunk(self, chunk_data):
-        """Calcule la corrélation pour un chunk de données."""
-        return chunk_data.corr(method=self.method)
-
-    def process(self, data, method="pearson", **kwargs):
+    def process(self, data: Union[pd.DataFrame, pd.Series], method: str = "pearson",
+                **kwargs) -> pd.DataFrame:
         """
-        Calcule la corrélation entre les variables en parallèle.
+        Compute correlation between variables.
 
         Args:
-            data: Données d'entrée (pandas DataFrame)
-            method: Méthode de corrélation ('pearson', 'spearman', 'kendall')
-            **kwargs: Arguments additionnels
+            data: Input DataFrame
+            method: Correlation method ('pearson', 'spearman', 'kendall')
+            **kwargs: Additional arguments
 
         Returns:
-            Matrice de corrélation
+            Correlation matrix
        """
-        self.validate_data(data)
-        self.method = method
+        # Validation (delegated to validator)
+        DataValidator.validate_data(data)
 
         if not isinstance(data, pd.DataFrame):
-            raise TypeError("Les données doivent être un pandas DataFrame")
+            raise TypeError("Data must be a pandas DataFrame")
 
-        # Pour les petits DataFrames, calcul direct
-        if len(data.columns) < 100:
-            self.result = data.corr(method=method)
-            return self.result
+        DataValidator.validate_numeric(data)
 
-        # Pour les grands DataFrames, traitement parallèle
-        n_cols = len(data.columns)
-        chunk_size = get_optimal_chunk_size(n_cols, self.parallel_processor.n_jobs)
-
-        # Division des colonnes en chunks
-        chunks = []
-        for i in range(0, n_cols, chunk_size):
-            chunk_cols = data.columns[i:min(i + chunk_size, n_cols)]
-            chunks.append(data[chunk_cols])
+        # Store state
+        self.data = data
+        self.method = method
 
-        # Calcul parallèle des corrélations
-        chunk_results = self.parallel_processor.parallel_map(
-            self._compute_correlation_chunk,
-            chunks
-        )
+        # Computation (delegated to algorithm layer)
+        self.result = correlation_algos.compute_correlation_matrix(data, method)
 
-        # Assemblage des résultats
-        self.result = pd.concat(chunk_results, axis=1)
         return self.result
 
-    def get_correlation_matrix(self):
-        """Retourne la matrice de corrélation."""
-        return self.result
+    def get_correlation_matrix(self) -> pd.DataFrame:
+        """
+        Get the correlation matrix.
+
+        Returns:
+            Correlation matrix
+        """
+        return self.get_result()
 
-    def get_correlation_pairs(self, threshold=0.5):
+    def get_correlation_pairs(self, threshold: float = 0.5):
         """
-        Retourne les paires de variables avec une corrélation supérieure au seuil.
+        Get variable pairs with correlation above threshold.
 
         Args:
-            threshold: Seuil de corrélation
+            threshold: Minimum absolute correlation value
 
         Returns:
-            Liste de tuples (var1, var2, corr)
+            List of (var1, var2, correlation) tuples
         """
-        if self.result is None:
-            raise ValueError("Exécutez d'abord process()")
-
-        # Utilisation de numpy pour le calcul parallèle des paires
-        corr_matrix = self.result.values
-        n = len(self.result.columns)
-
-        # Création des indices pour les paires
-        i, j = np.triu_indices(n, k=1)
-        corr_values = corr_matrix[i, j]
-
-        # Filtrage des paires selon le seuil
-        mask = np.abs(corr_values) >= threshold
-        pairs = []
-
-        for idx in np.where(mask)[0]:
-            var1 = self.result.columns[i[idx]]
-            var2 = self.result.columns[j[idx]]
-            corr = corr_values[idx]
-            pairs.append((var1, var2, corr))
+        if not self.has_result():
+            raise ValueError("No analysis performed. Call process() first.")
 
-        return sorted(pairs, key=lambda x: abs(x[2]), reverse=True)
+        # Delegate computation to algorithm layer
+        return correlation_algos.compute_pairwise_correlations(
+            self.data, self.method, threshold
+        )
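Note (not part of the diff): the refactored process() delegates the actual computation to py_stats_toolkit.algorithms.correlation, which this PR does not show. A minimal sketch of what compute_correlation_matrix could look like, assuming it simply wraps pandas (the removed column-chunking path could also live behind this function for very wide frames):

    import pandas as pd

    def compute_correlation_matrix(data: pd.DataFrame, method: str = "pearson") -> pd.DataFrame:
        # Pairwise correlation matrix over all columns of a numeric DataFrame;
        # pandas implements 'pearson', 'spearman' and 'kendall' directly.
        if method not in ("pearson", "spearman", "kendall"):
            raise ValueError(f"Unsupported correlation method: {method}")
        return data.corr(method=method)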
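Note (not part of the diff): the removed numpy-based pair extraction is a reasonable template for the helper that get_correlation_pairs() now calls. A sketch of compute_pairwise_correlations under that assumption (hypothetical; the real algorithm-layer module is not included in this diff):

    import numpy as np
    import pandas as pd

    def compute_pairwise_correlations(data: pd.DataFrame, method: str = "pearson",
                                      threshold: float = 0.5):
        # Correlation matrix, then every unordered column pair exactly once via
        # the upper triangle (diagonal excluded), as in the removed implementation.
        corr = data.corr(method=method)
        cols = corr.columns
        i, j = np.triu_indices(len(cols), k=1)
        values = corr.values[i, j]
        mask = np.abs(values) >= threshold
        pairs = [(cols[a], cols[b], v) for a, b, v in zip(i[mask], j[mask], values[mask])]
        # Strongest correlations first, matching the removed sorted(..., reverse=True) behaviour.
        return sorted(pairs, key=lambda p: abs(p[2]), reverse=True)

Typical use of the refactored module, assuming df is a numeric pandas DataFrame:

    analyzer = CorrelationModule(n_jobs=-1)
    matrix = analyzer.process(df, method="spearman")
    strong_pairs = analyzer.get_correlation_pairs(threshold=0.8)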