Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 7 additions & 19 deletions bin/combine_annotations.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@
logger = get_logger(filename=Path(__file__).stem)

def read_and_preprocess(path: Path):
# We design input fastas from intermediate steps to be named like: "input_fasta___some_information_annotation_file.tsv"
input_fasta = input_fasta_from_filepath(path)
try:
df = pd.read_csv(path)
df[FASTA_COLUMN] = input_fasta # Add input_fasta column
df[FASTA_COLUMN] = input_fasta
return df
except Exception as e:
logger.error(f"Error loading DataFrame for input_fasta {input_fasta}: {str(e)}")
return pd.DataFrame() # Return an empty DataFrame in case of error
return pd.DataFrame()

def input_fasta_from_filepath(file_path: Path):
return file_path.stem.split("___")[0]
Expand Down Expand Up @@ -51,7 +50,6 @@ def count_motifs(gene_faa, motif="(C..CH)", genes_faa_dict=None):
for seq in read_sequence(gene_faa, format="fasta"):
if seq.metadata["id"] not in genes_faa_dict:
genes_faa_dict[seq.metadata["id"]] = {}

genes_faa_dict[seq.metadata["id"]]["heme_regulatory_motif_count"] = len(list(seq.find_with_regex(motif)))
return genes_faa_dict

Expand All @@ -61,12 +59,10 @@ def set_gene_data(gene_faa, genes_faa_dict=None):
for seq in read_sequence(gene_faa, format="fasta"):
if seq.metadata["id"] not in genes_faa_dict:
genes_faa_dict[seq.metadata["id"]] = {}

split_label = seq.metadata["id"].split("_")
gene_position = split_label[-1]
start_position, end_position, strandedness = seq.metadata["description"].split("#")[1:4]

