6 changes: 3 additions & 3 deletions slurm/README.md
@@ -102,9 +102,9 @@ These rules have hard-coded `resources:` overrides and should not be reduced wit
 | `untyped_chemical_compendia` | `chemical.snakefile` | 512G | — | Pre-typing step |
 | `gene_compendia` | `gene.snakefile` | 256G | 6h | Gene graph |
 | `export_compendia_to_duckdb` | `duckdb.snakefile` | 512G | 6h | Per-compendium DuckDB export |
-| `check_for_identically_labeled_cliques` | `duckdb.snakefile` | 1500G | — | Cross-compendium join |
-| `check_for_duplicate_curies` | `duckdb.snakefile` | 1500G | — | Cross-compendium join |
-| `check_for_duplicate_clique_leaders` | `duckdb.snakefile` | 1500G | — | Cross-compendium join |
+| `check_for_identically_labeled_cliques` | `duckdb.snakefile` | 128G | — | Cross-compendium join (two-pass) |
+| `check_for_duplicate_curies` | `duckdb.snakefile` | 256G | — | Cross-compendium join (two-pass) |
+| `check_for_duplicate_clique_leaders` | `duckdb.snakefile` | 128G | — | Cross-compendium join (two-pass) |
 | `chembl_labels_and_smiles` | `datacollect.snakefile` | 128G | — | RDF parse |
 | `chemical_unichem_concordia` | `chemical.snakefile` | 128G | — | UniChem merge |
 | `generate_pubmed_concords` | `publications.snakefile` | 128G | 24h | Full PubMed parse |
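
The "two-pass" label added to these rows refers to the query rewrite below: a COUNT-only first pass that DuckDB's hash aggregate can spill to disk, followed by a LIST() pass restricted to the confirmed duplicates. A minimal sketch of the pattern, with illustrative table, column, and path names rather than the real schema:

```python
import duckdb

db = duckdb.connect()
db.execute("SET memory_limit = '8G'")  # pass 1 can spill to disk past this cap

# Pass 1: COUNT-only aggregation. Per-group state is a single counter, so the
# hash table can be offloaded to temp storage when it exceeds memory_limit.
db.execute("""
    CREATE TEMP TABLE dups AS
    SELECT key, COUNT(*) AS n
    FROM read_parquet('data/**/*.parquet')
    GROUP BY key HAVING n > 1
""")

# Pass 2: the memory-hungry LIST() aggregation runs only over rows whose key
# is already known to be duplicated, usually a tiny fraction of the input.
db.sql("""
    SELECT d.key, LIST(t.value) AS vals, d.n
    FROM read_parquet('data/**/*.parquet') t
    JOIN dups d USING (key)
    GROUP BY d.key, d.n
""").write_csv("dups.tsv", sep="\t")
```
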
92 changes: 57 additions & 35 deletions src/reports/duckdb_reports.py
@@ -21,17 +21,26 @@ def check_for_identically_labeled_cliques(parquet_root, duckdb_filename, identic
     db = setup_duckdb(duckdb_filename, duckdb_config)
     cliques = db.read_parquet(os.path.join(parquet_root, "**/Clique.parquet"), hive_partitioning=True)
 
-    results = db.sql("""
-        SELECT
-            LOWER(preferred_name) AS preferred_name_lc,
-            LIST(clique_leader ORDER BY clique_leader ASC) AS clique_leaders,
-            COUNT(clique_leader) AS clique_leader_count
+    # Pass 1: identify names shared by more than one clique using only a COUNT — fully spill-able.
+    db.execute("""
+        CREATE TEMP TABLE dup_names AS
+        SELECT LOWER(preferred_name) AS preferred_name_lc, COUNT(*) AS clique_leader_count
         FROM cliques
         WHERE preferred_name <> '' AND preferred_name <> '""'
         GROUP BY preferred_name_lc HAVING clique_leader_count > 1
         ORDER BY clique_leader_count DESC
     """)
-    results.write_csv(identically_labeled_cliques_tsv, sep="\t")
+
+    # Pass 2: collect LIST() only for the confirmed duplicate names (small join target).
+    db.sql("""
+        SELECT d.preferred_name_lc,
+               LIST(c.clique_leader ORDER BY c.clique_leader ASC) AS clique_leaders,
+               d.clique_leader_count
+        FROM cliques c
+        JOIN dup_names d ON LOWER(c.preferred_name) = d.preferred_name_lc
+        WHERE c.preferred_name <> '' AND c.preferred_name <> '""'
+        GROUP BY d.preferred_name_lc, d.clique_leader_count
+        ORDER BY d.clique_leader_count DESC
+    """).write_csv(identically_labeled_cliques_tsv, sep="\t")
 
     cliques.close()
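
Why pass 1 is "fully spill-able": a COUNT keeps fixed-size state per group, so DuckDB's hash aggregate can offload partitions to its temp directory once memory_limit is hit, whereas LIST() has to materialize every group's entire list in memory. A sketch of the settings involved (illustrative values; the temp_directory path is hypothetical, and in this repo the settings arrive via the duckdb_config dict passed to setup_duckdb):

```python
db.execute("SET memory_limit = '64G'")              # hard cap for query operators
db.execute("SET temp_directory = '/tmp/duckdb'")    # where spilled partitions land
db.execute("SET preserve_insertion_order = false")  # drop ordering buffers
```
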

@@ -49,19 +58,27 @@ def check_for_duplicate_curies(parquet_root, duckdb_filename, duplicate_curies_t
     db = setup_duckdb(duckdb_filename, duckdb_config)
     edges = db.read_parquet(os.path.join(parquet_root, "**/Edge.parquet"), hive_partitioning=True)
 
-    # Look for CURIEs that are present in different cliques.
-    db.sql("""SELECT
-        curie,
-        LIST(clique_leader) AS clique_leaders,
-        LIST(filename) AS filenames,
-        LIST(conflation) AS conflations,
-        COUNT(clique_leader) AS clique_leader_count
-    FROM
-        edges
-    WHERE
-        edges.conflation = 'None'
+    # Pass 1: identify duplicate CURIEs using only a COUNT — fully spill-able.
+    db.execute("""
+        CREATE TEMP TABLE dup_curies AS
+        SELECT curie, COUNT(*) AS clique_leader_count
+        FROM edges
+        WHERE conflation = 'None'
         GROUP BY curie HAVING clique_leader_count > 1
         ORDER BY clique_leader_count DESC
+    """)
+
+    # Pass 2: collect LIST() only for the confirmed duplicate CURIEs (small join target).
+    db.sql("""
+        SELECT e.curie,
+               LIST(e.clique_leader ORDER BY e.clique_leader) AS clique_leaders,
+               LIST(e.filename) AS filenames,
+               LIST(e.conflation) AS conflations,
+               d.clique_leader_count
+        FROM edges e
+        JOIN dup_curies d USING (curie)
+        WHERE e.conflation = 'None'
+        GROUP BY e.curie, d.clique_leader_count
+        ORDER BY d.clique_leader_count DESC
     """).write_csv(duplicate_curies_tsv, sep="\t")
 
     edges.close()
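
Note that pass 2 repeats the conflation = 'None' filter so that its groups contain exactly the rows pass 1 counted. That invariant is cheap to verify; a consistency check along these lines (illustrative, not part of this diff) could run right before the export:

```python
# Each pass-2 group should hold exactly clique_leader_count rows, i.e. both
# passes saw the same filtered input.
mismatches = db.sql("""
    SELECT COUNT(*)
    FROM (
        SELECT e.curie
        FROM edges e
        JOIN dup_curies d USING (curie)
        WHERE e.conflation = 'None'
        GROUP BY e.curie, d.clique_leader_count
        HAVING COUNT(*) <> d.clique_leader_count
    )
""").fetchone()[0]
assert mismatches == 0, f"{mismatches} CURIEs disagree between the two passes"
```
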
@@ -80,23 +97,28 @@ def check_for_duplicate_clique_leaders(parquet_root, duckdb_filename, duplicate_
     db = setup_duckdb(duckdb_filename, duckdb_config)
     cliques = db.read_parquet(os.path.join(parquet_root, "**/Clique.parquet"), hive_partitioning=True)
 
-    # Look for duplicate clique leaders.
-    # We would love to include the following columns, but they take up too much memory:
-    # - LIST(clique_identifier_count) AS clique_identifier_counts,
-    # - LIST(biolink_type) AS biolink_types
-    results = db.sql(
-        """
-        SELECT
-            clique_leader,
-            LIST(filename) AS filenames,
-            COUNT(clique_leader) AS clique_leader_count
-        FROM
-            cliques
+    # Pass 1: identify duplicate clique leaders using only a COUNT — fully spill-able.
+    db.execute("""
+        CREATE TEMP TABLE dup_leaders AS
+        SELECT clique_leader, COUNT(*) AS clique_leader_count
+        FROM cliques
         GROUP BY clique_leader HAVING clique_leader_count > 1
         ORDER BY clique_leader_count DESC
-        """
-    )
-    results.write_csv(duplicate_clique_leaders_tsv, sep="\t")
+    """)
+
+    # Pass 2: collect LIST() only for the confirmed duplicates (small join target).
+    # The two-pass approach also lets us restore biolink_type and clique_identifier_count,
+    # which were previously dropped due to memory pressure.
+    db.sql("""
+        SELECT d.clique_leader,
+               LIST(c.filename) AS filenames,
+               LIST(c.biolink_type) AS biolink_types,
+               LIST(c.clique_identifier_count) AS clique_identifier_counts,
+               d.clique_leader_count
+        FROM cliques c
+        JOIN dup_leaders d USING (clique_leader)
+        GROUP BY d.clique_leader, d.clique_leader_count
+        ORDER BY d.clique_leader_count DESC
+    """).write_csv(duplicate_clique_leaders_tsv, sep="\t")
 
     cliques.close()

18 changes: 9 additions & 9 deletions src/snakefiles/duckdb.snakefile
@@ -94,7 +94,7 @@ rule export_all_to_duckdb:
 # There are some reports we want to run on the Parquet files that have been generated.
 rule check_for_identically_labeled_cliques:
     resources:
-        mem="1500G",
+        mem="128G",
     input:
         config["output_directory"] + "/duckdb/done",
     params:
@@ -111,16 +111,16 @@ rule check_for_identically_labeled_cliques:
             output.duckdb_filename,
             output.identically_labeled_cliques_tsv,
             {
-                "memory_limit": "512G",
-                "threads": 2,
+                "memory_limit": "64G",
+                "threads": 4,
                 "preserve_insertion_order": False,
             },
         )


 rule check_for_duplicate_curies:
     resources:
-        mem="1500G",
+        mem="256G",
     input:
         config["output_directory"] + "/duckdb/done",
         config["output_directory"] + "/duckdb/compendia_done",
@@ -137,16 +137,16 @@ rule check_for_duplicate_curies:
             output.duckdb_filename,
             output.duplicate_curies,
             {
-                "memory_limit": "1500G",
-                "threads": 1,
+                "memory_limit": "128G",
+                "threads": 4,
                 "preserve_insertion_order": False,
             },
         )


 rule check_for_duplicate_clique_leaders:
     resources:
-        mem="1500G",
+        mem="128G",
     input:
         config["output_directory"] + "/duckdb/done",
         config["output_directory"] + "/duckdb/compendia_done",
@@ -163,8 +163,8 @@ rule check_for_duplicate_clique_leaders:
             output.duckdb_filename,
             output.duplicate_clique_leaders_tsv,
             {
-                "memory_limit": "512G",
-                "threads": 2,
+                "memory_limit": "64G",
+                "threads": 4,
                 "preserve_insertion_order": False,
             },
         )
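
setup_duckdb() itself is not touched by this diff. For context, a plausible shape for it, given how the call sites pass a config dict (hypothetical sketch; only the call-site names are taken from the diff):

```python
import duckdb

def setup_duckdb(duckdb_filename, duckdb_config):
    # duckdb.connect() accepts a config dict of DuckDB settings, so the
    # snakefile dicts ({"memory_limit": "64G", "threads": 4, ...}) can be
    # passed straight through. Setting temp_directory here as well would give
    # spilled partitions an explicit home on scratch storage.
    return duckdb.connect(duckdb_filename, config=duckdb_config or {})
```

Note the 2x headroom between each rule's Slurm allocation (mem="128G") and its DuckDB memory_limit ("64G"): the difference leaves room for the Python process and any allocations DuckDB does not track against its limit.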