Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
252 changes: 252 additions & 0 deletions tests/tests.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import functools
import gzip
import http.server
import math
import platform
import re
import shutil
Expand Down Expand Up @@ -252,6 +253,89 @@ def write_numeric_qc_inputs(out_dir):
return out_vcf, out_fai


def write_qc_vcf_without_gatk_annotations(out_dir):
"""Write a small QC VCF whose header lacks GATK-specific INFO annotations."""
out_dir = Path(out_dir)
out_vcf = out_dir / "missing_gatk_annotations_raw.vcf.gz"
sample_names = [
line.strip()
for line in (TEST_DATA_DIR / "qc" / "samples.csv").read_text().splitlines()[1:]
if line.strip()
]
with open(TEST_DATA_DIR / "qc" / "ref.fai") as handle:
contig = handle.readline().split()[0]

header = [
"##fileformat=VCFv4.2",
'##FILTER=<ID=PASS,Description="All filters passed">',
f"##contig=<ID={contig}>",
'##INFO=<ID=AF,Number=A,Type=Float,Description="Allele frequency">',
'##INFO=<ID=MQ,Number=1,Type=Float,Description="RMS mapping quality">',
'##FORMAT=<ID=GT,Number=1,Type=String,Description="Genotype">',
'##FORMAT=<ID=DP,Number=1,Type=Integer,Description="Read depth">',
"#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\t" + "\t".join(sample_names),
]
genotypes = [
f"{'0/1' if sample_idx < 4 else '0/0'}:10"
for sample_idx, _sample in enumerate(sample_names)
]
record = [
contig,
"1",
".",
"A",
"G",
"60",
"PASS",
"AF=0.02;MQ=50",
"GT:DP",
*genotypes,
]

with gzip.open(out_vcf, "wt") as handle:
handle.write("\n".join(header))
handle.write("\n")
handle.write("\t".join(record))
handle.write("\n")

return out_vcf, sample_names


def write_ad_only_qc_vcf(out_dir):
"""Write a QC VCF with FORMAT/AD but no per-genotype FORMAT/DP."""
source_vcf = TEST_DATA_DIR / "qc" / "raw.vcf.gz"
out_vcf = Path(out_dir) / "ad_only_raw.vcf.gz"

with gzip.open(source_vcf, "rt") as src, gzip.open(out_vcf, "wt") as dst:
for line in src:
if line.startswith("##FORMAT=<ID=DP,"):
continue
if line.startswith("#"):
dst.write(line)
continue

fields = line.rstrip("\n").split("\t")
format_fields = fields[8].split(":")
if "DP" in format_fields:
dp_index = format_fields.index("DP")
keep_indices = [
index
for index in range(len(format_fields))
if index != dp_index
]
fields[8] = ":".join(format_fields[index] for index in keep_indices)
for sample_index in range(9, len(fields)):
genotype_fields = fields[sample_index].split(":")
fields[sample_index] = ":".join(
genotype_fields[index]
for index in keep_indices
if index < len(genotype_fields)
)
dst.write("\t".join(fields) + "\n")

return out_vcf


def get_vcf_contig_headers(path):
"""Return contig IDs declared in the VCF header."""
opener = gzip.open if str(path).endswith(".gz") else open
Expand Down Expand Up @@ -1330,6 +1414,7 @@ def test_qc_dry_run(request):
# copy_qc_report should appear since main workflow provides qc_report
assert "qc_copy_qc_report" in output, \
"Expected qc_copy_qc_report rule in DAG"
assert "bcftools query --allow-undef-tags" in output


@pytest.mark.dry_run
Expand Down Expand Up @@ -1495,6 +1580,45 @@ def test_qc_dashboard_helper_preserves_numeric_like_ids():
assert result.returncode == 0, (result.stdout + result.stderr).strip()


def test_qc_dashboard_depth_helpers_handle_nonfinite_values():
"""QC dashboard helpers should sanitize non-finite depth and avoid empty lm fits."""
if shutil.which("Rscript") is None:
pytest.skip("Rscript is not available")

rmd_path = WORKFLOW_DIR / "modules" / "qc" / "scripts" / "qc_dashboard_interactive.Rmd"
helper_source = "\n".join(
[
extract_r_function_source(rmd_path, "sanitize_depth_values"),
extract_r_function_source(rmd_path, "depth_pc_r2"),
]
)

with tempfile.TemporaryDirectory() as tmpdir:
script = Path(tmpdir) / "validate_depth_helpers.R"
script.write_text(
"\n".join(
[
helper_source,
"depth <- sanitize_depth_values(c('5.5', 'NaN', '-nan', 'Inf', '-Inf', 'NA', '.'))",
"if (!isTRUE(all.equal(depth[[1]], 5.5))) stop('Finite depth was not preserved')",
"if (!all(is.na(depth[2:7]))) stop('Non-finite depth values were not converted to NA')",
"empty_depth <- data.frame(MEAN_DEPTH = c(NA_real_, NaN, Inf), value = c(1, 2, 3))",
"if (!is.na(depth_pc_r2(empty_depth))) stop('Expected NA R2 for zero finite depth rows')",
"mixed_depth <- data.frame(MEAN_DEPTH = c(1, 2, NA, 4), value = c(1, 2, 3, 4))",
"if (!is.finite(depth_pc_r2(mixed_depth))) stop('Expected finite R2 for complete finite rows')",
]
)
)

result = subprocess.run(
["Rscript", str(script)],
capture_output=True,
text=True,
check=False,
)
assert result.returncode == 0, (result.stdout + result.stderr).strip()