genes_faa_dict[seq.metadata["id"]][FASTA_COLUMN] = os.path.commonprefix([Path(gene_faa).stem, seq.metadata["id"]]).rstrip("_")
genes_faa_dict[seq.metadata["id"]][FASTA_COLUMN] = str(Path(gene_faa).stem).replace('_called_genes', '')
genes_faa_dict[seq.metadata["id"]]["scaffold"] = (
seq.metadata["id"]
.removeprefix(genes_faa_dict[seq.metadata["id"]][FASTA_COLUMN])
Expand All @@ -83,16 +79,13 @@ def organize_columns(df, special_columns=None):
special_columns = []
base_columns = ['query_id', FASTA_COLUMN, "scaffold", 'gene_number', 'start_position', 'stop_position', 'strandedness', 'rank']
base_columns = [col for col in base_columns if col in df.columns]

kegg_columns = sorted([col for col in df.columns if col.startswith('kegg_')], key=lambda x: (x != 'kegg_id', x))
other_columns = [col for col in df.columns if col not in base_columns + kegg_columns + special_columns]

db_prefixes = set(col.split('_')[0] for col in other_columns)
sorted_other_columns = []
for prefix in db_prefixes:
prefixed_columns = sorted([col for col in other_columns if col.startswith(prefix + '_')], key=lambda x: (x != f"{prefix}_id", x))
sorted_other_columns.extend(prefixed_columns)

final_columns_order = base_columns + kegg_columns + sorted_other_columns + special_columns
return df[final_columns_order]

Expand All @@ -106,25 +99,20 @@ def combine_annotations(annotations_dir, genes_dir, output, threads):
annotations = Path(annotations_dir).glob("*")
genes_faa = Path(genes_dir).glob("*")
with ThreadPoolExecutor(max_workers=threads) as executor:
# futures = [executor.submit(read_and_preprocess, input_fasta, path) for input_fasta, path in input_fastas_and_paths]
futures = [executor.submit(read_and_preprocess, Path(path)) for path in annotations]
data_frames = [future.result() for future in as_completed(futures)]

combined_data = pd.concat(data_frames, ignore_index=True)
combined_data = pd.concat([df for df in data_frames if not df.empty], ignore_index=True)
if genes_faa:
genes_faa_dict = dict()
for gene_path in genes_faa:
gene_path = str(gene_path)
genes_faa_dict
count_motifs(gene_path, "(C..CH)", genes_faa_dict=genes_faa_dict)
set_gene_data(gene_path, genes_faa_dict)
df = pd.DataFrame.from_dict(genes_faa_dict, orient='index')
combined_data = combined_data.drop(columns=df.columns, errors='ignore')
df.index.name = 'query_id'

# we use outer to get any genes that don't have hits
combined_data = pd.merge(combined_data, df, how="outer", on="query_id")
combined_data.loc[combined_data[FASTA_COLUMN].isna(), FASTA_COLUMN] = ""
df = pd.DataFrame.from_dict(genes_faa_dict, orient='index').reset_index().rename(columns={'index': 'query_id'})
combined_data = combined_data.drop(columns=df.columns.difference(["query_id", "scaffold", FASTA_COLUMN]), errors='ignore')
combined_data = pd.merge(combined_data, df, how="outer", on=["query_id", FASTA_COLUMN])

combined_data = convert_bit_scores_to_numeric(combined_data)

Expand Down
59 changes: 27 additions & 32 deletions bin/distill.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
DISTILATE_SORT_ORDER_COLUMNS = [COL_HEADER, COL_SUBHEADER, COL_MODULE, COL_GENE_ID]
EXCEL_MAX_CELL_SIZE = 32767

FASTA_COLUMN = os.getenv('FASTA_COLUMN')
DISTILL_DIR = Path(__file__).parent / "assets/forms/distill_sheets"


Expand Down Expand Up @@ -74,7 +73,7 @@ def fill_a_frame(frame: pd.DataFrame):

return pd.Series(counts, index=genome_summary_frame.index)

counts = annotations.groupby(groupby_column, sort=False).apply(fill_a_frame)
counts = annotations.groupby(groupby_column, sort=False)[annotations.columns].apply(fill_a_frame)
genome_summary_frame = pd.concat([genome_summary_frame, counts.T], axis=1)

return genome_summary_frame
Expand All @@ -99,7 +98,7 @@ def fill_genome_summary_frame_gene_names(annotations, genome_summary_frame, grou
return genome_summary_frame


def summarize_rrnas(rrnas_df, groupby_column=FASTA_COLUMN):
def summarize_rrnas(rrnas_df, groupby_column="input_fasta"):
genome_rrna_dict = dict()
for genome, frame in rrnas_df.groupby(groupby_column):
genome_rrna_dict[genome] = Counter(frame['type'])
Expand All @@ -113,7 +112,7 @@ def summarize_rrnas(rrnas_df, groupby_column=FASTA_COLUMN):
return rrna_frame


def make_genome_summary(annotations, genome_summary_frame, logger, groupby_column=FASTA_COLUMN):
def make_genome_summary(annotations, genome_summary_frame, logger, groupby_column="input_fasta"):

summary_frames = list()
# get ko summaries
Expand Down Expand Up @@ -154,16 +153,17 @@ def write_summarized_genomes_to_xlsx(summarized_genomes, output_file, extra_fram
frame = frame.sort_values(DISTILATE_SORT_ORDER_COLUMNS)
frame = frame.drop([COL_SHEET], axis=1)
gene_columns = list(set(frame.columns) - set(CONSTANT_DISTILLATE_COLUMNS))
split_genes = pd.concat([split_names_to_long(frame[i].astype(str)) for i in gene_columns], axis=1)
frame = pd.concat([frame[CONSTANT_DISTILLATE_COLUMNS], split_genes], axis=1)
if gene_columns:
split_genes = pd.concat([split_names_to_long(frame[i].astype(str)) for i in gene_columns], axis=1)
frame = pd.concat([frame[CONSTANT_DISTILLATE_COLUMNS], split_genes], axis=1)
frame.to_excel(writer, sheet_name=sheet, index=False)
for extra_frame in extra_frames:
if extra_frame is not None and not extra_frame.empty:
extra_frame.to_excel(writer, sheet_name=extra_frame[COL_SHEET].iloc[0], index=False)
extra_frame.to_excel(writer, sheet_name=extra_frame[COL_HEADER].iloc[0], index=False)


# TODO: add assembly stats like N50, longest contig, total assembled length etc
def make_genome_stats(annotations, rrna_frame=None, trna_frame=None, quast_frame=None, groupby_column=FASTA_COLUMN):
def make_genome_stats(annotations, rrna_frame=None, trna_frame=None, quast_frame=None, groupby_column="input_fasta"):
rows = list()
columns = ['genome']
if 'scaffold' in annotations.columns:
Expand Down Expand Up @@ -200,7 +200,6 @@ def make_genome_stats(annotations, rrna_frame=None, trna_frame=None, quast_frame
# Rename the index column to input_fasta (or whatever you want)
df_rrna = df_rrna.rename(columns={"index": "genome"})
df_rrna.columns.name = None
print(df_rrna)
genome_stats = pd.merge(genome_stats, df_rrna, how="outer", on="genome")
if trna_frame is not None:
meta_cols = ["gene_id", "gene_description", "category",
Expand Down Expand Up @@ -230,19 +229,19 @@ def make_genome_stats(annotations, rrna_frame=None, trna_frame=None, quast_frame
@click.command()
@click.option("-i", "--input_file", required=True, help="Annotations path")
# @click.option("-o", "--output_dir", required=True, help="Directory to write summarized genomes")
@click.option("--rrna_path", help="rRNA output from annotation")
@click.option("--trna_path", help="tRNA output from annotation")
@click.option("--quast_path", help="Quast summary TSV from the quast step")
@click.option("--rrna_path", help="rRNA output from annotation", default=None, type=click.Path(exists=True))
@click.option("--trna_path", help="tRNA output from annotation", default=None, type=click.Path(exists=True))
@click.option("--quast_path", help="Quast summary TSV from the quast step", default=None, type=click.Path(exists=True))
@click.option("--groupby_column", help="Column from annotations to group as organism units",
default=FASTA_COLUMN)
default="input_fasta", type = click.STRING)
@click.option("--distil_topics", default="default", help="Default distillates topics to run.")
@click.option("--distil_ecosystem", default="eng_sys,ag", help="Default distillates ecosystems to run.")
@click.option("--custom_distillate", default=[], callback=validate_comma_separated, help="Custom distillate forms to add your own modules, comma separated. ")
@click.option("--custom_distillate", default="", callback=validate_comma_separated, help="Custom distillate forms to add your own modules, comma separated. ")
@click.option("--distillate_gene_names", is_flag=True,
show_default=True, default=False,
help="Give names of genes instead of counts in genome metabolism summary")
def distill(input_file, rrna_path=None, trna_path=None, quast_path=None, groupby_column=FASTA_COLUMN, distil_topics=None, distil_ecosystem=None,
custom_distillate=None, distillate_gene_names=False):
def distill(input_file, rrna_path, trna_path, quast_path, groupby_column, distil_topics, distil_ecosystem,
custom_distillate, distillate_gene_names):
"""Summarize metabolic content of annotated genomes"""
# make output folder
# mkdir(output_dir)
Expand All @@ -255,24 +254,20 @@ def distill(input_file, rrna_path=None, trna_path=None, quast_path=None, groupby
# Check the columns are present
check_columns(annotations, logger)

if trna_path is None:
trna_frame = None
else:
trna_frame = None
rrna_frame = None
if all([v is not None for v in [trna_path, rrna_path]]):
trna_frame = pd.read_csv(trna_path, sep='\t')
if rrna_path is None:
rrna_frame = None
else:
rrna_frame = pd.read_csv(rrna_path, sep='\t')
# Check NF DRAM didn't pass an empty sheet to signal no tRNAs or rRNAs
if rrna_frame.empty:
rrna_frame = None
if trna_frame.empty:
trna_frame = None

if quast_path is None:
quast_frame = None
else:
if any(v.dropna(how="all").empty for v in [trna_frame, rrna_frame]):
trna_frame = None
rrna_frame = None

quast_frame = None
if quast_path is not None:
quast_frame = pd.read_csv(quast_path, sep='\t')
if quast_frame.dropna(how="all").empty:
quast_frame = None

distil_sheets_names = []
if "default" in distil_topics:
Expand Down Expand Up @@ -322,7 +317,7 @@ def distill(input_file, rrna_path=None, trna_path=None, quast_path=None, groupby
genome_summary_form = genome_summary_form.reset_index(drop=True)

# make genome stats
genome_stats = make_genome_stats(annotations, rrna_frame, trna_frame, quast_frame=quast_frame, groupby_column=groupby_column)
genome_stats = make_genome_stats(annotations, rrna_frame, trna_frame, quast_frame, groupby_column=groupby_column)
genome_stats.to_csv('genome_stats.tsv', sep='\t', index=None)
logger.info('Calculated genome statistics')

Expand Down
6 changes: 6 additions & 0 deletions conf/base.config
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,14 @@ process {
withLabel:error_ignore {
errorStrategy = 'ignore'
}

withLabel:error_retry {
errorStrategy = 'retry'
maxRetries = 2
}

withName: 'DRAM:ANNOTATE:CALL:.*|DRAM:ANNOTATE:DB_SEARCH:.*|DRAM:ANNOTATE:QC:COLLECT_RNA:.*' {
array = params.array_size
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to support people running DRAM2 with local executor (such as on their own computer if they want), which doesn't support array. So the array should only be used with executors that support it.

}

}
1 change: 1 addition & 0 deletions conf/modules.config
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ process {
]
}
withName: SUMMARIZE {
ext.args = { "--groupby_column ${params.CONSTANTS.FASTA_COLUMN}" }
publishDir = [
[
path: "${params.outdir}/SUMMARIZE",
Expand Down
41 changes: 25 additions & 16 deletions modules/local/distill/distill.nf
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,35 @@ process SUMMARIZE {
container "community.wave.seqera.io/library/python_pandas_openpyxl_click_dram-viz:bd6f4fb065d73a68"

input:
path( ch_combined_annotations, stageAs: "raw-annotations.tsv" )
path( ch_rrna_collected, stageAs: "rrna_combined.tsv" )
path( ch_trna_collected, stageAs: "trna_combined.tsv" )
path( ch_quast_stats )
val( distill_topic )
val( distill_ecosystem )
val( distill_custom )
path combined_annotations
path rrna_collected
path trna_collected
path quast_stats
val distill_topic
val distill_ecosystem
val distill_custom

output:
path( "metabolism_summary.xlsx" ), emit: distillate
path( "*.log" ), emit: log
path( "summarized_genomes.tsv" ), emit: summarized_genomes
path( "genome_stats.tsv" ), emit: genome_stats
path "metabolism_summary.xlsx", emit: distillate
path "*.log", emit: log
path "summarized_genomes.tsv", emit: summarized_genomes
path "genome_stats.tsv", emit: genome_stats

script:
"""
# export constants for script
export FASTA_COLUMN="${params.CONSTANTS.FASTA_COLUMN}"

distill.py -i ${ch_combined_annotations} --rrna_path '${ch_rrna_collected}' --trna_path '${ch_trna_collected}' --distil_topics "${distill_topic}" --distil_ecosystem "${distill_ecosystem}" --custom_distillate "${distill_custom}" --quast_path '${ch_quast_stats}'
def args = task.ext.args ?: ""
def rrna = rrna_collected ? "--rrna_path ${rrna_collected}" : ""
def trna = trna_collected ? "--trna_path ${trna_collected}" : ""
def quast = quast_stats ? "--quast_path ${quast_stats}" : ""

"""
distill.py \
-i ${combined_annotations} \
${rrna} \
${trna} \
${quast} \
--distil_topics "${distill_topic}" \
--distil_ecosystem "${distill_ecosystem}" \
--custom_distillate "${distill_custom}" \
${args}
"""
}
3 changes: 2 additions & 1 deletion nextflow.config
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ params {


/* Process Options */
queue_size = 10
array_size = 10
// queue_size = 10
// This is the resource requirements for a single process
// Not the limit to the total resources available to the pipeline
// Up to queue_size processes can run in parallel, of various sizes
Expand Down
9 changes: 3 additions & 6 deletions subworkflows/local/annotate.nf
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ workflow ANNOTATE {
ch_combined_annotations = default_sheet

if (params.rename || call) {
fasta_name = ch_fasta.map { it[0] }
fasta_files = ch_fasta.map { it[1] }
fasta_name = ch_fasta.map { it-> it[0] }
fasta_files = ch_fasta.map { it -> it[1] }

n_fastas = file("$params.input_fasta/${params.fasta_fmt}").size()
}
Expand All @@ -49,10 +49,7 @@ workflow ANNOTATE {
// we use flatten here to turn a list back into a channel
renamed_fasta_paths = RENAME_FASTA.out.renamed_fasta_paths.flatten()
// we need to recreate the fasta channel with the renamed fasta files
ch_fasta = renamed_fasta_paths.map {
fasta_name = it.getBaseName()
tuple(fasta_name, it)
}
ch_fasta = renamed_fasta_paths.map { it -> [ it.getBaseName(), it ] }
}

ch_quast_stats = default_sheet
Expand Down
4 changes: 2 additions & 2 deletions subworkflows/local/call.nf
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ workflow CALL {
.set { ch_collected_faa } // Set the resulting list to ch_collected_faa

// Collect all individual fasta to pass to quast
Channel.empty()
channel.empty()
.mix( ch_called_genes )
.collect()
.set { ch_collected_fna }

// Collect all individual fasta to pass to quast
Channel.empty()
channel.empty()
.mix( ch_filtered_fasta, ch_gene_gff )
.collect()
.set { ch_collected_fasta }
Expand Down
10 changes: 5 additions & 5 deletions subworkflows/local/collect_rna.nf
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ workflow COLLECT_RNA {
// If we didn't run call
if (!call) {
if (params.rrnas) {
Channel.fromPath("${params.rrnas}/*.tsv", checkIfExists: true)
channel.fromPath("${params.rrnas}/*.tsv", checkIfExists: true)
.ifEmpty { exit 1, "If you specify --distill_<topic|ecosystem|custom> without --call, you must provide individual rRNA files generated with barrnap. Cannot find any files at: ${params.rrnas}\nNB: Path needs to follow pattern: path/to/directory" }
.collect()
.set { ch_collected_rRNAs }
Expand All @@ -42,7 +42,7 @@ workflow COLLECT_RNA {
}
if (params.trnas) {
// the user provided rrnas or trnas
Channel.fromPath("${params.trnas}/*.tsv", checkIfExists: true)
channel.fromPath("${params.trnas}/*.tsv", checkIfExists: true)
.ifEmpty { exit 1, "If you specify --distill_<topic|ecosystem|custom> without --call, you must provide individual tRNA files generated with tRNAscan-SE. Cannot find any files at: ${params.trnas}\nNB: Path needs to follow pattern: path/to/directory" }
.collect()
.set { ch_collected_tRNAs }
Expand All @@ -53,14 +53,14 @@ workflow COLLECT_RNA {
TRNA_SCAN( ch_fasta )
ch_trna_scan = TRNA_SCAN.out.trna_scan_out
// Collect all input_fasta formatted tRNA files
Channel.empty()
channel.empty()
.mix( ch_trna_scan )
.collect()
.set { ch_collected_tRNAs }
// Run barrnap on each fasta to identify rRNAs
RRNA_SCAN( ch_fasta )
ch_rrna_scan = RRNA_SCAN.out.rrna_scan_out
Channel.empty()
channel.empty()
.mix( ch_rrna_scan )
.collect()
.set { ch_collected_rRNAs }
Expand Down Expand Up @@ -96,4 +96,4 @@ workflow COLLECT_RNA {
ch_trna_collected
ch_trna_combined

}
}
Loading