diff --git a/server/src/scripts/import-dpe/analyze_dpe_distribution.py b/server/src/scripts/import-dpe/analyze_dpe_distribution.py
new file mode 100644
index 000000000..6f1486571
--- /dev/null
+++ b/server/src/scripts/import-dpe/analyze_dpe_distribution.py
@@ -0,0 +1,295 @@
+#!/usr/bin/env python3
+"""
+Script d'analyse de la distribution temporelle des données DPE brutes.
+
+Ce script analyse la table dpe_raw pour comprendre la répartition des DPE
+dans le temps, particulièrement sur les 12 derniers mois.
+
+Usage:
+    python analyze_dpe_distribution.py
+
+Variables d'environnement requises:
+    DATABASE_URL ou (DB_HOST, DB_NAME, DB_USER, DB_PASSWORD, DB_PORT)
+"""
+
+import os
+import sys
+import logging
+from datetime import datetime, timedelta
+from pathlib import Path
+import psycopg2
+from urllib.parse import urlparse
+
+def setup_logger():
+    """Configure le système de logging."""
+    # Créer le dossier logs s'il n'existe pas
+    log_dir = Path(__file__).parent / 'logs'
+    log_dir.mkdir(exist_ok=True)
+
+    # Nom du fichier de log avec timestamp
+    log_filename = log_dir / f'analyze_dpe_distribution_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'
+
+    # Configuration du logger
+    logger = logging.getLogger('dpe_distribution_analyzer')
+    logger.setLevel(logging.INFO)
+
+    # Clear existing handlers
+    for handler in logger.handlers[:]:
+        logger.removeHandler(handler)
+
+    # File handler
+    file_handler = logging.FileHandler(log_filename, encoding='utf-8')
+    file_handler.setLevel(logging.INFO)
+
+    # Console handler
+    console_handler = logging.StreamHandler()
+    console_handler.setLevel(logging.INFO)
+
+    # Formatter
+    formatter = logging.Formatter('%(levelname)s - %(message)s')
+    file_handler.setFormatter(formatter)
+    console_handler.setFormatter(formatter)
+
+    logger.addHandler(file_handler)
+    logger.addHandler(console_handler)
+
+    return logger, log_filename
+
+def get_db_connection():
+    """Établit une connexion à la base de données."""
+    database_url = os.environ.get('DATABASE_URL')
+
+    if database_url:
+        parsed = urlparse(database_url)
+        return psycopg2.connect(
+            host=parsed.hostname,
+            port=parsed.port or 5432,
+            database=parsed.path[1:],
+            user=parsed.username,
+            password=parsed.password
+        )
+    else:
+        return psycopg2.connect(
+            host=os.environ.get('DB_HOST', 'localhost'),
+            port=os.environ.get('DB_PORT', 5432),
+            database=os.environ.get('DB_NAME', 'zlv'),
+            user=os.environ.get('DB_USER', 'postgres'),
+            password=os.environ.get('DB_PASSWORD', 'postgres')
+        )
+
+def analyze_distribution(conn, logger):
+    """Analyse la distribution temporelle des DPE."""
+    cursor = conn.cursor()
+
+    logger.info("=" * 80)
+    logger.info("ANALYSE DE LA DISTRIBUTION TEMPORELLE DES DONNÉES DPE BRUTES")
+    logger.info("=" * 80)
+    logger.info("")
+
+    # 1. Nombre total de DPE
+    cursor.execute("SELECT COUNT(*) FROM dpe_raw")
+    total = cursor.fetchone()[0]
+    logger.info(f"Nombre total de DPE dans dpe_raw: {total:,}")
+    logger.info("")
+
+    # 2. Plage de dates
+    cursor.execute("""
+        SELECT
+            MIN(date_etablissement_dpe) as min_date,
+            MAX(date_etablissement_dpe) as max_date
+        FROM dpe_raw
+        WHERE date_etablissement_dpe IS NOT NULL
+    """)
+    min_date, max_date = cursor.fetchone()
+    logger.info(f"Période couverte: {min_date} à {max_date}")
+    logger.info("")
+
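+    # NB : les pourcentages de la section suivante sont calculés sur les seuls DPE
+    # dont la date d'établissement est renseignée.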
+    # 3. Distribution par année
+    logger.info("-" * 80)
+    logger.info("DISTRIBUTION PAR ANNÉE")
+    logger.info("-" * 80)
+    cursor.execute("""
+        SELECT
+            EXTRACT(YEAR FROM date_etablissement_dpe)::int as annee,
+            COUNT(*) as nb_dpe,
+            ROUND(COUNT(*) * 100.0 / (SELECT COUNT(*) FROM dpe_raw WHERE date_etablissement_dpe IS NOT NULL), 2) as pourcentage
+        FROM dpe_raw
+        WHERE date_etablissement_dpe IS NOT NULL
+        GROUP BY annee
+        ORDER BY annee DESC
+    """)
+
+    logger.info(f"{'Année':<10} {'Nombre DPE':>15} {'Pourcentage':>12}")
+    logger.info("-" * 40)
+    for row in cursor.fetchall():
+        annee, nb, pct = row
+        logger.info(f"{int(annee):<10} {nb:>15,} {pct:>11.2f}%")
+    logger.info("")
+
+    # 4. Distribution des 12 derniers mois
+    logger.info("-" * 80)
+    logger.info("DISTRIBUTION DES 12 DERNIERS MOIS")
+    logger.info("-" * 80)
+
+    # Calculer la date il y a 12 mois
+    today = datetime.now().date()
+    twelve_months_ago = today - timedelta(days=365)
+
+    cursor.execute("""
+        SELECT
+            TO_CHAR(date_etablissement_dpe, 'YYYY-MM') as mois,
+            COUNT(*) as nb_dpe
+        FROM dpe_raw
+        WHERE date_etablissement_dpe >= %s
+            AND date_etablissement_dpe IS NOT NULL
+        GROUP BY mois
+        ORDER BY mois DESC
+    """, (twelve_months_ago,))
+
+    results_12_months = cursor.fetchall()
+    total_12_months = sum(r[1] for r in results_12_months)
+
+    logger.info(f"DPE sur les 12 derniers mois: {total_12_months:,} ({total_12_months * 100.0 / total:.2f}%)")
+    logger.info("")
+    logger.info(f"{'Mois':<10} {'Nombre DPE':>15}")
+    logger.info("-" * 30)
+    for row in results_12_months:
+        mois, nb = row
+        logger.info(f"{mois:<10} {nb:>15,}")
+    logger.info("")
+
+    # 5. Comparaison avec les années précédentes
+    logger.info("-" * 80)
+    logger.info("COMPARAISON ANNUELLE (même période de 12 mois)")
+    logger.info("-" * 80)
+
+    # Même fenêtre glissante de 12 mois, décalée d'un an pour chaque année comparée
+    cursor.execute("""
+        WITH yearly_counts AS (
+            SELECT
+                EXTRACT(YEAR FROM CURRENT_DATE)::int - n as annee,
+                COUNT(*) as nb_dpe
+            FROM generate_series(0, 4) as n
+            JOIN dpe_raw
+              ON date_etablissement_dpe > CURRENT_DATE - INTERVAL '12 months' - n * INTERVAL '1 year'
+             AND date_etablissement_dpe <= CURRENT_DATE - n * INTERVAL '1 year'
+            GROUP BY n
+        )
+        SELECT * FROM yearly_counts ORDER BY annee DESC
+    """)
+
+    logger.info(f"{'Année':<10} {'Nombre DPE':>15}")
+    logger.info("-" * 30)
+    for row in cursor.fetchall():
+        annee, nb = row
+        logger.info(f"{int(annee):<10} {nb:>15,}")
+    logger.info("")
+
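+    # Un volume nettement plus faible sur la fenêtre la plus récente suggère un retard
+    # de publication plutôt qu'une baisse réelle d'activité (voir hypothèses ci-dessous).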
+    # 6. Analyse des causes possibles
+    logger.info("-" * 80)
+    logger.info("ANALYSE DES CAUSES POSSIBLES DU MANQUE DE DONNÉES RÉCENTES")
+    logger.info("-" * 80)
+    logger.info("")
+
+    # Vérifier la date de réception vs date d'établissement
+    cursor.execute("""
+        SELECT
+            COUNT(*) FILTER (WHERE date_reception_dpe IS NOT NULL) as avec_date_reception,
+            COUNT(*) as total,
+            MAX(date_reception_dpe) as derniere_reception
+        FROM dpe_raw
+    """)
+    avec_reception, total_check, derniere_reception = cursor.fetchone()
+
+    logger.info(f"Date de dernière réception dans la base: {derniere_reception}")
+    logger.info(f"DPE avec date_reception_dpe renseignée: {avec_reception:,} / {total_check:,}")
+    logger.info("")
+
+    # Distribution par date de réception sur les 12 derniers mois
+    cursor.execute("""
+        SELECT
+            TO_CHAR(date_reception_dpe, 'YYYY-MM') as mois,
+            COUNT(*) as nb_dpe
+        FROM dpe_raw
+        WHERE date_reception_dpe >= %s
+            AND date_reception_dpe IS NOT NULL
+        GROUP BY mois
+        ORDER BY mois DESC
+        LIMIT 12
+    """, (twelve_months_ago,))
+
+    results_reception = cursor.fetchall()
+    if results_reception:
+        logger.info("Distribution par DATE DE RÉCEPTION (12 derniers mois):")
+        logger.info(f"{'Mois':<10} {'Nombre DPE':>15}")
+        logger.info("-" * 30)
+        for row in results_reception:
+            mois, nb = row
+            logger.info(f"{mois:<10} {nb:>15,}")
+        logger.info("")
+
+    # 7. Vérifier la source des données
+    logger.info("-" * 80)
+    logger.info("HYPOTHÈSES EXPLICATIVES")
+    logger.info("-" * 80)
+    logger.info("")
+    logger.info("Le manque de données sur les 12 derniers mois peut s'expliquer par:")
+    logger.info("")
+    logger.info("1. DÉLAI DE PUBLICATION DE L'ADEME")
+    logger.info("   - Les données DPE sont publiées par l'ADEME avec un délai")
+    logger.info("   - Les DPE les plus récents ne sont pas encore disponibles")
+    logger.info("   - Vérifier la date de téléchargement du fichier source")
+    logger.info("")
+    logger.info("2. DATE DU FICHIER SOURCE")
+    logger.info("   - Le fichier importé date peut-être de plusieurs mois")
+    logger.info("   - Vérifier l'URL de téléchargement et la date du fichier")
+    logger.info("")
+    logger.info("3. DÉLAI DE TRAITEMENT")
+    logger.info("   - Entre la visite du diagnostiqueur et la publication")
+    logger.info("   - Peut prendre plusieurs semaines à plusieurs mois")
+    logger.info("")
+
+    # 8. Statistiques sur la dernière modification
+    cursor.execute("""
+        SELECT
+            MAX(date_derniere_modification_dpe) as derniere_modif
+        FROM dpe_raw
+        WHERE date_derniere_modification_dpe IS NOT NULL
+    """)
+    derniere_modif = cursor.fetchone()[0]
+    logger.info(f"Date de dernière modification DPE dans la base: {derniere_modif}")
+    logger.info("")
+
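+    # Si la dernière réception est nettement antérieure à la date du jour, le fichier
+    # source est probablement ancien (voir hypothèse 2 ci-dessus).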
+    # 9. Résumé
+    logger.info("=" * 80)
+    logger.info("RÉSUMÉ")
+    logger.info("=" * 80)
+    logger.info("")
+    logger.info(f"Total DPE: {total:,}")
+    logger.info(f"DPE des 12 derniers mois: {total_12_months:,} ({total_12_months * 100.0 / total:.2f}%)")
+    logger.info(f"Dernière date d'établissement: {max_date}")
+    logger.info(f"Dernière date de réception: {derniere_reception}")
+    logger.info("")
+
+    cursor.close()
+
+def main():
+    logger, log_filename = setup_logger()
+
+    try:
+        logger.info("Démarrage de l'analyse de distribution DPE")
+        conn = get_db_connection()
+        logger.info("Connexion à la base de données établie.")
+        logger.info("")
+        analyze_distribution(conn, logger)
+        conn.close()
+        logger.info("Analyse terminée avec succès")
+        logger.info(f"\nLog détaillé: {log_filename}")
+    except Exception as e:
+        logger.error(f"Erreur: {e}")
+        print(f"Erreur: {e}", file=sys.stderr)
+        sys.exit(1)
+
+if __name__ == "__main__":
+    main()
diff --git a/server/src/scripts/import-dpe/analyze_dpe_prod_comparison.py b/server/src/scripts/import-dpe/analyze_dpe_prod_comparison.py
new file mode 100644
index 000000000..5c66b3691
--- /dev/null
+++ b/server/src/scripts/import-dpe/analyze_dpe_prod_comparison.py
@@ -0,0 +1,1041 @@
+#!/usr/bin/env python3
+"""
+Script de comparaison des données DPE entre la table brute (dpe_raw) et
+la table de production (buildings).
+
+Ce script détecte les anomalies et incohérences entre les deux sources.
+
+Usage:
+    # Rapport texte (défaut)
+    DATABASE_URL_RAW="postgres://..." DATABASE_URL_PROD="postgres://..." python analyze_dpe_prod_comparison.py
+
+    # Rapport PDF avec graphiques
+    DATABASE_URL_RAW="postgres://..." DATABASE_URL_PROD="postgres://..." python analyze_dpe_prod_comparison.py --pdf rapport.pdf
+
+Variables d'environnement requises:
+    DATABASE_URL_RAW  - Base contenant dpe_raw
+    DATABASE_URL_PROD - Base contenant buildings
+
+Dépendances pour PDF:
+    pip install matplotlib
+"""
+
+import os
+import sys
+import argparse
+from datetime import datetime, timedelta
+import psycopg2
+from urllib.parse import urlparse
+
+# Variables globales pour stocker les données pour le PDF
+report_data = {}
+
+def get_db_connection(database_url):
+    """Établit une connexion à la base de données."""
+    parsed = urlparse(database_url)
+    return psycopg2.connect(
+        host=parsed.hostname,
+        port=parsed.port or 5432,
+        database=parsed.path[1:],
+        user=parsed.username,
+        password=parsed.password
+    )
+
+def analyze_comparison(conn_raw, conn_prod):
+    """Compare les données DPE entre dpe_raw et buildings."""
+    cursor_raw = conn_raw.cursor()
+    cursor_prod = conn_prod.cursor()
+
+    print("=" * 80)
+    print("COMPARAISON DPE: RAW vs PRODUCTION (buildings)")
+    print("=" * 80)
+    print()
+
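+    # Les deux bases sont interrogées en lecture seule : le script n'effectue aucune écriture.
+    # 1. 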
Vue d'ensemble + print("-" * 80) + print("VUE D'ENSEMBLE") + print("-" * 80) + + cursor_raw.execute("SELECT COUNT(*) FROM dpe_raw") + total_raw = cursor_raw.fetchone()[0] + + cursor_prod.execute("SELECT COUNT(*) FROM buildings") + total_buildings = cursor_prod.fetchone()[0] + + cursor_prod.execute("SELECT COUNT(*) FROM buildings WHERE dpe_id IS NOT NULL") + buildings_with_dpe = cursor_prod.fetchone()[0] + + cursor_prod.execute("SELECT COUNT(*) FROM buildings WHERE class_dpe IS NOT NULL") + buildings_with_class = cursor_prod.fetchone()[0] + + print(f"Total DPE bruts (dpe_raw): {total_raw:,}") + print(f"Total buildings: {total_buildings:,}") + print(f"Buildings avec dpe_id: {buildings_with_dpe:,} ({buildings_with_dpe * 100.0 / total_buildings:.2f}%)") + print(f"Buildings avec class_dpe: {buildings_with_class:,} ({buildings_with_class * 100.0 / total_buildings:.2f}%)") + print() + + # Stocker pour PDF + report_data['total_raw'] = total_raw + report_data['total_buildings'] = total_buildings + report_data['buildings_with_dpe'] = buildings_with_dpe + report_data['import_rate'] = buildings_with_dpe * 100.0 / total_buildings if total_buildings > 0 else 0 + + # 2. Distribution par classe DPE dans buildings + print("-" * 80) + print("DISTRIBUTION PAR CLASSE DPE (buildings)") + print("-" * 80) + + cursor_prod.execute(""" + SELECT + COALESCE(class_dpe, 'NULL') as classe, + COUNT(*) as nb + FROM buildings + GROUP BY class_dpe + ORDER BY + CASE + WHEN class_dpe IS NULL THEN 'Z' + ELSE class_dpe + END + """) + + print(f"{'Classe':<10} {'Nombre':>15}") + print("-" * 30) + class_dist = {} + for row in cursor_prod.fetchall(): + classe, nb = row + print(f"{classe:<10} {nb:>15,}") + class_dist[classe] = nb + print() + + # Stocker pour PDF + report_data['class_distribution'] = class_dist + + # 3. Distribution par date DPE dans buildings + print("-" * 80) + print("DISTRIBUTION PAR ANNÉE DPE (buildings.dpe_date_at)") + print("-" * 80) + + cursor_prod.execute(""" + SELECT + EXTRACT(YEAR FROM dpe_date_at)::int as annee, + COUNT(*) as nb + FROM buildings + WHERE dpe_date_at IS NOT NULL + GROUP BY annee + ORDER BY annee DESC + """) + + results = cursor_prod.fetchall() + if results: + print(f"{'Année':<10} {'Nombre':>15}") + print("-" * 30) + for row in results: + annee, nb = row + print(f"{int(annee):<10} {nb:>15,}") + else: + print("Aucune date DPE renseignée dans buildings") + print() + + # 4. 
Comparaison des 12 derniers mois + print("-" * 80) + print("COMPARAISON DES 12 DERNIERS MOIS") + print("-" * 80) + + twelve_months_ago = datetime.now().date() - timedelta(days=365) + + # Dans dpe_raw + cursor_raw.execute(""" + SELECT + TO_CHAR(date_etablissement_dpe, 'YYYY-MM') as mois, + COUNT(*) as nb + FROM dpe_raw + WHERE date_etablissement_dpe >= %s + GROUP BY mois + ORDER BY mois DESC + """, (twelve_months_ago,)) + raw_months = {row[0]: row[1] for row in cursor_raw.fetchall()} + + # Dans buildings + cursor_prod.execute(""" + SELECT + TO_CHAR(dpe_date_at, 'YYYY-MM') as mois, + COUNT(*) as nb + FROM buildings + WHERE dpe_date_at >= %s + AND dpe_date_at IS NOT NULL + GROUP BY mois + ORDER BY mois DESC + """, (twelve_months_ago,)) + prod_months = {row[0]: row[1] for row in cursor_prod.fetchall()} + + all_months = sorted(set(raw_months.keys()) | set(prod_months.keys()), reverse=True) + + if all_months: + print(f"{'Mois':<10} {'RAW':>12} {'PROD':>12} {'Ratio':>10}") + print("-" * 50) + for mois in all_months: + raw_count = raw_months.get(mois, 0) + prod_count = prod_months.get(mois, 0) + ratio = (prod_count / raw_count * 100) if raw_count > 0 else 0 + print(f"{mois:<10} {raw_count:>12,} {prod_count:>12,} {ratio:>9.1f}%") + print() + + # 5. Analyse du type de match DPE + print("-" * 80) + print("DISTRIBUTION PAR TYPE DE MATCH DPE (dpe_import_match)") + print("-" * 80) + + cursor_prod.execute(""" + SELECT + COALESCE(dpe_import_match, 'NULL') as match_type, + COUNT(*) as nb + FROM buildings + GROUP BY dpe_import_match + ORDER BY nb DESC + """) + + print(f"{'Type de match':<20} {'Nombre':>15}") + print("-" * 40) + for row in cursor_prod.fetchall(): + match_type, nb = row + print(f"{match_type:<20} {nb:>15,}") + print() + + # 6. Analyse des DPE RAW non importés + print("-" * 80) + print("ANALYSE DES DPE RAW NON IMPORTÉS") + print("-" * 80) + print() + + # Qualité des données RAW pour le matching + cursor_raw.execute(""" + SELECT + COUNT(*) as total, + COUNT(id_rnb) as avec_rnb, + COUNT(identifiant_ban) as avec_ban, + COUNT(CASE WHEN id_rnb IS NOT NULL OR identifiant_ban IS NOT NULL THEN 1 END) as matchable + FROM dpe_raw + """) + raw_stats = cursor_raw.fetchone() + total, avec_rnb, avec_ban, matchable = raw_stats + + print("Qualité des données RAW pour le matching:") + print(f" Total DPE RAW: {total:,}") + print(f" Avec id_rnb (RNB): {avec_rnb:,} ({avec_rnb * 100.0 / total:.1f}%)") + print(f" Avec identifiant_ban: {avec_ban:,} ({avec_ban * 100.0 / total:.1f}%)") + print(f" Matchables (RNB ou BAN): {matchable:,} ({matchable * 100.0 / total:.1f}%)") + print(f" Non matchables: {total - matchable:,} ({(total - matchable) * 100.0 / total:.1f}%)") + print() + + # Vérifier les RNB IDs présents dans RAW mais absents de PROD + print("Analyse des RNB IDs:") + + # Compter les buildings avec rnb_id en PROD + cursor_prod.execute("SELECT COUNT(*) FROM buildings WHERE rnb_id IS NOT NULL") + buildings_with_rnb = cursor_prod.fetchone()[0] + print(f" Buildings avec rnb_id: {buildings_with_rnb:,}") + + # Récupérer un échantillon de RNB IDs de RAW pour vérifier leur présence en PROD + cursor_raw.execute(""" + SELECT id_rnb, COUNT(*) as nb_dpe + FROM dpe_raw + WHERE id_rnb IS NOT NULL + GROUP BY id_rnb + ORDER BY nb_dpe DESC + LIMIT 1000 + """) + sample_rnb_ids = cursor_raw.fetchall() + + if sample_rnb_ids: + rnb_ids = [row[0] for row in sample_rnb_ids] + + # Vérifier combien sont présents en PROD + cursor_prod.execute(""" + SELECT COUNT(DISTINCT rnb_id) + FROM buildings + WHERE rnb_id = ANY(%s) + """, (rnb_ids,)) 
+ found_in_prod = cursor_prod.fetchone()[0] + + print(f" Échantillon 1000 RNB IDs les plus fréquents dans RAW:") + print(f" - Trouvés dans buildings: {found_in_prod:,}") + print(f" - Non trouvés: {len(rnb_ids) - found_in_prod:,}") + print(f" - Taux de matching: {found_in_prod * 100.0 / len(rnb_ids):.1f}%") + print() + + # Distribution des DPE RAW par présence des identifiants + print("Distribution des DPE RAW par type d'identifiant:") + cursor_raw.execute(""" + SELECT + CASE + WHEN id_rnb IS NOT NULL AND identifiant_ban IS NOT NULL THEN 'RNB + BAN' + WHEN id_rnb IS NOT NULL THEN 'RNB seul' + WHEN identifiant_ban IS NOT NULL THEN 'BAN seul' + ELSE 'Aucun' + END as type_id, + COUNT(*) as nb + FROM dpe_raw + GROUP BY type_id + ORDER BY nb DESC + """) + + print(f"{'Type identifiant':<20} {'Nombre':>15} {'%':>10}") + print("-" * 50) + id_dist = {} + for row in cursor_raw.fetchall(): + type_id, nb = row + print(f"{type_id:<20} {nb:>15,} {nb * 100.0 / total:>9.1f}%") + id_dist[type_id] = nb + print() + + # Stocker pour PDF + report_data['id_distribution'] = id_dist + + # Comparer les DPE importés vs non importés par année + print("-" * 80) + print("DPE IMPORTÉS VS NON IMPORTÉS PAR ANNÉE") + print("-" * 80) + + # DPE dans RAW par année + cursor_raw.execute(""" + SELECT + EXTRACT(YEAR FROM date_etablissement_dpe)::int as annee, + COUNT(*) as total_raw, + COUNT(id_rnb) as avec_rnb, + COUNT(identifiant_ban) as avec_ban + FROM dpe_raw + WHERE date_etablissement_dpe IS NOT NULL + GROUP BY annee + ORDER BY annee DESC + """) + raw_by_year = {row[0]: {'total': row[1], 'rnb': row[2], 'ban': row[3]} for row in cursor_raw.fetchall()} + + # DPE importés par année + cursor_prod.execute(""" + SELECT + EXTRACT(YEAR FROM dpe_date_at)::int as annee, + COUNT(*) as nb + FROM buildings + WHERE dpe_date_at IS NOT NULL + GROUP BY annee + ORDER BY annee DESC + """) + prod_by_year = {row[0]: row[1] for row in cursor_prod.fetchall()} + + all_years = sorted(set(raw_by_year.keys()) | set(prod_by_year.keys()), reverse=True) + + print(f"{'Année':<8} {'RAW Total':>12} {'RAW+RNB':>12} {'RAW+BAN':>12} {'PROD':>12} {'Import %':>10}") + print("-" * 75) + year_dist = {} + for year in all_years: + raw_data = raw_by_year.get(year, {'total': 0, 'rnb': 0, 'ban': 0}) + prod_count = prod_by_year.get(year, 0) + import_rate = (prod_count / raw_data['total'] * 100) if raw_data['total'] > 0 else 0 + print(f"{year:<8} {raw_data['total']:>12,} {raw_data['rnb']:>12,} {raw_data['ban']:>12,} {prod_count:>12,} {import_rate:>9.1f}%") + year_dist[year] = {'total': raw_data['total'], 'prod': prod_count, 'import_rate': import_rate} + print() + + # Stocker pour PDF + report_data['year_distribution'] = year_dist + print("Légende:") + print(" RAW Total : Nombre total de DPE dans dpe_raw pour cette année") + print(" RAW+RNB : DPE avec id_rnb renseigné (matchables via buildings.rnb_id)") + print(" RAW+BAN : DPE avec identifiant_ban renseigné (matchables via ban_addresses.ban_id)") + print(" PROD : DPE effectivement importés dans buildings") + print(" Import % : PROD / RAW Total * 100") + print() + + # Analyse par trimestre + print("-" * 80) + print("DPE IMPORTÉS VS NON IMPORTÉS PAR TRIMESTRE") + print("-" * 80) + + # DPE dans RAW par trimestre + cursor_raw.execute(""" + SELECT + EXTRACT(YEAR FROM date_etablissement_dpe)::int as annee, + EXTRACT(QUARTER FROM date_etablissement_dpe)::int as trimestre, + COUNT(*) as total_raw, + COUNT(id_rnb) as avec_rnb, + COUNT(identifiant_ban) as avec_ban + FROM dpe_raw + WHERE date_etablissement_dpe IS NOT NULL + GROUP BY 
annee, trimestre + ORDER BY annee DESC, trimestre DESC + """) + raw_by_quarter = {} + for row in cursor_raw.fetchall(): + annee, trimestre, total_raw, avec_rnb, avec_ban = row + key = f"{int(annee)}-Q{int(trimestre)}" + raw_by_quarter[key] = {'total': total_raw, 'rnb': avec_rnb, 'ban': avec_ban} + + # DPE importés par trimestre + cursor_prod.execute(""" + SELECT + EXTRACT(YEAR FROM dpe_date_at)::int as annee, + EXTRACT(QUARTER FROM dpe_date_at)::int as trimestre, + COUNT(*) as nb + FROM buildings + WHERE dpe_date_at IS NOT NULL + GROUP BY annee, trimestre + ORDER BY annee DESC, trimestre DESC + """) + prod_by_quarter = {} + for row in cursor_prod.fetchall(): + annee, trimestre, nb = row + key = f"{int(annee)}-Q{int(trimestre)}" + prod_by_quarter[key] = nb + + all_quarters = sorted(set(raw_by_quarter.keys()) | set(prod_by_quarter.keys()), reverse=True) + + print(f"{'Trimestre':<12} {'RAW Total':>12} {'RAW+RNB':>12} {'RAW+BAN':>12} {'PROD':>12} {'Import %':>10}") + print("-" * 80) + quarter_dist = {} + for quarter in all_quarters: + raw_data = raw_by_quarter.get(quarter, {'total': 0, 'rnb': 0, 'ban': 0}) + prod_count = prod_by_quarter.get(quarter, 0) + import_rate = (prod_count / raw_data['total'] * 100) if raw_data['total'] > 0 else 0 + print(f"{quarter:<12} {raw_data['total']:>12,} {raw_data['rnb']:>12,} {raw_data['ban']:>12,} {prod_count:>12,} {import_rate:>9.1f}%") + quarter_dist[quarter] = { + 'total': raw_data['total'], + 'rnb': raw_data['rnb'], + 'ban': raw_data['ban'], + 'prod': prod_count, + 'import_rate': import_rate + } + print() + + # Stocker pour PDF + report_data['quarter_distribution'] = quarter_dist + + # Analyse du faible taux d'import pour 2025 + print("-" * 80) + print("ANALYSE DU FAIBLE TAUX D'IMPORT 2025 (RAW+BAN vs PROD)") + print("-" * 80) + print() + + # Compter les identifiant_ban uniques pour 2025 dans RAW + cursor_raw.execute(""" + SELECT COUNT(DISTINCT identifiant_ban) as unique_ban_ids + FROM dpe_raw + WHERE identifiant_ban IS NOT NULL + AND EXTRACT(YEAR FROM date_etablissement_dpe) = 2025 + """) + unique_ban_2025_raw = cursor_raw.fetchone()[0] + print(f"Identifiants BAN uniques dans dpe_raw (2025): {unique_ban_2025_raw:,}") + + # Vérifier combien de ban_addresses existent en PROD + cursor_prod.execute(""" + SELECT COUNT(DISTINCT ban_id) as unique_ban_ids + FROM ban_addresses + WHERE address_kind = 'Housing' + """) + unique_ban_in_prod = cursor_prod.fetchone()[0] + print(f"Adresses BAN uniques dans ban_addresses (PROD): {unique_ban_in_prod:,}") + print() + + # Stocker pour PDF + report_data['unique_ban_prod'] = unique_ban_in_prod + report_data['unique_ban_raw'] = unique_ban_2025_raw + + # Compter combien de BAN 2025 existent dans ban_addresses (comparaison directe) + cursor_raw.execute(""" + SELECT ARRAY_AGG(DISTINCT identifiant_ban) + FROM dpe_raw + WHERE identifiant_ban IS NOT NULL + AND EXTRACT(YEAR FROM date_etablissement_dpe) = 2025 + """) + all_ban_2025 = cursor_raw.fetchone()[0] or [] + + if all_ban_2025: + cursor_prod.execute(""" + SELECT COUNT(DISTINCT ban_id) + FROM ban_addresses + WHERE address_kind = 'Housing' + AND ban_id = ANY(%s) + """, (all_ban_2025,)) + found_ban_in_prod = cursor_prod.fetchone()[0] + + match_rate = found_ban_in_prod * 100.0 / len(all_ban_2025) + print(f"Matching des identifiant_ban 2025 avec ban_addresses:") + print(f" - Total BAN uniques 2025: {len(all_ban_2025):,}") + print(f" - Trouvés dans ban_addresses: {found_ban_in_prod:,}") + print(f" - Non trouvés: {len(all_ban_2025) - found_ban_in_prod:,}") + print(f" - Taux de 
matching: {match_rate:.1f}%") + print() + + # Stocker pour PDF + report_data['ban_match_rate'] = match_rate + + # Vérifier combien de buildings matchés ont déjà un DPE + cursor_prod.execute(""" + SELECT + COUNT(DISTINCT b.id) as total_buildings, + COUNT(DISTINCT CASE WHEN b.dpe_id IS NOT NULL THEN b.id END) as avec_dpe + FROM buildings b + JOIN fast_housing fh ON b.id = fh.building_id + JOIN ban_addresses ba ON fh.id = ba.ref_id + WHERE ba.address_kind = 'Housing' + AND ba.ban_id = ANY(%s) + """, (all_ban_2025,)) + result = cursor_prod.fetchone() + if result and result[0] > 0: + total_b, avec_dpe = result + sans_dpe = total_b - avec_dpe + print(f"Buildings correspondant aux BAN 2025:") + print(f" - Total trouvés: {total_b:,}") + print(f" - Déjà avec DPE: {avec_dpe:,} ({avec_dpe * 100.0 / total_b:.1f}%)") + print(f" - Sans DPE (importables): {sans_dpe:,} ({sans_dpe * 100.0 / total_b:.1f}%)") + print() + + # ANALYSE DÉTAILLÉE : Vérifier si les DPE ont été mis à jour + print("Analyse détaillée des buildings avec DPE:") + cursor_prod.execute(""" + SELECT + EXTRACT(YEAR FROM b.dpe_date_at)::int as annee_dpe, + COUNT(DISTINCT b.id) as nb_buildings + FROM buildings b + JOIN fast_housing fh ON b.id = fh.building_id + JOIN ban_addresses ba ON fh.id = ba.ref_id + WHERE ba.address_kind = 'Housing' + AND ba.ban_id = ANY(%s) + AND b.dpe_id IS NOT NULL + AND b.dpe_date_at IS NOT NULL + GROUP BY annee_dpe + ORDER BY annee_dpe DESC + """, (all_ban_2025,)) + + print(f" Répartition par année du DPE actuellement en base:") + for row in cursor_prod.fetchall(): + annee, nb = row + print(f" - {int(annee)}: {nb:,} buildings") + + # Compter combien ont effectivement un DPE de 2025 + cursor_prod.execute(""" + SELECT COUNT(DISTINCT b.id) + FROM buildings b + JOIN fast_housing fh ON b.id = fh.building_id + JOIN ban_addresses ba ON fh.id = ba.ref_id + WHERE ba.address_kind = 'Housing' + AND ba.ban_id = ANY(%s) + AND b.dpe_id IS NOT NULL + AND EXTRACT(YEAR FROM b.dpe_date_at) = 2025 + """, (all_ban_2025,)) + buildings_avec_dpe_2025 = cursor_prod.fetchone()[0] + + print() + print(f" ⚠️ Buildings avec un DPE 2025: {buildings_avec_dpe_2025:,}") + print(f" ⚠️ Buildings avec un DPE plus ancien: {avec_dpe - buildings_avec_dpe_2025:,}") + print() + print("CONCLUSION:") + if buildings_avec_dpe_2025 < avec_dpe: + print(f" Les {avec_dpe - buildings_avec_dpe_2025:,} buildings avec DPE ancien N'ONT PAS été mis à jour") + print(f" avec les DPE 2025 disponibles dans dpe_raw.") + print(f" Cela suggère que le script d'import ne met PAS à jour les DPE existants.") + print() + + print("Causes possibles du faible taux d'import 2025:") + print(" 1. Les identifiant_ban du fichier DPE ne correspondent pas aux ban_id en base") + print(" 2. Les buildings avec ces adresses ont déjà un DPE importé (années précédentes)") + print(" NOTE: Le script import-dpe.py a été modifié pour mettre à jour avec le DPE le plus récent") + print(" Les buildings avec DPE existant seront mis à jour si le nouveau DPE est plus récent") + print(" 3. Le script import-dpe.py n'a pas été relancé avec les données 2025") + print(" 4. 
Peu de id_rnb disponibles pour 2025 (16k vs 1.5M pour 2024)") + print() + + # Comparaison des différentes dates par année + print("-" * 80) + print("DISTRIBUTION PAR TYPE DE DATE (par année)") + print("-" * 80) + + cursor_raw.execute(""" + SELECT + EXTRACT(YEAR FROM date_etablissement_dpe)::int as annee, + COUNT(*) as etablissement, + COUNT(CASE WHEN EXTRACT(YEAR FROM date_reception_dpe) = EXTRACT(YEAR FROM date_etablissement_dpe) THEN 1 END) as reception_meme_annee, + COUNT(CASE WHEN EXTRACT(YEAR FROM date_visite_diagnostiqueur) = EXTRACT(YEAR FROM date_etablissement_dpe) THEN 1 END) as visite_meme_annee, + MAX(date_reception_dpe) as max_reception, + MAX(date_derniere_modification_dpe) as max_modification + FROM dpe_raw + WHERE date_etablissement_dpe IS NOT NULL + GROUP BY annee + ORDER BY annee DESC + """) + + print(f"{'Année':<8} {'Établissement':>14} {'Max Réception':>15} {'Max Modification':>18}") + print("-" * 60) + for row in cursor_raw.fetchall(): + annee, etablissement, _, _, max_reception, max_modification = row + max_rec_str = str(max_reception)[:10] if max_reception else 'N/A' + max_mod_str = str(max_modification)[:10] if max_modification else 'N/A' + print(f"{int(annee):<8} {etablissement:>14,} {max_rec_str:>15} {max_mod_str:>18}") + print() + + # Distribution mensuelle récente avec toutes les dates + print("-" * 80) + print("DISTRIBUTION MENSUELLE RÉCENTE (12 derniers mois)") + print("-" * 80) + + cursor_raw.execute(""" + SELECT + TO_CHAR(date_etablissement_dpe, 'YYYY-MM') as mois, + COUNT(*) as etablissement, + MAX(date_reception_dpe) as max_reception, + MAX(date_derniere_modification_dpe) as max_modification + FROM dpe_raw + WHERE date_etablissement_dpe >= CURRENT_DATE - INTERVAL '12 months' + GROUP BY mois + ORDER BY mois DESC + """) + + print(f"{'Mois':<10} {'Établissement':>14} {'Max Réception':>15} {'Max Modification':>18}") + print("-" * 62) + for row in cursor_raw.fetchall(): + mois, etablissement, max_reception, max_modification = row + max_rec_str = str(max_reception)[:10] if max_reception else 'N/A' + max_mod_str = str(max_modification)[:10] if max_modification else 'N/A' + print(f"{mois:<10} {etablissement:>14,} {max_rec_str:>15} {max_mod_str:>18}") + print() + + # 7. Détection d'anomalies + print("-" * 80) + print("DÉTECTION D'ANOMALIES") + print("-" * 80) + print() + + # Buildings avec dpe_id mais sans class_dpe + cursor_prod.execute(""" + SELECT COUNT(*) + FROM buildings + WHERE dpe_id IS NOT NULL AND class_dpe IS NULL + """) + anomaly1 = cursor_prod.fetchone()[0] + print(f"Buildings avec dpe_id mais sans class_dpe: {anomaly1:,}") + + # Buildings avec class_dpe mais sans dpe_date_at + cursor_prod.execute(""" + SELECT COUNT(*) + FROM buildings + WHERE class_dpe IS NOT NULL AND dpe_date_at IS NULL + """) + anomaly2 = cursor_prod.fetchone()[0] + print(f"Buildings avec class_dpe mais sans dpe_date_at: {anomaly2:,}") + + # DPE avec dates futures + cursor_prod.execute(""" + SELECT COUNT(*) + FROM buildings + WHERE dpe_date_at > CURRENT_DATE + """) + anomaly3 = cursor_prod.fetchone()[0] + print(f"Buildings avec dpe_date_at dans le futur: {anomaly3:,}") + + # DPE très anciens (avant 2021 - début du nouveau DPE) + cursor_prod.execute(""" + SELECT COUNT(*) + FROM buildings + WHERE dpe_date_at < '2021-01-01' + """) + anomaly4 = cursor_prod.fetchone()[0] + print(f"Buildings avec dpe_date_at avant 2021: {anomaly4:,}") + print() + + # Stocker pour PDF + report_data['total_anomalies'] = anomaly1 + anomaly2 + anomaly3 + anomaly4 + + # 7. 
Distribution des adresses BAN non trouvées + print("-" * 80) + print("DISTRIBUTION DES ADRESSES BAN NON TROUVÉES") + print("-" * 80) + print() + + # Identifiants BAN de RAW qui ne sont pas dans PROD + cursor_raw.execute(""" + SELECT identifiant_ban + FROM dpe_raw + WHERE identifiant_ban IS NOT NULL + """) + raw_ban_ids = set(row[0] for row in cursor_raw.fetchall()) + + cursor_prod.execute(""" + SELECT ban_id + FROM ban_addresses + """) + prod_ban_ids = set(row[0] for row in cursor_prod.fetchall()) + + missing_ban_ids = raw_ban_ids - prod_ban_ids + found_ban_ids = raw_ban_ids & prod_ban_ids + + print(f"Total identifiant_ban dans dpe_raw: {len(raw_ban_ids):,}") + print(f"Total ban_id dans ban_addresses: {len(prod_ban_ids):,}") + print(f"BAN trouvés: {len(found_ban_ids):,} ({len(found_ban_ids) * 100.0 / len(raw_ban_ids):.1f}%)") + print(f"BAN manquants: {len(missing_ban_ids):,} ({len(missing_ban_ids) * 100.0 / len(raw_ban_ids):.1f}%)") + print() + + # Distribution par trimestre des BAN manquants + ban_missing_by_quarter = {} + if missing_ban_ids: + print("Distribution des BAN manquants par trimestre:") + cursor_raw.execute(""" + SELECT + EXTRACT(YEAR FROM date_etablissement_dpe)::int as annee, + EXTRACT(QUARTER FROM date_etablissement_dpe)::int as trimestre, + COUNT(DISTINCT identifiant_ban) as nb_missing + FROM dpe_raw + WHERE identifiant_ban = ANY(%s) + AND date_etablissement_dpe IS NOT NULL + GROUP BY annee, trimestre + ORDER BY annee DESC, trimestre DESC + """, (list(missing_ban_ids),)) + + print(f"{'Trimestre':<12} {'BAN Manquants':>15}") + print("-" * 35) + for row in cursor_raw.fetchall(): + annee, trimestre, nb = row + quarter_key = f"{int(annee)}-Q{int(trimestre)}" + ban_missing_by_quarter[quarter_key] = nb + print(f"{quarter_key:<12} {nb:>15,}") + print() + + # Distribution par département (via code INSEE - 2 premiers caractères) + print("Distribution des BAN manquants par département (top 10):") + cursor_raw.execute(""" + SELECT + SUBSTRING(identifiant_ban FROM 1 FOR 2) as dept, + COUNT(DISTINCT identifiant_ban) as nb_missing + FROM dpe_raw + WHERE identifiant_ban = ANY(%s) + GROUP BY dept + ORDER BY nb_missing DESC + LIMIT 10 + """, (list(missing_ban_ids),)) + + print(f"{'Département':<15} {'BAN Manquants':>15}") + print("-" * 35) + for row in cursor_raw.fetchall(): + dept, nb = row + print(f"{dept:<15} {nb:>15,}") + print() + + # Exemples d'identifiants manquants (échantillon de 10) + print("Échantillon d'identifiants BAN manquants (10 premiers):") + sample_missing = list(missing_ban_ids)[:10] + for ban_id in sample_missing: + print(f" - {ban_id}") + print() + + # Stocker pour PDF + report_data['ban_analysis'] = { + 'total_raw': len(raw_ban_ids), + 'total_prod': len(prod_ban_ids), + 'found': len(found_ban_ids), + 'missing': len(missing_ban_ids), + 'found_rate': len(found_ban_ids) * 100.0 / len(raw_ban_ids) if len(raw_ban_ids) > 0 else 0, + 'missing_by_quarter': ban_missing_by_quarter + } + + # 8. 
Résumé des anomalies + print("=" * 80) + print("RÉSUMÉ") + print("=" * 80) + print() + + taux_import = buildings_with_dpe * 100.0 / total_buildings if total_buildings > 0 else 0 + print(f"Taux d'import DPE: {taux_import:.2f}%") + print(f"Anomalies détectées: {anomaly1 + anomaly2 + anomaly3 + anomaly4:,}") + print() + + if taux_import < 50: + print("⚠️ ALERTE: Taux d'import DPE inférieur à 50%") + print(" Vérifier que l'import DPE a bien été exécuté sur les buildings.") + + cursor_raw.close() + cursor_prod.close() + + return report_data + + +def generate_pdf_report(data, output_file): + """Génère un rapport PDF avec graphiques.""" + try: + import matplotlib.pyplot as plt + from matplotlib.backends.backend_pdf import PdfPages + except ImportError: + print("Erreur: matplotlib requis pour la génération PDF", file=sys.stderr) + print("Installation: pip install matplotlib", file=sys.stderr) + sys.exit(1) + + with PdfPages(output_file) as pdf: + # Page 1: Vue d'ensemble + fig, axes = plt.subplots(2, 2, figsize=(12, 10)) + fig.suptitle('Rapport DPE - Vue d\'ensemble', fontsize=16, fontweight='bold') + + # Graphique 1: Distribution par classe DPE (Pie chart) + if 'class_distribution' in data: + ax = axes[0, 0] + classes = [c for c in data['class_distribution'].keys() if c != 'NULL'] + values = [data['class_distribution'][c] for c in classes] + colors = ['#2E7D32', '#4CAF50', '#8BC34A', '#CDDC39', '#FFC107', '#FF9800', '#F44336'] + ax.pie(values, labels=classes, autopct='%1.1f%%', colors=colors[:len(classes)]) + ax.set_title('Distribution par classe DPE') + + # Graphique 2: DPE par année (Histogramme) + if 'year_distribution' in data: + ax = axes[0, 1] + years = sorted(data['year_distribution'].keys()) + raw_values = [data['year_distribution'][y]['total'] for y in years] + prod_values = [data['year_distribution'][y]['prod'] for y in years] + x = range(len(years)) + width = 0.35 + ax.bar([i - width/2 for i in x], raw_values, width, label='RAW', color='#2196F3') + ax.bar([i + width/2 for i in x], prod_values, width, label='PROD', color='#4CAF50') + ax.set_xlabel('Année') + ax.set_ylabel('Nombre de DPE') + ax.set_title('DPE RAW vs PROD par année') + ax.set_xticks(x) + ax.set_xticklabels([str(y) for y in years]) + ax.legend() + + # Graphique 3: Taux d'import par année + if 'year_distribution' in data: + ax = axes[1, 0] + years = sorted(data['year_distribution'].keys()) + rates = [data['year_distribution'][y]['import_rate'] for y in years] + ax.bar(years, rates, color='#FF9800') + ax.set_xlabel('Année') + ax.set_ylabel('Taux d\'import (%)') + ax.set_title('Taux d\'import par année') + ax.axhline(y=50, color='r', linestyle='--', label='Seuil 50%') + ax.legend() + + # Graphique 4: Répartition des identifiants + if 'id_distribution' in data: + ax = axes[1, 1] + labels = list(data['id_distribution'].keys()) + values = list(data['id_distribution'].values()) + ax.pie(values, labels=labels, autopct='%1.1f%%') + ax.set_title('Répartition par type d\'identifiant') + + plt.tight_layout() + pdf.savefig(fig) + plt.close() + + # Page 2: Analyse détaillée + fig, ax = plt.subplots(figsize=(12, 8)) + ax.axis('off') + + # Texte d'analyse + text = f""" +ANALYSE DPE - RAPPORT DÉTAILLÉ +{'=' * 50} + +VUE D'ENSEMBLE +{'-' * 30} +Total DPE bruts (RAW): {data.get('total_raw', 'N/A'):,} +Total buildings (PROD): {data.get('total_buildings', 'N/A'):,} +Buildings avec DPE: {data.get('buildings_with_dpe', 'N/A'):,} +Taux d'import global: {data.get('import_rate', 0):.2f}% + +ANALYSE DU MATCHING +{'-' * 30} +BAN uniques dans RAW: 
{data.get('unique_ban_raw', 'N/A'):,} +BAN uniques dans PROD: {data.get('unique_ban_prod', 'N/A'):,} +Taux de matching BAN: {data.get('ban_match_rate', 0):.1f}% + +ANOMALIES DÉTECTÉES +{'-' * 30} +Total anomalies: {data.get('total_anomalies', 0):,} + +CONCLUSIONS +{'-' * 30} +""" + if data.get('import_rate', 0) < 50: + text += "⚠️ Taux d'import inférieur à 50%\n" + text += " - Vérifier le matching entre identifiant_ban et ban_addresses\n" + text += " - Vérifier que l'import DPE a été exécuté récemment\n" + + if data.get('ban_match_rate', 0) < 50: + text += "⚠️ Faible taux de matching BAN\n" + text += " - Les identifiant_ban du fichier DPE ne correspondent pas aux adresses en base\n" + + ax.text(0.05, 0.95, text, transform=ax.transAxes, fontsize=10, + verticalalignment='top', fontfamily='monospace') + + pdf.savefig(fig) + plt.close() + + # Page 3: Distribution par trimestre + if 'quarter_distribution' in data and data['quarter_distribution']: + fig, axes = plt.subplots(2, 1, figsize=(14, 10)) + fig.suptitle('Distribution DPE par Trimestre', fontsize=16, fontweight='bold') + + # Graphique 1: RAW vs PROD par trimestre avec RNB IDs + ax = axes[0] + quarters = sorted(data['quarter_distribution'].keys(), reverse=True)[:20] # Limiter aux 20 derniers trimestres + quarters.reverse() # Pour afficher de gauche à droite + raw_values = [data['quarter_distribution'][q]['total'] for q in quarters] + rnb_values = [data['quarter_distribution'][q]['rnb'] for q in quarters] + prod_values = [data['quarter_distribution'][q]['prod'] for q in quarters] + + x = range(len(quarters)) + width = 0.25 + ax.bar([i - width for i in x], raw_values, width, label='RAW Total', color='#2196F3', alpha=0.8) + ax.bar(x, rnb_values, width, label='RAW+RNB', color='#FFC107', alpha=0.8) + ax.bar([i + width for i in x], prod_values, width, label='PROD (Importés)', color='#4CAF50', alpha=0.8) + ax.set_xlabel('Trimestre', fontsize=12) + ax.set_ylabel('Nombre de DPE', fontsize=12) + ax.set_title('DPE RAW vs PROD par trimestre (20 derniers trimestres)', fontsize=14) + ax.set_xticks(x) + ax.set_xticklabels(quarters, rotation=45, ha='right') + ax.legend(fontsize=11) + ax.grid(axis='y', alpha=0.3) + + # Graphique 2: Taux d'import par trimestre + ax = axes[1] + import_rates = [data['quarter_distribution'][q]['import_rate'] for q in quarters] + ax.bar(x, import_rates, color='#FF9800', alpha=0.8) + ax.set_xlabel('Trimestre', fontsize=12) + ax.set_ylabel('Taux d\'import (%)', fontsize=12) + ax.set_title('Taux d\'import par trimestre', fontsize=14) + ax.set_xticks(x) + ax.set_xticklabels(quarters, rotation=45, ha='right') + ax.axhline(y=15, color='r', linestyle='--', linewidth=1.5, label='Seuil 15%', alpha=0.7) + ax.legend(fontsize=11) + ax.grid(axis='y', alpha=0.3) + + plt.tight_layout() + pdf.savefig(fig) + plt.close() + + # Page 4: Analyse des adresses BAN non trouvées + if 'ban_analysis' in data and 'quarter_distribution' in data: + fig, axes = plt.subplots(2, 1, figsize=(14, 10)) + fig.suptitle('Analyse des Adresses BAN Non Trouvées', fontsize=16, fontweight='bold') + + # Graphique 1: Distribution par trimestre avec RAW, PROD et BAN manquants + ax = axes[0] + ban_data = data['ban_analysis'] + if ban_data.get('missing_by_quarter') and data.get('quarter_distribution'): + # Limiter aux 20 derniers trimestres + quarters = sorted(data['quarter_distribution'].keys(), reverse=True)[:20] + quarters.reverse() + + raw_values = [data['quarter_distribution'][q]['total'] for q in quarters] + prod_values = [data['quarter_distribution'][q]['prod'] for q 
in quarters] + missing_values = [ban_data['missing_by_quarter'].get(q, 0) for q in quarters] + + x = range(len(quarters)) + width = 0.25 + ax.bar([i - width for i in x], raw_values, width, label='RAW Total', color='#2196F3', alpha=0.8) + ax.bar(x, prod_values, width, label='PROD (Importés)', color='#4CAF50', alpha=0.8) + ax.bar([i + width for i in x], missing_values, width, label='BAN Non Trouvés', color='#F44336', alpha=0.8) + ax.set_xlabel('Trimestre', fontsize=12) + ax.set_ylabel('Nombre de DPE', fontsize=12) + ax.set_title('DPE RAW vs PROD vs BAN manquants par trimestre (20 derniers)', fontsize=14) + ax.set_xticks(x) + ax.set_xticklabels(quarters, rotation=45, ha='right') + ax.legend(fontsize=11) + ax.grid(axis='y', alpha=0.3) + + # Graphique 2: Pie chart et statistiques + ax = axes[1] + # Diviser axes[1] en deux parties + ax.remove() + # Utiliser add_subplot pour créer deux sous-graphiques côte à côte + ax_pie = fig.add_subplot(2, 2, 3) + ax_text = fig.add_subplot(2, 2, 4) + + # Pie chart + sizes = [ban_data['found'], ban_data['missing']] + labels = [f"Trouvées\n{ban_data['found']:,}", f"Non trouvées\n{ban_data['missing']:,}"] + colors = ['#4CAF50', '#F44336'] + explode = (0, 0.1) + ax_pie.pie(sizes, labels=labels, autopct='%1.1f%%', colors=colors, explode=explode, startangle=90) + ax_pie.set_title(f'Taux de matching BAN\n(Total: {ban_data["total_raw"]:,})', fontsize=11) + + # Texte récapitulatif + ax_text.axis('off') + summary_text = f""" +STATISTIQUES + +Total identifiant_ban dans DPE: + {ban_data['total_raw']:,} + +Adresses trouvées dans ZLV: + {ban_data['found']:,} ({ban_data['found_rate']:.1f}%) + +Adresses NON trouvées: + {ban_data['missing']:,} ({100 - ban_data['found_rate']:.1f}%) + +IMPACT +Les {ban_data['missing']:,} adresses manquantes +empêchent l'import des DPE correspondants. + +ACTION RECOMMANDÉE +Mettre à jour ban_addresses avec les +dernières données BAN. 
+ """ + ax_text.text(0.05, 0.5, summary_text.strip(), transform=ax_text.transAxes, + fontsize=10, verticalalignment='center', fontfamily='monospace', + bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.3)) + + plt.tight_layout() + pdf.savefig(fig) + plt.close() + + # Page 5: Distribution mensuelle + if 'monthly_distribution' in data and data['monthly_distribution']: + fig, ax = plt.subplots(figsize=(12, 6)) + months = list(data['monthly_distribution'].keys()) + values = list(data['monthly_distribution'].values()) + ax.bar(months, values, color='#2196F3') + ax.set_xlabel('Mois') + ax.set_ylabel('Nombre de DPE') + ax.set_title('Distribution mensuelle (12 derniers mois)') + plt.xticks(rotation=45) + plt.tight_layout() + pdf.savefig(fig) + plt.close() + + print(f"Rapport PDF généré: {output_file}") + + +def main(): + # Parser les arguments + parser = argparse.ArgumentParser(description='Analyse comparative DPE RAW vs PROD') + parser.add_argument('--pdf', help='Générer un rapport PDF avec graphiques') + args = parser.parse_args() + + # Vérifier les variables d'environnement + url_raw = os.environ.get('DATABASE_URL_RAW') + url_prod = os.environ.get('DATABASE_URL_PROD') + + if not url_raw: + print("Erreur: DATABASE_URL_RAW non définie", file=sys.stderr) + sys.exit(1) + + if not url_prod: + print("Erreur: DATABASE_URL_PROD non définie", file=sys.stderr) + sys.exit(1) + + try: + print("Connexion à la base RAW (dpe_raw)...") + conn_raw = get_db_connection(url_raw) + print("Connexion à la base PROD (buildings)...") + conn_prod = get_db_connection(url_prod) + print() + + data = analyze_comparison(conn_raw, conn_prod) + + # Générer le PDF si demandé + if args.pdf: + generate_pdf_report(data, args.pdf) + + conn_raw.close() + conn_prod.close() + except Exception as e: + print(f"Erreur: {e}", file=sys.stderr) + sys.exit(1) + +if __name__ == "__main__": + main() diff --git a/server/src/scripts/import-dpe/ban_lookup.py b/server/src/scripts/import-dpe/ban_lookup.py new file mode 100644 index 000000000..11a95d3e1 --- /dev/null +++ b/server/src/scripts/import-dpe/ban_lookup.py @@ -0,0 +1,326 @@ +#!/usr/bin/env python3 +""" +Script de recherche d'adresses BAN par identifiant. + +Utilise le format Parquet pour des performances optimales. +Convertit automatiquement le CSV en Parquet lors de la première utilisation. + +Usage: + # Convertir CSV en Parquet (une seule fois) + python ban_lookup.py --convert adresses-france.csv + + # Rechercher par identifiant BAN + python ban_lookup.py --search 59414_1100_00738 + + # Rechercher plusieurs identifiants + python ban_lookup.py --search 59414_1100_00738 75119_2870_00034 13004_4016_00009 + + # Rechercher depuis un fichier (un id par ligne) + python ban_lookup.py --search-file missing_ban_ids.txt --output results.csv + + # Spécifier un fichier Parquet différent + python ban_lookup.py --parquet /path/to/ban.parquet --search 59414_1100_00738 + +Dépendances: + pip install pandas pyarrow tqdm +""" + +import os +import sys +import argparse +from pathlib import Path +from concurrent.futures import ProcessPoolExecutor, as_completed +import multiprocessing + +def convert_csv_to_parquet(csv_file, parquet_file=None): + """Convertit un fichier CSV BAN en format Parquet.""" + try: + import pandas as pd + except ImportError: + print("Erreur: pandas requis. 
Installation: pip install pandas pyarrow", file=sys.stderr) + sys.exit(1) + + # Créer le fichier Parquet dans le même dossier que le CSV source + csv_path = Path(csv_file) + if parquet_file is None: + parquet_file = csv_path.with_suffix('.parquet') + else: + # Si un nom de fichier Parquet est fourni sans chemin, le mettre dans le dossier du CSV + parquet_path = Path(parquet_file) + if not parquet_path.is_absolute() and parquet_path.parent == Path('.'): + parquet_file = csv_path.parent / parquet_path.name + + print(f"Conversion de {csv_file} en Parquet...") + print(" Cette opération peut prendre plusieurs minutes pour un fichier volumineux.") + + # Lire le CSV avec les bons types - le fichier BAN utilise le séparateur ';' + df = pd.read_csv( + csv_file, + sep=';', # Le fichier BAN utilise ';' comme séparateur + dtype={ + 'id': str, + 'id_fantoir': str, + 'numero': str, + 'rep': str, + 'nom_voie': str, + 'code_postal': str, + 'code_insee': str, + 'nom_commune': str, + 'code_insee_ancienne_commune': str, + 'nom_ancienne_commune': str, + 'x': float, + 'y': float, + 'lon': float, + 'lat': float, + 'libelle_acheminement': str, + 'nom_afnor': str, + 'source_position': str, + 'source_nom_voie': str + }, + low_memory=False + ) + + print(f" - {len(df):,} adresses chargées") + + # Sauvegarder en Parquet avec compression + df.to_parquet(parquet_file, index=False, compression='snappy') + + file_size_mb = os.path.getsize(parquet_file) / (1024 * 1024) + print(f" - Fichier Parquet créé: {parquet_file} ({file_size_mb:.1f} MB)") + + return parquet_file + + +def load_parquet(parquet_file): + """Charge le fichier Parquet en mémoire.""" + try: + import pandas as pd + except ImportError: + print("Erreur: pandas requis. Installation: pip install pandas pyarrow", file=sys.stderr) + sys.exit(1) + + if not os.path.exists(parquet_file): + print(f"Erreur: fichier {parquet_file} non trouvé", file=sys.stderr) + print("Utilisez --convert pour créer le fichier Parquet depuis un CSV", file=sys.stderr) + sys.exit(1) + + print(f"Chargement de {parquet_file}...") + df = pd.read_parquet(parquet_file) + print(f" - {len(df):,} adresses chargées") + + # Créer un index sur la colonne 'id' pour des recherches rapides + df.set_index('id', inplace=True, drop=False) + + return df + + +def search_single_id(ban_id, index_set, df_dict): + """Recherche une seule adresse par son identifiant BAN.""" + try: + if ban_id in index_set: + row = df_dict[ban_id] + return { + 'id': ban_id, + 'found': True, + 'numero': row.get('numero', ''), + 'rep': row.get('rep', ''), + 'nom_voie': row.get('nom_voie', ''), + 'code_postal': row.get('code_postal', ''), + 'nom_commune': row.get('nom_commune', ''), + 'code_insee': row.get('code_insee', ''), + 'lon': row.get('lon', ''), + 'lat': row.get('lat', '') + } + else: + return { + 'id': ban_id, + 'found': False, + 'numero': '', + 'rep': '', + 'nom_voie': '', + 'code_postal': '', + 'nom_commune': '', + 'code_insee': '', + 'lon': '', + 'lat': '' + } + except Exception as e: + return { + 'id': ban_id, + 'found': False, + 'error': str(e) + } + + +def search_by_ids(df, ban_ids, use_tqdm=True): + """Recherche des adresses par leurs identifiants BAN avec barre de progression.""" + try: + from tqdm import tqdm + except ImportError: + use_tqdm = False + + # Créer un dict avec la première occurrence de chaque id pour des recherches O(1) + print(" Création de l'index de recherche...") + # Supprimer les doublons en gardant la première occurrence + df_unique = df[~df.index.duplicated(keep='first')] + df_dict = 
df_unique.to_dict('index') + index_set = set(df_dict.keys()) + + results = [] + + if use_tqdm: + iterator = tqdm(ban_ids, desc="Recherche", unit="id", ncols=80) + else: + iterator = ban_ids + + for ban_id in iterator: + result = search_single_id(ban_id, index_set, df_dict) + results.append(result) + + return results + + +def search_by_ids_parallel(df, ban_ids, workers=None): + """Recherche parallèle des adresses avec barre de progression tqdm.""" + try: + from tqdm import tqdm + except ImportError: + print("Erreur: tqdm requis. Installation: pip install tqdm", file=sys.stderr) + sys.exit(1) + + if workers is None: + workers = min(multiprocessing.cpu_count(), 8) + + # Créer un dict avec la première occurrence de chaque id pour des recherches O(1) + print(f" Création de l'index de recherche...") + df_unique = df[~df.index.duplicated(keep='first')] + df_dict = df_unique.to_dict('index') + index_set = set(df_dict.keys()) + + # Pour de petits lots, pas besoin de parallélisation + if len(ban_ids) < 1000: + results = [] + for ban_id in tqdm(ban_ids, desc="Recherche", unit="id", ncols=80): + results.append(search_single_id(ban_id, index_set, df_dict)) + return results + + # Recherche séquentielle avec tqdm (plus efficace que multiprocessing pour ce cas) + # car le DataFrame est déjà en mémoire et les lookups sont O(1) + results = [] + for ban_id in tqdm(ban_ids, desc="Recherche", unit="id", ncols=80): + results.append(search_single_id(ban_id, index_set, df_dict)) + + return results + + +def print_results(results): + """Affiche les résultats de recherche.""" + for r in results: + if r.get('found'): + print(f"\n{r['id']} - TROUVÉ") + print(f" Adresse: {r['numero']}{r['rep']} {r['nom_voie']}") + print(f" Commune: {r['code_postal']} {r['nom_commune']} ({r['code_insee']})") + print(f" Coordonnées: {r['lon']}, {r['lat']}") + else: + print(f"\n{r['id']} - NON TROUVÉ") + + +def save_results_to_csv(results, output_file): + """Sauvegarde les résultats en CSV.""" + import csv + + with open(output_file, 'w', newline='', encoding='utf-8') as f: + fieldnames = ['id', 'found', 'numero', 'rep', 'nom_voie', 'code_postal', 'nom_commune', 'code_insee', 'lon', 'lat'] + writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction='ignore') + writer.writeheader() + writer.writerows(results) + + found_count = sum(1 for r in results if r.get('found')) + print(f"\nRésultats sauvegardés dans {output_file}") + print(f" - Total: {len(results)}") + print(f" - Trouvés: {found_count}") + print(f" - Non trouvés: {len(results) - found_count}") + + +def main(): + parser = argparse.ArgumentParser(description='Recherche d\'adresses BAN par identifiant') + parser.add_argument('--convert', metavar='CSV', help='Convertir un fichier CSV en Parquet') + parser.add_argument('--parquet', default='ban.parquet', help='Fichier Parquet à utiliser (défaut: ban.parquet)') + parser.add_argument('--search', nargs='+', metavar='ID', help='Identifiants BAN à rechercher') + parser.add_argument('--search-file', metavar='FILE', help='Fichier contenant les identifiants (un par ligne)') + parser.add_argument('--search-csv', metavar='CSV', help='Fichier CSV avec colonne identifiant_ban (ex: missing_dpe.csv)') + parser.add_argument('--ban-column', default='identifiant_ban', help='Nom de la colonne contenant les identifiants BAN (défaut: identifiant_ban)') + parser.add_argument('--output', metavar='CSV', help='Fichier CSV de sortie pour les résultats') + + args = parser.parse_args() + + # Mode conversion + if args.convert: + 
convert_csv_to_parquet(args.convert, args.parquet) + return + + # Mode recherche + if args.search or args.search_file or args.search_csv: + # Charger les identifiants à rechercher + ban_ids = [] + + if args.search: + ban_ids.extend(args.search) + + if args.search_file: + with open(args.search_file, 'r') as f: + ban_ids.extend(line.strip() for line in f if line.strip()) + + if args.search_csv: + import csv + with open(args.search_csv, 'r', encoding='utf-8') as f: + reader = csv.DictReader(f) + if args.ban_column not in reader.fieldnames: + print(f"Erreur: colonne '{args.ban_column}' non trouvée dans {args.search_csv}", file=sys.stderr) + print(f"Colonnes disponibles: {', '.join(reader.fieldnames)}", file=sys.stderr) + sys.exit(1) + for row in reader: + ban_id = row.get(args.ban_column, '').strip() + if ban_id: + ban_ids.append(ban_id) + + if not ban_ids: + print("Erreur: aucun identifiant à rechercher", file=sys.stderr) + sys.exit(1) + + print(f"Recherche de {len(ban_ids)} identifiant(s)...") + + # Charger le Parquet + df = load_parquet(args.parquet) + + # Rechercher + results = search_by_ids(df, ban_ids) + + # Afficher ou sauvegarder + if args.output: + save_results_to_csv(results, args.output) + else: + print_results(results) + + # Log de fin avec taux de réussite + found_count = sum(1 for r in results if r.get('found')) + total = len(results) + not_found = total - found_count + success_rate = (found_count / total * 100) if total > 0 else 0 + + print(f"\n{'='*50}") + print(f"RÉSUMÉ DE LA RECHERCHE") + print(f"{'='*50}") + print(f" Total recherché : {total:,}") + print(f" Trouvés : {found_count:,} ({success_rate:.1f}%)") + print(f" Non trouvés : {not_found:,} ({100-success_rate:.1f}%)") + print(f"{'='*50}") + + return + + # Aucune action + parser.print_help() + + +if __name__ == "__main__": + main() diff --git a/server/src/scripts/import-dpe/dpe_raw_fields.py b/server/src/scripts/import-dpe/dpe_raw_fields.py index 56532836f..38334b056 100644 --- a/server/src/scripts/import-dpe/dpe_raw_fields.py +++ b/server/src/scripts/import-dpe/dpe_raw_fields.py @@ -224,6 +224,8 @@ "volume_stockage_generateur_n1_ecs_n1", "volume_stockage_generateur_n2_ecs_n1", "zone_climatique", + "id_rnb", + "provenance_id_rnb", ] # Mapping from JSON field names to database field names @@ -450,4 +452,6 @@ "volume_stockage_generateur_n1_ecs_n1": "volume_stockage_generateur_n1_ecs_n1", "volume_stockage_generateur_n2_ecs_n1": "volume_stockage_generateur_n2_ecs_n1", "zone_climatique": "zone_climatique", + "id_rnb": "id_rnb", + "provenance_id_rnb": "provenance_id_rnb", } diff --git a/server/src/scripts/import-dpe/extract_missing_dpe.py b/server/src/scripts/import-dpe/extract_missing_dpe.py new file mode 100644 index 000000000..e1a53e732 --- /dev/null +++ b/server/src/scripts/import-dpe/extract_missing_dpe.py @@ -0,0 +1,184 @@ +#!/usr/bin/env python3 +""" +Script d'extraction des DPE présents dans RAW mais absents de la production. + +Extrait en CSV la liste des logements dont l'identifiant_ban n'est pas présent +dans la table ban_addresses de production. + +Usage: + DATABASE_URL_RAW="postgres://..." DATABASE_URL_PROD="postgres://..." 
python extract_missing_dpe.py [OPTIONS] + +Options: + --output FILE Fichier CSV de sortie (défaut: missing_dpe.csv) + --year YEAR Filtrer par année (ex: 2025) + --limit N Limiter le nombre de résultats + --sample N Extraire un échantillon aléatoire de N lignes + +Variables d'environnement requises: + DATABASE_URL_RAW - Base contenant dpe_raw + DATABASE_URL_PROD - Base contenant ban_addresses +""" + +import os +import sys +import csv +import argparse +from datetime import datetime +import psycopg2 +from urllib.parse import urlparse + +def get_db_connection(database_url): + """Établit une connexion à la base de données.""" + parsed = urlparse(database_url) + return psycopg2.connect( + host=parsed.hostname, + port=parsed.port or 5432, + database=parsed.path[1:], + user=parsed.username, + password=parsed.password + ) + +def extract_missing_dpe(conn_raw, conn_prod, output_file, year=None, limit=None, sample=None): + """Extrait les DPE dont l'identifiant_ban n'est pas dans ban_addresses.""" + cursor_raw = conn_raw.cursor() + cursor_prod = conn_prod.cursor() + + print("Récupération des identifiants BAN de la production...") + + # Récupérer tous les ban_id de la production + cursor_prod.execute(""" + SELECT DISTINCT ban_id + FROM ban_addresses + WHERE address_kind = 'Housing' + AND ban_id IS NOT NULL + """) + prod_ban_ids = set(row[0] for row in cursor_prod.fetchall()) + print(f" - {len(prod_ban_ids):,} identifiants BAN en production") + + # Construire la requête pour les DPE RAW + where_clauses = ["identifiant_ban IS NOT NULL"] + params = [] + + if year: + where_clauses.append("EXTRACT(YEAR FROM date_etablissement_dpe) = %s") + params.append(year) + + where_sql = " AND ".join(where_clauses) + + order_sql = "ORDER BY RANDOM()" if sample else "ORDER BY date_etablissement_dpe DESC" + limit_sql = f"LIMIT {sample or limit}" if (sample or limit) else "" + + query = f""" + SELECT + dpe_id, + identifiant_ban, + date_etablissement_dpe, + date_reception_dpe, + adresse_ban, + code_postal_ban, + nom_commune_ban, + code_departement_ban, + etiquette_dpe, + etiquette_ges, + type_batiment, + surface_habitable_logement, + annee_construction + FROM dpe_raw + WHERE {where_sql} + {order_sql} + {limit_sql} + """ + + print(f"Récupération des DPE RAW{f' ({year})' if year else ''}...") + cursor_raw.execute(query, params) + + # Filtrer les DPE dont le ban_id n'est pas en production + missing_count = 0 + total_count = 0 + + with open(output_file, 'w', newline='', encoding='utf-8') as csvfile: + writer = csv.writer(csvfile) + + # En-têtes + writer.writerow([ + 'identifiant_dpe', + 'identifiant_ban', + 'date_etablissement_dpe', + 'date_reception_dpe', + 'adresse_ban', + 'code_postal_ban', + 'nom_commune_ban', + 'code_departement_ban', + 'etiquette_dpe', + 'etiquette_ges', + 'type_batiment', + 'surface_habitable', + 'annee_construction' + ]) + + for row in cursor_raw: + total_count += 1 + identifiant_ban = row[1] + + if identifiant_ban not in prod_ban_ids: + writer.writerow(row) + missing_count += 1 + + if total_count % 100000 == 0: + print(f" - {total_count:,} DPE analysés, {missing_count:,} manquants...") + + print() + print(f"Extraction terminée:") + print(f" - DPE analysés: {total_count:,}") + print(f" - DPE manquants (BAN non trouvé): {missing_count:,}") + print(f" - Taux de matching: {(total_count - missing_count) * 100.0 / total_count:.1f}%") + print(f" - Fichier généré: {output_file}") + + cursor_raw.close() + cursor_prod.close() + + return missing_count + +def main(): + parser = 
argparse.ArgumentParser(description='Extraction des DPE manquants en production') + parser.add_argument('--output', default='missing_dpe.csv', help='Fichier CSV de sortie') + parser.add_argument('--year', type=int, help='Filtrer par année (ex: 2025)') + parser.add_argument('--limit', type=int, help='Limiter le nombre de résultats') + parser.add_argument('--sample', type=int, help='Extraire un échantillon aléatoire') + args = parser.parse_args() + + # Vérifier les variables d'environnement + url_raw = os.environ.get('DATABASE_URL_RAW') + url_prod = os.environ.get('DATABASE_URL_PROD') + + if not url_raw: + print("Erreur: DATABASE_URL_RAW non définie", file=sys.stderr) + sys.exit(1) + + if not url_prod: + print("Erreur: DATABASE_URL_PROD non définie", file=sys.stderr) + sys.exit(1) + + try: + print("Connexion aux bases de données...") + conn_raw = get_db_connection(url_raw) + conn_prod = get_db_connection(url_prod) + print() + + extract_missing_dpe( + conn_raw, + conn_prod, + args.output, + year=args.year, + limit=args.limit, + sample=args.sample + ) + + conn_raw.close() + conn_prod.close() + except Exception as e: + print(f"Erreur: {e}", file=sys.stderr) + sys.exit(1) + +if __name__ == "__main__": + main() diff --git a/server/src/scripts/import-dpe/import-ademe.py b/server/src/scripts/import-dpe/import-ademe.py index fa8c419e2..c1081f81d 100644 --- a/server/src/scripts/import-dpe/import-ademe.py +++ b/server/src/scripts/import-dpe/import-ademe.py @@ -45,7 +45,7 @@ def get_last_record_from_file(self, filename: str) -> Optional[Dict[str, Any]]: try: if not os.path.exists(filename): return None - + with open(filename, 'rb') as f: # Go to end of file f.seek(-2, os.SEEK_END) @@ -56,10 +56,10 @@ def get_last_record_from_file(self, filename: str) -> Optional[Dict[str, Any]]: # Read the last line last_line = f.readline().decode('utf-8').strip() - + if last_line: return json.loads(last_line) - + except Exception as e: print(f"Error reading last record: {e}") @@ -81,9 +81,9 @@ def build_resume_url(self, last_record: Dict[str, Any], limit_per_page: int = 10 # Build 'after' parameter: _i + "%2C" + _rand after_param = f"{_i}%2C{_rand}" - + return f"{self.base_url}?size={limit_per_page}&after={after_param}" - + def count_lines_in_file(self, filename: str) -> int: """ Counts the number of lines in a file @@ -103,7 +103,7 @@ def count_lines_in_file(self, filename: str) -> int: except Exception as e: print(f"Error counting lines: {e}") return 0 - + def test_authentication(self) -> bool: """ Tests authentication with x-apiKey @@ -112,7 +112,7 @@ def test_authentication(self) -> bool: True if authentication succeeds, False otherwise """ test_url = f"{self.base_url}?size=1" - + print(f"🧪 Testing authentication with: {dict(self.session.headers)}") try: @@ -134,7 +134,7 @@ def test_authentication(self) -> bool: except Exception as e: print(f"❌ Error during test: {e}") return False - + def get_page(self, url: str, max_retries: int = 5) -> Optional[tuple[Dict[str, Any], int]]: """ Retrieves a page of data from the API with retry logic @@ -204,7 +204,7 @@ def get_page(self, url: str, max_retries: int = 5) -> Optional[tuple[Dict[str, A return None return None - + def fetch_all_data(self, limit_per_page: int = 1000, max_pages: Optional[int] = None, output_dir: str = "dpe_split", resume_from: Optional[Dict[str, Any]] = None) -> int: """ @@ -362,7 +362,7 @@ def fetch_all_data(self, limit_per_page: int = 1000, max_pages: Optional[int] = print(f" {year_month_key}: {records_by_month[year_month_key]:,} DPE") return 
new_records - + def save_to_json(self, data: List[Dict[str, Any]], filename: str = "dpe_data.json"): """ Saves data to a standard JSON file @@ -377,7 +377,7 @@ def save_to_json(self, data: List[Dict[str, Any]], filename: str = "dpe_data.jso print(f"Data saved to {filename}") except Exception as e: print(f"Error during save: {e}") - + def read_jsonl_file(self, filename: str) -> List[Dict[str, Any]]: """ Reads a JSON Lines file and returns a list of objects @@ -400,7 +400,7 @@ def read_jsonl_file(self, filename: str) -> List[Dict[str, Any]]: except Exception as e: print(f"Error reading file: {e}") return [] - + def get_sample_data(self, sample_size: int = 100) -> List[Dict[str, Any]]: """ Retrieves a data sample for testing @@ -413,7 +413,7 @@ def get_sample_data(self, sample_size: int = 100) -> List[Dict[str, Any]]: """ url = f"{self.base_url}?size={sample_size}" page_data = self.get_page(url) - + if page_data and "results" in page_data: return page_data["results"] return [] @@ -493,4 +493,4 @@ def main(): if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/server/src/scripts/import-dpe/import-dpe.py b/server/src/scripts/import-dpe/import-dpe.py index 843f91fd8..bff325452 100644 --- a/server/src/scripts/import-dpe/import-dpe.py +++ b/server/src/scripts/import-dpe/import-dpe.py @@ -171,8 +171,10 @@ def _setup_logger(self) -> logging.Logger: for handler in logger.handlers[:]: logger.removeHandler(handler) - # File handler with timestamp - log_filename = f'dpe_processing_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log' + # File handler with timestamp in logs/ directory + log_dir = Path(__file__).parent / 'logs' + log_dir.mkdir(exist_ok=True) + log_filename = log_dir / f'dpe_processing_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log' file_handler = logging.FileHandler(log_filename, encoding='utf-8') file_handler.setLevel(logging.INFO) @@ -797,16 +799,14 @@ def _update_stats(self, **kwargs): self.stats[key] += value if isinstance(value, (int, float)) else 0 def _batch_get_buildings_by_rnb_ids(self, cursor, rnb_ids: List[str]) -> Dict[str, Dict]: - """Fetch only buildings without DPE or that need update""" + """Fetch all buildings matching RNB IDs (including those with existing DPE)""" if not rnb_ids: return {} - # OPTIMIZATION: Load only buildings without imported DPE yet - # This enables script resume without reprocessing already done work + # Fetch all matching buildings to allow updating with more recent DPE query = """ SELECT * FROM buildings WHERE rnb_id = ANY(%s) - AND (dpe_id IS NULL OR dpe_import_match IS NULL) """ cursor.execute(query, (rnb_ids,)) results = cursor.fetchall() @@ -814,11 +814,11 @@ def _batch_get_buildings_by_rnb_ids(self, cursor, rnb_ids: List[str]) -> Dict[st return {row['rnb_id']: dict(row) for row in results} def _batch_get_buildings_by_ban_ids(self, cursor, ban_ids: List[str]) -> Dict[str, Dict]: - """Fetch only buildings without DPE via ban_ids""" + """Fetch all buildings matching BAN IDs (including those with existing DPE)""" if not ban_ids: return {} - # OPTIMIZATION: Load only buildings without imported DPE yet + # Fetch all matching buildings to allow updating with more recent DPE query = """ SELECT DISTINCT ON (ba.ban_id) ba.ban_id, b.* FROM buildings b @@ -826,7 +826,6 @@ def _batch_get_buildings_by_ban_ids(self, cursor, ban_ids: List[str]) -> Dict[st JOIN ban_addresses ba ON fh.id = ba.ref_id WHERE ba.address_kind = 'Housing' AND ba.ban_id = ANY(%s) - AND (b.dpe_id IS NULL OR b.dpe_import_match IS NULL) """ 
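Both `_batch_get_buildings_by_rnb_ids` and `_batch_get_buildings_by_ban_ids` lean on the same psycopg2 idiom: `= ANY(%s)` receives the whole Python list as a single array parameter, so each batch costs one round-trip instead of one query per identifier. A minimal, self-contained sketch of that lookup pattern (the `RealDictCursor` and the `buildings` table mirror the script; the standalone function itself is illustrative):

```python
import psycopg2
import psycopg2.extras

def batch_get_by_rnb_ids(conn, rnb_ids: list[str]) -> dict[str, dict]:
    """Illustrative sketch: return {rnb_id: row} for matching buildings."""
    if not rnb_ids:
        return {}
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        # psycopg2 adapts the Python list to a SQL array, so ANY(%s)
        # replaces a hand-built IN (...) clause or one query per id.
        cur.execute("SELECT * FROM buildings WHERE rnb_id = ANY(%s)", (rnb_ids,))
        return {row["rnb_id"]: dict(row) for row in cur.fetchall()}
```

One caveat when adapting the BAN variant above: PostgreSQL's `DISTINCT ON` returns an unpredictable row per `ba.ban_id` unless an `ORDER BY` pins the choice.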
cursor.execute(query, (ban_ids,)) results = cursor.fetchall() @@ -990,7 +989,7 @@ def _determine_dpe_priority(self, methode_application_dpe: str) -> int: else: return 2 - def _should_import_dpe(self, dpe_data: Dict, existing_dpe: Optional[Dict]) -> tuple[bool, str]: + def _should_import_dpe(self, dpe_data: Dict, existing_dpe_id: Optional[str], building: Optional[Dict] = None) -> tuple[bool, str]: """ Determine if DPE should be imported according to business rules @@ -1006,10 +1005,26 @@ def _should_import_dpe(self, dpe_data: Dict, existing_dpe: Optional[Dict]) -> tu # Check if it's an apartment DPE is_apartment_dpe = 'dpe appartement individuel' in methode - if existing_dpe: - # A DPE already exists + if existing_dpe_id and building: + # A DPE already exists - check if new one is more recent + existing_date = building.get('dpe_date_at') + new_date_str = dpe_data.get('date_etablissement_dpe') + + if existing_date and new_date_str: + try: + new_date = datetime.strptime(new_date_str, '%Y-%m-%d').date() + # Convert existing_date to date if it's a datetime + if hasattr(existing_date, 'date'): + existing_date = existing_date.date() + + # Only import if new DPE is more recent + if new_date <= existing_date: + return False, "skip_older_dpe" + except (ValueError, TypeError): + pass # If date parsing fails, continue with other checks + if is_building_dpe: - return True, "building_dpe_exists" + return True, "building_dpe_update" else: return False, "skip_non_building_dpe" else: @@ -1392,7 +1407,7 @@ def _prepare_dpe_update(self, dpe_data: Dict, building: Dict, case_type: str) -> """ # Check business rules existing_dpe_id = building.get('dpe_id') - should_import, reason = self._should_import_dpe(dpe_data, existing_dpe_id) + should_import, reason = self._should_import_dpe(dpe_data, existing_dpe_id, building) if not should_import: return None diff --git a/server/src/scripts/import-dpe/import_dpe_raw.py b/server/src/scripts/import-dpe/import_dpe_raw.py index 768a89521..f8c62fefd 100644 --- a/server/src/scripts/import-dpe/import_dpe_raw.py +++ b/server/src/scripts/import-dpe/import_dpe_raw.py @@ -127,8 +127,10 @@ def _setup_logger(self) -> logging.Logger: for handler in logger.handlers[:]: logger.removeHandler(handler) - # File handler with timestamp - log_filename = f'dpe_raw_import_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log' + # File handler with timestamp in logs/ directory + log_dir = Path(__file__).parent / 'logs' + log_dir.mkdir(exist_ok=True) + log_filename = log_dir / f'dpe_raw_import_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log' file_handler = logging.FileHandler(log_filename, encoding='utf-8') file_handler.setLevel(logging.INFO) diff --git a/server/src/scripts/import-dpe/test_import_dpe.py b/server/src/scripts/import-dpe/test_import_dpe.py index 693ea6631..dcb534e30 100644 --- a/server/src/scripts/import-dpe/test_import_dpe.py +++ b/server/src/scripts/import-dpe/test_import_dpe.py @@ -115,7 +115,7 @@ def processor(self): def test_new_building_dpe_no_existing(self, processor): """New building DPE with no existing DPE should be imported.""" dpe_data = {'methode_application_dpe': 'DPE immeuble collectif'} - should_import, case = processor._should_import_dpe(dpe_data, existing_dpe=None) + should_import, case = processor._should_import_dpe(dpe_data, existing_dpe_id=None) assert should_import is True assert case == "new_building_dpe" @@ -123,7 +123,7 @@ def test_new_building_dpe_no_existing(self, processor): def test_new_house_dpe_no_existing(self, processor): """New house DPE with no existing 
DPE should be imported.""" dpe_data = {'methode_application_dpe': 'DPE maison individuelle'} - should_import, case = processor._should_import_dpe(dpe_data, existing_dpe=None) + should_import, case = processor._should_import_dpe(dpe_data, existing_dpe_id=None) assert should_import is True assert case == "new_building_dpe" @@ -131,7 +131,7 @@ def test_new_house_dpe_no_existing(self, processor): def test_apartment_dpe_no_existing(self, processor): """Apartment DPE with no existing DPE should be imported.""" dpe_data = {'methode_application_dpe': 'DPE appartement individuel'} - should_import, case = processor._should_import_dpe(dpe_data, existing_dpe=None) + should_import, case = processor._should_import_dpe(dpe_data, existing_dpe_id=None) assert should_import is True assert case == "apartment_dpe" @@ -139,34 +139,46 @@ def test_apartment_dpe_no_existing(self, processor): def test_other_dpe_no_existing(self, processor): """Other DPE types with no existing DPE should be skipped.""" dpe_data = {'methode_application_dpe': 'Autre méthode'} - should_import, case = processor._should_import_dpe(dpe_data, existing_dpe=None) + should_import, case = processor._should_import_dpe(dpe_data, existing_dpe_id=None) assert should_import is False assert case == "skip_other_dpe" def test_building_dpe_replaces_existing(self, processor): - """Building DPE should replace any existing DPE.""" - dpe_data = {'methode_application_dpe': 'DPE immeuble collectif'} - existing_dpe = {'dpe_id': 'old_dpe', 'class_dpe': 'C'} - should_import, case = processor._should_import_dpe(dpe_data, existing_dpe=existing_dpe) + """Building DPE should replace any existing DPE when more recent.""" + from datetime import date + dpe_data = { + 'methode_application_dpe': 'DPE immeuble collectif', + 'date_etablissement_dpe': '2024-06-01' + } + building = {'dpe_date_at': date(2024, 1, 1)} + should_import, case = processor._should_import_dpe(dpe_data, existing_dpe_id='old_dpe', building=building) assert should_import is True - assert case == "building_dpe_exists" + assert case == "building_dpe_update" def test_apartment_dpe_with_existing_skipped(self, processor): """Apartment DPE should not replace existing DPE.""" - dpe_data = {'methode_application_dpe': 'DPE appartement individuel'} - existing_dpe = {'dpe_id': 'existing_dpe', 'class_dpe': 'B'} - should_import, case = processor._should_import_dpe(dpe_data, existing_dpe=existing_dpe) + from datetime import date + dpe_data = { + 'methode_application_dpe': 'DPE appartement individuel', + 'date_etablissement_dpe': '2024-06-01' + } + building = {'dpe_date_at': date(2024, 1, 1)} + should_import, case = processor._should_import_dpe(dpe_data, existing_dpe_id='existing_dpe', building=building) assert should_import is False assert case == "skip_non_building_dpe" def test_other_dpe_with_existing_skipped(self, processor): """Other DPE should not replace existing DPE.""" - dpe_data = {'methode_application_dpe': 'Autre méthode'} - existing_dpe = {'dpe_id': 'existing_dpe', 'class_dpe': 'A'} - should_import, case = processor._should_import_dpe(dpe_data, existing_dpe=existing_dpe) + from datetime import date + dpe_data = { + 'methode_application_dpe': 'Autre méthode', + 'date_etablissement_dpe': '2024-06-01' + } + building = {'dpe_date_at': date(2024, 1, 1)} + should_import, case = processor._should_import_dpe(dpe_data, existing_dpe_id='existing_dpe', building=building) assert should_import is False assert case == "skip_non_building_dpe" @@ -296,31 +308,33 @@ def test_building_dpe_always_wins(self, processor): 
building_dpe = {'methode_application_dpe': 'DPE immeuble collectif'} # Test against no existing DPE - should_import, _ = processor._should_import_dpe(building_dpe, None) + should_import, _ = processor._should_import_dpe(building_dpe, existing_dpe_id=None) assert should_import is True # Test against existing apartment DPE - existing_apt = {'dpe_id': 'apt_dpe', 'methode_application_dpe': 'DPE appartement individuel'} - should_import, _ = processor._should_import_dpe(building_dpe, existing_apt) + should_import, _ = processor._should_import_dpe(building_dpe, existing_dpe_id='apt_dpe') assert should_import is True # Test against existing building DPE - existing_building = {'dpe_id': 'building_dpe', 'methode_application_dpe': 'DPE immeuble collectif'} - should_import, _ = processor._should_import_dpe(building_dpe, existing_building) + should_import, _ = processor._should_import_dpe(building_dpe, existing_dpe_id='building_dpe') assert should_import is True def test_apartment_dpe_priority_logic(self, processor): """Apartment DPE should only be imported when no DPE exists.""" - apartment_dpe = {'methode_application_dpe': 'DPE appartement individuel'} + from datetime import date + apartment_dpe = { + 'methode_application_dpe': 'DPE appartement individuel', + 'date_etablissement_dpe': '2024-06-01' + } # Should import when no DPE exists - should_import, case = processor._should_import_dpe(apartment_dpe, None) + should_import, case = processor._should_import_dpe(apartment_dpe, existing_dpe_id=None) assert should_import is True assert case == "apartment_dpe" - # Should NOT import when any DPE already exists - existing = {'dpe_id': 'any_dpe'} - should_import, case = processor._should_import_dpe(apartment_dpe, existing) + # Should NOT import when a DPE already exists (with building info for date check) + building = {'dpe_date_at': date(2024, 1, 1)} + should_import, case = processor._should_import_dpe(apartment_dpe, existing_dpe_id='any_dpe', building=building) assert should_import is False assert case == "skip_non_building_dpe"
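To close the loop on the rule change, here is a compact, runnable restatement of the decision logic these tests pin down. It is a sketch, not the production method: `_should_import_dpe` relies on an `is_building_dpe` predicate whose exact definition is not shown in this diff, so the `'immeuble' / 'maison'` check below is an assumption inferred from the fixtures, while the field names and return labels are taken from the diff.

```python
# Simplified restatement of the import decision rule (sketch only).
from datetime import date, datetime
from typing import Optional

def should_import(dpe: dict, existing_dpe_id: Optional[str],
                  building: Optional[dict] = None) -> tuple[bool, str]:
    methode = (dpe.get('methode_application_dpe') or '').lower()
    # ASSUMPTION: building-level DPE detection, inferred from the test fixtures.
    is_building = 'immeuble' in methode or 'maison' in methode
    is_apartment = 'dpe appartement individuel' in methode

    if existing_dpe_id and building:
        existing = building.get('dpe_date_at')
        new_raw = dpe.get('date_etablissement_dpe')
        if existing and new_raw:
            try:
                new = datetime.strptime(new_raw, '%Y-%m-%d').date()
                if hasattr(existing, 'date'):  # datetime -> date
                    existing = existing.date()
                if new <= existing:
                    return False, "skip_older_dpe"
            except (ValueError, TypeError):
                pass  # unparseable date: fall through to the type rules
        return (True, "building_dpe_update") if is_building \
            else (False, "skip_non_building_dpe")

    if is_building:
        return True, "new_building_dpe"
    if is_apartment:
        return True, "apartment_dpe"
    return False, "skip_other_dpe"

# Example: a newer building DPE replaces an older one.
assert should_import(
    {'methode_application_dpe': 'DPE immeuble collectif',
     'date_etablissement_dpe': '2024-06-01'},
    existing_dpe_id='old_dpe',
    building={'dpe_date_at': date(2024, 1, 1)},
) == (True, "building_dpe_update")
```

The date gate runs before the type rules, so even a building DPE is skipped when it is not strictly newer than the one already stored, which matches the `skip_older_dpe` branch exercised above.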