@pytest.mark.dry_run
def test_qc_plink_filters_sparse_samples_before_pca(request):
"""QC PLINK step should pass a per-sample missingness filter to avoid NaN GRMs."""
Expand Down Expand Up @@ -1522,6 +1646,134 @@ def test_qc_plink_filters_sparse_samples_before_pca(request):
assert "--mind 0.49" in output


@pytest.mark.full_run
def test_qc_subsample_snps_allows_missing_gatk_annotations(request):
"""SNP QC export should emit dots when caller-specific annotations are absent."""
no_conda = request.config.getoption("--no-conda")
with tempfile.TemporaryDirectory() as tmpdir:
missing_annotations_vcf, sample_names = write_qc_vcf_without_gatk_annotations(tmpdir)
smk = SnakemakeRunner(
Path(tmpdir),
use_conda=not no_conda,
snakefile=WORKFLOW_DIR / "modules" / "qc" / "Snakefile",
)
sample_filter = Path(tmpdir) / "results" / "qc" / "individuals.samps.txt"
sample_filter.parent.mkdir(parents=True, exist_ok=True)
sample_filter.write_text("\n".join(sample_names) + "\n")
result = smk.run(
target="results/qc/snpqc.txt",
configfile=WORKFLOW_DIR / "modules" / "qc" / "config" / "config.yaml",
config_overrides={
"samples": str(TEST_DATA_DIR / "qc" / "samples.csv"),
"vcf": str(missing_annotations_vcf),
"fai": str(TEST_DATA_DIR / "qc" / "ref.fai"),
},
)
skip_if_arm64_packages_unavailable(result, "bcftools")
result.assert_success()
result.assert_output_exists("results/qc/snpqc.txt")

snpqc = Path(tmpdir) / "results" / "qc" / "snpqc.txt"
rows = [
line.rstrip("\n").split("\t")
for line in snpqc.read_text().splitlines()
if line.strip()
]
assert rows, "Expected SNP QC rows"
assert all(len(row) == 10 for row in rows)

with open(TEST_DATA_DIR / "qc" / "ref.fai") as handle:
contig = handle.readline().split()[0]
first = rows[0]
assert first[0] == contig
assert first[1] == "1"
assert first[3] == "0.02"
assert first[4] == "60"
assert first[5:8] == [".", ".", "."]
assert first[8] == "50"
assert first[9] == "."


@pytest.mark.full_run
def test_qc_ad_only_vcf_computes_finite_depth(request):
"""AD-only VCFs should produce finite SNP depth from summed FORMAT/AD values."""
no_conda = request.config.getoption("--no-conda")
with tempfile.TemporaryDirectory() as tmpdir:
ad_only_vcf = write_ad_only_qc_vcf(tmpdir)
config_overrides = {
"samples": str(TEST_DATA_DIR / "qc" / "samples.csv"),
"sample_metadata": str(TEST_DATA_DIR / "qc" / "sample_metadata.csv"),
"vcf": str(ad_only_vcf),
"fai": str(TEST_DATA_DIR / "qc" / "ref.fai"),
"qc_report": str(TEST_DATA_DIR / "qc" / "qc_report.tsv"),
}
smk = SnakemakeRunner(
Path(tmpdir),
use_conda=not no_conda,
snakefile=WORKFLOW_DIR / "modules" / "qc" / "Snakefile",
)
depth_result = smk.run(
target="results/qc/individuals.idepth",
configfile=WORKFLOW_DIR / "modules" / "qc" / "config" / "config.yaml",
config_overrides=config_overrides,
)
skip_if_arm64_packages_unavailable(depth_result, "bcftools", "vcftools")
depth_result.assert_success()
depth_result.assert_output_exists("results/qc/individuals.idepth")

depth_path = Path(tmpdir) / "results" / "qc" / "individuals.idepth"
lines = [
line.rstrip("\n").split("\t")
for line in depth_path.read_text().splitlines()
if line.strip()
]
header = lines[0]
nsites_index = header.index("N_SITES")
mean_depth_index = header.index("MEAN_DEPTH")
rows = lines[1:]

assert rows, "Expected AD-derived depth rows"
assert all(int(row[nsites_index]) > 0 for row in rows)
assert all(math.isfinite(float(row[mean_depth_index])) for row in rows)


@pytest.mark.full_run
def test_qc_ad_only_vcf_renders_dashboard(request):
"""AD-only VCFs should still render the complete QC dashboard."""
no_conda = request.config.getoption("--no-conda")
with tempfile.TemporaryDirectory() as tmpdir:
ad_only_vcf = write_ad_only_qc_vcf(tmpdir)
config_overrides = {
"samples": str(TEST_DATA_DIR / "qc" / "samples.csv"),
"sample_metadata": str(TEST_DATA_DIR / "qc" / "sample_metadata.csv"),
"vcf": str(ad_only_vcf),
"fai": str(TEST_DATA_DIR / "qc" / "ref.fai"),
"qc_report": str(TEST_DATA_DIR / "qc" / "qc_report.tsv"),
}
smk = SnakemakeRunner(
Path(tmpdir),
use_conda=not no_conda,
snakefile=WORKFLOW_DIR / "modules" / "qc" / "Snakefile",
)
dashboard_result = smk.run(
target="all",
configfile=WORKFLOW_DIR / "modules" / "qc" / "config" / "config.yaml",
config_overrides=config_overrides,
)
skip_if_arm64_packages_unavailable(
dashboard_result,
"admixture",
"bcftools",
"plink2",
"plink",
"r-ggmap",
"r-ape",
"bioconductor-ggtree",
)
dashboard_result.assert_success()
dashboard_result.assert_output_exists("results/qc/qc_dashboard.html")


@pytest.mark.full_run
def test_qc_standalone_full_run(request):
"""Full execution of QC module as standalone workflow against test fixtures."""
Expand Down
Loading
Loading