Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions slurm/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ These rules have hard-coded `resources:` overrides and should not be reduced wit
| `untyped_chemical_compendia` | `chemical.snakefile` | 512G | — | Pre-typing step |
| `gene_compendia` | `gene.snakefile` | 256G | 6h | Gene graph |
| `export_compendia_to_duckdb` | `duckdb.snakefile` | 512G | 6h | Per-compendium DuckDB export |
| `check_for_identically_labeled_cliques` | `duckdb.snakefile` | 1500G | — | Cross-compendium join |
| `check_for_duplicate_curies` | `duckdb.snakefile` | 1500G | — | Cross-compendium join |
| `check_for_duplicate_clique_leaders` | `duckdb.snakefile` | 1500G | — | Cross-compendium join |
| `check_for_identically_labeled_cliques` | `duckdb.snakefile` | 128G | — | Cross-compendium join (two-pass) |
| `check_for_duplicate_curies` | `duckdb.snakefile` | 256G | — | Cross-compendium join (two-pass) |
| `check_for_duplicate_clique_leaders` | `duckdb.snakefile` | 128G | — | Cross-compendium join (two-pass) |
| `chembl_labels_and_smiles` | `datacollect.snakefile` | 128G | — | RDF parse |
| `chemical_unichem_concordia` | `chemical.snakefile` | 128G | — | UniChem merge |
| `generate_pubmed_concords` | `publications.snakefile` | 128G | 24h | Full PubMed parse |
Expand Down
92 changes: 57 additions & 35 deletions src/reports/duckdb_reports.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,26 @@ def check_for_identically_labeled_cliques(parquet_root, duckdb_filename, identic
db = setup_duckdb(duckdb_filename, duckdb_config)
cliques = db.read_parquet(os.path.join(parquet_root, "**/Clique.parquet"), hive_partitioning=True)

results = db.sql("""
SELECT
LOWER(preferred_name) AS preferred_name_lc,
LIST(clique_leader ORDER BY clique_leader ASC) AS clique_leaders,
COUNT(clique_leader) AS clique_leader_count
# Pass 1: identify names shared by more than one clique using only a COUNT, which (unlike LIST()) DuckDB can spill to disk.
db.execute("""
CREATE OR REPLACE TEMP TABLE dup_names AS
SELECT LOWER(preferred_name) AS preferred_name_lc, COUNT(*) AS clique_leader_count
FROM cliques
WHERE preferred_name <> '' AND preferred_name <> '""'
GROUP BY preferred_name_lc HAVING clique_leader_count > 1
ORDER BY clique_leader_count DESC
""")
results.write_csv(identically_labeled_cliques_tsv, sep="\t")

# Pass 2: collect LIST() only for the confirmed duplicate names (small join target).
db.sql("""
SELECT d.preferred_name_lc,
LIST(c.clique_leader ORDER BY c.clique_leader ASC) AS clique_leaders,
d.clique_leader_count
FROM cliques c
JOIN dup_names d ON LOWER(c.preferred_name) = d.preferred_name_lc
WHERE c.preferred_name <> '' AND c.preferred_name <> '""'
GROUP BY d.preferred_name_lc, d.clique_leader_count
ORDER BY d.clique_leader_count DESC
""").write_csv(identically_labeled_cliques_tsv, sep="\t")

cliques.close()

Expand All @@ -49,19 +58,27 @@ def check_for_duplicate_curies(parquet_root, duckdb_filename, duplicate_curies_t
db = setup_duckdb(duckdb_filename, duckdb_config)
edges = db.read_parquet(os.path.join(parquet_root, "**/Edge.parquet"), hive_partitioning=True)

# Look for CURIEs that are present in different cliques.
db.sql("""SELECT
curie,
LIST(clique_leader) AS clique_leaders,
LIST(filename) AS filenames,
LIST(conflation) AS conflations,
COUNT(clique_leader) AS clique_leader_count
FROM
edges
WHERE
edges.conflation = 'None'
# Pass 1: identify duplicate CURIEs using only a COUNT, which (unlike LIST()) DuckDB can spill to disk.
db.execute("""
CREATE OR REPLACE TEMP TABLE dup_curies AS
SELECT curie, COUNT(*) AS clique_leader_count
FROM edges
WHERE conflation = 'None'
GROUP BY curie HAVING clique_leader_count > 1
ORDER BY clique_leader_count DESC
""")

# Pass 2: collect LIST() only for the confirmed duplicate CURIEs (small join target).
db.sql("""
SELECT e.curie,
LIST(e.clique_leader ORDER BY e.clique_leader) AS clique_leaders,
LIST(e.filename ORDER BY e.clique_leader) AS filenames,
LIST(e.conflation ORDER BY e.clique_leader) AS conflations,
d.clique_leader_count
FROM edges e
JOIN dup_curies d USING (curie)
WHERE e.conflation = 'None'
GROUP BY e.curie, d.clique_leader_count
ORDER BY d.clique_leader_count DESC
""").write_csv(duplicate_curies_tsv, sep="\t")

edges.close()
Expand All @@ -80,23 +97,28 @@ def check_for_duplicate_clique_leaders(parquet_root, duckdb_filename, duplicate_
db = setup_duckdb(duckdb_filename, duckdb_config)
cliques = db.read_parquet(os.path.join(parquet_root, "**/Clique.parquet"), hive_partitioning=True)

# Look for duplicate clique leaders.
# We would love to include the following columns, but they take up too much memory:
# - LIST(clique_identifier_count) AS clique_identifier_counts,
# - LIST(biolink_type) AS biolink_types
results = db.sql(
"""
SELECT
clique_leader,
LIST(filename) AS filenames,
COUNT(clique_leader) AS clique_leader_count
FROM
cliques
# Pass 1: identify duplicate clique leaders using only a COUNT, which (unlike LIST()) DuckDB can spill to disk.
db.execute("""
CREATE OR REPLACE TEMP TABLE dup_leaders AS
SELECT clique_leader, COUNT(*) AS clique_leader_count
FROM cliques
GROUP BY clique_leader HAVING clique_leader_count > 1
ORDER BY clique_leader_count DESC
"""
)
results.write_csv(duplicate_clique_leaders_tsv, sep="\t")
""")

# Pass 2: collect LIST() only for the confirmed duplicates (small join target).
# The two-pass approach also lets us restore biolink_type and clique_identifier_count,
# which were previously dropped due to memory pressure.
db.sql("""
SELECT d.clique_leader,
LIST(c.filename ORDER BY c.filename) AS filenames,
LIST(c.biolink_type ORDER BY c.filename) AS biolink_types,
LIST(c.clique_identifier_count ORDER BY c.filename) AS clique_identifier_counts,
d.clique_leader_count
FROM cliques c
JOIN dup_leaders d USING (clique_leader)
GROUP BY d.clique_leader, d.clique_leader_count
ORDER BY d.clique_leader_count DESC
""").write_csv(duplicate_clique_leaders_tsv, sep="\t")

cliques.close()

Expand Down
21 changes: 12 additions & 9 deletions src/snakefiles/duckdb.snakefile
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ rule check_for_identically_labeled_cliques:
benchmark:
config["output_directory"] + "/benchmarks/check_for_identically_labeled_cliques.tsv"
resources:
mem="1500G",
mem="128G",
cpus_per_task=4,
params:
parquet_dir=config["output_directory"] + "/duckdb/parquet/",
run:
Expand All @@ -111,8 +112,8 @@ rule check_for_identically_labeled_cliques:
output.duckdb_filename,
output.identically_labeled_cliques_tsv,
{
"memory_limit": "512G",
"threads": 2,
"memory_limit": "64G",
"threads": 4,
"preserve_insertion_order": False,
},
)
Expand All @@ -128,7 +129,8 @@ rule check_for_duplicate_curies:
benchmark:
config["output_directory"] + "/benchmarks/check_for_duplicate_curies.tsv"
resources:
mem="1500G",
mem="256G",
cpus_per_task=4,
params:
parquet_dir=config["output_directory"] + "/duckdb/parquet/",
run:
Expand All @@ -137,8 +139,8 @@ rule check_for_duplicate_curies:
output.duckdb_filename,
output.duplicate_curies,
{
"memory_limit": "1500G",
"threads": 1,
"memory_limit": "128G",
"threads": 4,
"preserve_insertion_order": False,
},
)
Expand All @@ -154,7 +156,8 @@ rule check_for_duplicate_clique_leaders:
benchmark:
config["output_directory"] + "/benchmarks/check_for_duplicate_clique_leaders.tsv"
resources:
mem="1500G",
mem="128G",
cpus_per_task=4,
params:
parquet_dir=config["output_directory"] + "/duckdb/parquet/",
run:
Expand All @@ -163,8 +166,8 @@ rule check_for_duplicate_clique_leaders:
output.duckdb_filename,
output.duplicate_clique_leaders_tsv,
{
"memory_limit": "512G",
"threads": 2,
"memory_limit": "64G",
"threads": 4,
"preserve_insertion_order": False,
},
)
Expand Down
Loading