diff --git a/slurm/README.md b/slurm/README.md index bacb49ca..96708f75 100644 --- a/slurm/README.md +++ b/slurm/README.md @@ -102,9 +102,9 @@ These rules have hard-coded `resources:` overrides and should not be reduced wit | `untyped_chemical_compendia` | `chemical.snakefile` | 512G | — | Pre-typing step | | `gene_compendia` | `gene.snakefile` | 256G | 6h | Gene graph | | `export_compendia_to_duckdb` | `duckdb.snakefile` | 512G | 6h | Per-compendium DuckDB export | -| `check_for_identically_labeled_cliques` | `duckdb.snakefile` | 1500G | — | Cross-compendium join | -| `check_for_duplicate_curies` | `duckdb.snakefile` | 1500G | — | Cross-compendium join | -| `check_for_duplicate_clique_leaders` | `duckdb.snakefile` | 1500G | — | Cross-compendium join | +| `check_for_identically_labeled_cliques` | `duckdb.snakefile` | 128G | — | Cross-compendium join (two-pass) | +| `check_for_duplicate_curies` | `duckdb.snakefile` | 256G | — | Cross-compendium join (two-pass) | +| `check_for_duplicate_clique_leaders` | `duckdb.snakefile` | 128G | — | Cross-compendium join (two-pass) | | `chembl_labels_and_smiles` | `datacollect.snakefile` | 128G | — | RDF parse | | `chemical_unichem_concordia` | `chemical.snakefile` | 128G | — | UniChem merge | | `generate_pubmed_concords` | `publications.snakefile` | 128G | 24h | Full PubMed parse | diff --git a/src/reports/duckdb_reports.py b/src/reports/duckdb_reports.py index 156927aa..bf438c03 100644 --- a/src/reports/duckdb_reports.py +++ b/src/reports/duckdb_reports.py @@ -21,17 +21,26 @@ def check_for_identically_labeled_cliques(parquet_root, duckdb_filename, identic db = setup_duckdb(duckdb_filename, duckdb_config) cliques = db.read_parquet(os.path.join(parquet_root, "**/Clique.parquet"), hive_partitioning=True) - results = db.sql(""" - SELECT - LOWER(preferred_name) AS preferred_name_lc, - LIST(clique_leader ORDER BY clique_leader ASC) AS clique_leaders, - COUNT(clique_leader) AS clique_leader_count + # Pass 1: identify names shared by more than one clique using only a COUNT — fully spill-able. + db.execute(""" + CREATE OR REPLACE TEMP TABLE dup_names AS + SELECT LOWER(preferred_name) AS preferred_name_lc, COUNT(*) AS clique_leader_count FROM cliques WHERE preferred_name <> '' AND preferred_name <> '""' GROUP BY preferred_name_lc HAVING clique_leader_count > 1 - ORDER BY clique_leader_count DESC """) - results.write_csv(identically_labeled_cliques_tsv, sep="\t") + + # Pass 2: collect LIST() only for the confirmed duplicate names (small join target). + db.sql(""" + SELECT d.preferred_name_lc, + LIST(c.clique_leader ORDER BY c.clique_leader ASC) AS clique_leaders, + d.clique_leader_count + FROM cliques c + JOIN dup_names d ON LOWER(c.preferred_name) = d.preferred_name_lc + WHERE c.preferred_name <> '' AND c.preferred_name <> '""' + GROUP BY d.preferred_name_lc, d.clique_leader_count + ORDER BY d.clique_leader_count DESC + """).write_csv(identically_labeled_cliques_tsv, sep="\t") cliques.close() @@ -49,19 +58,27 @@ def check_for_duplicate_curies(parquet_root, duckdb_filename, duplicate_curies_t db = setup_duckdb(duckdb_filename, duckdb_config) edges = db.read_parquet(os.path.join(parquet_root, "**/Edge.parquet"), hive_partitioning=True) - # Look for CURIEs that are present in different cliques. - db.sql("""SELECT - curie, - LIST(clique_leader) AS clique_leaders, - LIST(filename) AS filenames, - LIST(conflation) AS conflations, - COUNT(clique_leader) AS clique_leader_count - FROM - edges - WHERE - edges.conflation = 'None' + # Pass 1: identify duplicate CURIEs using only a COUNT — fully spill-able. + db.execute(""" + CREATE OR REPLACE TEMP TABLE dup_curies AS + SELECT curie, COUNT(*) AS clique_leader_count + FROM edges + WHERE conflation = 'None' GROUP BY curie HAVING clique_leader_count > 1 - ORDER BY clique_leader_count DESC + """) + + # Pass 2: collect LIST() only for the confirmed duplicate CURIEs (small join target). + db.sql(""" + SELECT e.curie, + LIST(e.clique_leader ORDER BY e.clique_leader) AS clique_leaders, + LIST(e.filename ORDER BY e.clique_leader) AS filenames, + LIST(e.conflation ORDER BY e.clique_leader) AS conflations, + d.clique_leader_count + FROM edges e + JOIN dup_curies d USING (curie) + WHERE e.conflation = 'None' + GROUP BY e.curie, d.clique_leader_count + ORDER BY d.clique_leader_count DESC """).write_csv(duplicate_curies_tsv, sep="\t") edges.close() @@ -80,23 +97,28 @@ def check_for_duplicate_clique_leaders(parquet_root, duckdb_filename, duplicate_ db = setup_duckdb(duckdb_filename, duckdb_config) cliques = db.read_parquet(os.path.join(parquet_root, "**/Clique.parquet"), hive_partitioning=True) - # Look for duplicate clique leaders. - # We would love to include the following columns, but they take up too much memory: - # - LIST(clique_identifier_count) AS clique_identifier_counts, - # - LIST(biolink_type) AS biolink_types - results = db.sql( - """ - SELECT - clique_leader, - LIST(filename) AS filenames, - COUNT(clique_leader) AS clique_leader_count - FROM - cliques + # Pass 1: identify duplicate clique leaders using only a COUNT — fully spill-able. + db.execute(""" + CREATE OR REPLACE TEMP TABLE dup_leaders AS + SELECT clique_leader, COUNT(*) AS clique_leader_count + FROM cliques GROUP BY clique_leader HAVING clique_leader_count > 1 - ORDER BY clique_leader_count DESC - """ - ) - results.write_csv(duplicate_clique_leaders_tsv, sep="\t") + """) + + # Pass 2: collect LIST() only for the confirmed duplicates (small join target). + # The two-pass approach also lets us restore biolink_type and clique_identifier_count, + # which were previously dropped due to memory pressure. + db.sql(""" + SELECT d.clique_leader, + LIST(c.filename ORDER BY c.filename) AS filenames, + LIST(c.biolink_type ORDER BY c.filename) AS biolink_types, + LIST(c.clique_identifier_count ORDER BY c.filename) AS clique_identifier_counts, + d.clique_leader_count + FROM cliques c + JOIN dup_leaders d USING (clique_leader) + GROUP BY d.clique_leader, d.clique_leader_count + ORDER BY d.clique_leader_count DESC + """).write_csv(duplicate_clique_leaders_tsv, sep="\t") cliques.close() diff --git a/src/snakefiles/duckdb.snakefile b/src/snakefiles/duckdb.snakefile index 2e1c861a..d7dd364c 100644 --- a/src/snakefiles/duckdb.snakefile +++ b/src/snakefiles/duckdb.snakefile @@ -102,7 +102,8 @@ rule check_for_identically_labeled_cliques: benchmark: config["output_directory"] + "/benchmarks/check_for_identically_labeled_cliques.tsv" resources: - mem="1500G", + mem="128G", + cpus_per_task=4, params: parquet_dir=config["output_directory"] + "/duckdb/parquet/", run: @@ -111,8 +112,8 @@ rule check_for_identically_labeled_cliques: output.duckdb_filename, output.identically_labeled_cliques_tsv, { - "memory_limit": "512G", - "threads": 2, + "memory_limit": "64G", + "threads": 4, "preserve_insertion_order": False, }, ) @@ -128,7 +129,8 @@ rule check_for_duplicate_curies: benchmark: config["output_directory"] + "/benchmarks/check_for_duplicate_curies.tsv" resources: - mem="1500G", + mem="256G", + cpus_per_task=4, params: parquet_dir=config["output_directory"] + "/duckdb/parquet/", run: @@ -137,8 +139,8 @@ rule check_for_duplicate_curies: output.duckdb_filename, output.duplicate_curies, { - "memory_limit": "1500G", - "threads": 1, + "memory_limit": "128G", + "threads": 4, "preserve_insertion_order": False, }, ) @@ -154,7 +156,8 @@ rule check_for_duplicate_clique_leaders: benchmark: config["output_directory"] + "/benchmarks/check_for_duplicate_clique_leaders.tsv" resources: - mem="1500G", + mem="128G", + cpus_per_task=4, params: parquet_dir=config["output_directory"] + "/duckdb/parquet/", run: @@ -163,8 +166,8 @@ rule check_for_duplicate_clique_leaders: output.duckdb_filename, output.duplicate_clique_leaders_tsv, { - "memory_limit": "512G", - "threads": 2, + "memory_limit": "64G", + "threads": 4, "preserve_insertion_order": False, }, )