diff --git a/bin/combine_annotations.py b/bin/combine_annotations.py index 3387f868..9029e8ca 100755 --- a/bin/combine_annotations.py +++ b/bin/combine_annotations.py @@ -14,15 +14,14 @@ logger = get_logger(filename=Path(__file__).stem) def read_and_preprocess(path: Path): - # We design input fastas from intermediate steps to be named like: "input_fasta___some_information_annotation_file.tsv" input_fasta = input_fasta_from_filepath(path) try: df = pd.read_csv(path) - df[FASTA_COLUMN] = input_fasta # Add input_fasta column + df[FASTA_COLUMN] = input_fasta return df except Exception as e: logger.error(f"Error loading DataFrame for input_fasta {input_fasta}: {str(e)}") - return pd.DataFrame() # Return an empty DataFrame in case of error + return pd.DataFrame() def input_fasta_from_filepath(file_path: Path): return file_path.stem.split("___")[0] @@ -51,7 +50,6 @@ def count_motifs(gene_faa, motif="(C..CH)", genes_faa_dict=None): for seq in read_sequence(gene_faa, format="fasta"): if seq.metadata["id"] not in genes_faa_dict: genes_faa_dict[seq.metadata["id"]] = {} - genes_faa_dict[seq.metadata["id"]]["heme_regulatory_motif_count"] = len(list(seq.find_with_regex(motif))) return genes_faa_dict @@ -61,12 +59,10 @@ def set_gene_data(gene_faa, genes_faa_dict=None): for seq in read_sequence(gene_faa, format="fasta"): if seq.metadata["id"] not in genes_faa_dict: genes_faa_dict[seq.metadata["id"]] = {} - split_label = seq.metadata["id"].split("_") gene_position = split_label[-1] start_position, end_position, strandedness = seq.metadata["description"].split("#")[1:4] - - genes_faa_dict[seq.metadata["id"]][FASTA_COLUMN] = os.path.commonprefix([Path(gene_faa).stem, seq.metadata["id"]]).rstrip("_") + genes_faa_dict[seq.metadata["id"]][FASTA_COLUMN] = str(Path(gene_faa).stem).replace('_called_genes', '') genes_faa_dict[seq.metadata["id"]]["scaffold"] = ( seq.metadata["id"] .removeprefix(genes_faa_dict[seq.metadata["id"]][FASTA_COLUMN]) @@ -83,16 +79,13 @@ def organize_columns(df, special_columns=None): special_columns = [] base_columns = ['query_id', FASTA_COLUMN, "scaffold", 'gene_number', 'start_position', 'stop_position', 'strandedness', 'rank'] base_columns = [col for col in base_columns if col in df.columns] - kegg_columns = sorted([col for col in df.columns if col.startswith('kegg_')], key=lambda x: (x != 'kegg_id', x)) other_columns = [col for col in df.columns if col not in base_columns + kegg_columns + special_columns] - db_prefixes = set(col.split('_')[0] for col in other_columns) sorted_other_columns = [] for prefix in db_prefixes: prefixed_columns = sorted([col for col in other_columns if col.startswith(prefix + '_')], key=lambda x: (x != f"{prefix}_id", x)) sorted_other_columns.extend(prefixed_columns) - final_columns_order = base_columns + kegg_columns + sorted_other_columns + special_columns return df[final_columns_order] @@ -106,11 +99,10 @@ def combine_annotations(annotations_dir, genes_dir, output, threads): annotations = Path(annotations_dir).glob("*") genes_faa = Path(genes_dir).glob("*") with ThreadPoolExecutor(max_workers=threads) as executor: - # futures = [executor.submit(read_and_preprocess, input_fasta, path) for input_fasta, path in input_fastas_and_paths] futures = [executor.submit(read_and_preprocess, Path(path)) for path in annotations] data_frames = [future.result() for future in as_completed(futures)] - combined_data = pd.concat(data_frames, ignore_index=True) + combined_data = pd.concat([df for df in data_frames if not df.empty], ignore_index=True) if genes_faa: genes_faa_dict = dict() for gene_path in genes_faa: @@ -118,13 +110,9 @@ def combine_annotations(annotations_dir, genes_dir, output, threads): genes_faa_dict count_motifs(gene_path, "(C..CH)", genes_faa_dict=genes_faa_dict) set_gene_data(gene_path, genes_faa_dict) - df = pd.DataFrame.from_dict(genes_faa_dict, orient='index') - combined_data = combined_data.drop(columns=df.columns, errors='ignore') - df.index.name = 'query_id' - - # we use outer to get any genes that don't have hits - combined_data = pd.merge(combined_data, df, how="outer", on="query_id") - combined_data.loc[combined_data[FASTA_COLUMN].isna(), FASTA_COLUMN] = "" + df = pd.DataFrame.from_dict(genes_faa_dict, orient='index').reset_index().rename(columns={'index': 'query_id'}) + combined_data = combined_data.drop(columns=df.columns.difference(["query_id", "scaffold", FASTA_COLUMN]), errors='ignore') + combined_data = pd.merge(combined_data, df, how="outer", on=["query_id", FASTA_COLUMN]) combined_data = convert_bit_scores_to_numeric(combined_data) diff --git a/bin/distill.py b/bin/distill.py index a6068826..232a8192 100755 --- a/bin/distill.py +++ b/bin/distill.py @@ -27,7 +27,6 @@ DISTILATE_SORT_ORDER_COLUMNS = [COL_HEADER, COL_SUBHEADER, COL_MODULE, COL_GENE_ID] EXCEL_MAX_CELL_SIZE = 32767 -FASTA_COLUMN = os.getenv('FASTA_COLUMN') DISTILL_DIR = Path(__file__).parent / "assets/forms/distill_sheets" @@ -74,7 +73,7 @@ def fill_a_frame(frame: pd.DataFrame): return pd.Series(counts, index=genome_summary_frame.index) - counts = annotations.groupby(groupby_column, sort=False).apply(fill_a_frame) + counts = annotations.groupby(groupby_column, sort=False)[annotations.columns].apply(fill_a_frame) genome_summary_frame = pd.concat([genome_summary_frame, counts.T], axis=1) return genome_summary_frame @@ -99,7 +98,7 @@ def fill_genome_summary_frame_gene_names(annotations, genome_summary_frame, grou return genome_summary_frame -def summarize_rrnas(rrnas_df, groupby_column=FASTA_COLUMN): +def summarize_rrnas(rrnas_df, groupby_column="input_fasta"): genome_rrna_dict = dict() for genome, frame in rrnas_df.groupby(groupby_column): genome_rrna_dict[genome] = Counter(frame['type']) @@ -113,7 +112,7 @@ def summarize_rrnas(rrnas_df, groupby_column=FASTA_COLUMN): return rrna_frame -def make_genome_summary(annotations, genome_summary_frame, logger, groupby_column=FASTA_COLUMN): +def make_genome_summary(annotations, genome_summary_frame, logger, groupby_column="input_fasta"): summary_frames = list() # get ko summaries @@ -154,16 +153,17 @@ def write_summarized_genomes_to_xlsx(summarized_genomes, output_file, extra_fram frame = frame.sort_values(DISTILATE_SORT_ORDER_COLUMNS) frame = frame.drop([COL_SHEET], axis=1) gene_columns = list(set(frame.columns) - set(CONSTANT_DISTILLATE_COLUMNS)) - split_genes = pd.concat([split_names_to_long(frame[i].astype(str)) for i in gene_columns], axis=1) - frame = pd.concat([frame[CONSTANT_DISTILLATE_COLUMNS], split_genes], axis=1) + if gene_columns: + split_genes = pd.concat([split_names_to_long(frame[i].astype(str)) for i in gene_columns], axis=1) + frame = pd.concat([frame[CONSTANT_DISTILLATE_COLUMNS], split_genes], axis=1) frame.to_excel(writer, sheet_name=sheet, index=False) for extra_frame in extra_frames: if extra_frame is not None and not extra_frame.empty: - extra_frame.to_excel(writer, sheet_name=extra_frame[COL_SHEET].iloc[0], index=False) + extra_frame.to_excel(writer, sheet_name=extra_frame[COL_HEADER].iloc[0], index=False) # TODO: add assembly stats like N50, longest contig, total assembled length etc -def make_genome_stats(annotations, rrna_frame=None, trna_frame=None, quast_frame=None, groupby_column=FASTA_COLUMN): +def make_genome_stats(annotations, rrna_frame=None, trna_frame=None, quast_frame=None, groupby_column="input_fasta"): rows = list() columns = ['genome'] if 'scaffold' in annotations.columns: @@ -200,7 +200,6 @@ def make_genome_stats(annotations, rrna_frame=None, trna_frame=None, quast_frame # Rename the index column to input_fasta (or whatever you want) df_rrna = df_rrna.rename(columns={"index": "genome"}) df_rrna.columns.name = None - print(df_rrna) genome_stats = pd.merge(genome_stats, df_rrna, how="outer", on="genome") if trna_frame is not None: meta_cols = ["gene_id", "gene_description", "category", @@ -230,19 +229,19 @@ def make_genome_stats(annotations, rrna_frame=None, trna_frame=None, quast_frame @click.command() @click.option("-i", "--input_file", required=True, help="Annotations path") # @click.option("-o", "--output_dir", required=True, help="Directory to write summarized genomes") -@click.option("--rrna_path", help="rRNA output from annotation") -@click.option("--trna_path", help="tRNA output from annotation") -@click.option("--quast_path", help="Quast summary TSV from the quast step") +@click.option("--rrna_path", help="rRNA output from annotation", default=None, type=click.Path(exists=True)) +@click.option("--trna_path", help="tRNA output from annotation", default=None, type=click.Path(exists=True)) +@click.option("--quast_path", help="Quast summary TSV from the quast step", default=None, type=click.Path(exists=True)) @click.option("--groupby_column", help="Column from annotations to group as organism units", - default=FASTA_COLUMN) + default="input_fasta", type = click.STRING) @click.option("--distil_topics", default="default", help="Default distillates topics to run.") @click.option("--distil_ecosystem", default="eng_sys,ag", help="Default distillates ecosystems to run.") -@click.option("--custom_distillate", default=[], callback=validate_comma_separated, help="Custom distillate forms to add your own modules, comma separated. ") +@click.option("--custom_distillate", default="", callback=validate_comma_separated, help="Custom distillate forms to add your own modules, comma separated. ") @click.option("--distillate_gene_names", is_flag=True, show_default=True, default=False, help="Give names of genes instead of counts in genome metabolism summary") -def distill(input_file, rrna_path=None, trna_path=None, quast_path=None, groupby_column=FASTA_COLUMN, distil_topics=None, distil_ecosystem=None, - custom_distillate=None, distillate_gene_names=False): +def distill(input_file, rrna_path, trna_path, quast_path, groupby_column, distil_topics, distil_ecosystem, + custom_distillate, distillate_gene_names): """Summarize metabolic content of annotated genomes""" # make output folder # mkdir(output_dir) @@ -255,24 +254,20 @@ def distill(input_file, rrna_path=None, trna_path=None, quast_path=None, groupby # Check the columns are present check_columns(annotations, logger) - if trna_path is None: - trna_frame = None - else: + trna_frame = None + rrna_frame = None + if all([v is not None for v in [trna_path, rrna_path]]): trna_frame = pd.read_csv(trna_path, sep='\t') - if rrna_path is None: - rrna_frame = None - else: rrna_frame = pd.read_csv(rrna_path, sep='\t') - # Check NF DRAM didn't pass an empty sheet to signal no tRNAs or rRNAs - if rrna_frame.empty: - rrna_frame = None - if trna_frame.empty: - trna_frame = None - - if quast_path is None: - quast_frame = None - else: + if any(v.dropna(how="all").empty for v in [trna_frame, rrna_frame]): + trna_frame = None + rrna_frame = None + + quast_frame = None + if quast_path is not None: quast_frame = pd.read_csv(quast_path, sep='\t') + if quast_frame.dropna(how="all").empty: + quast_frame = None distil_sheets_names = [] if "default" in distil_topics: @@ -322,7 +317,7 @@ def distill(input_file, rrna_path=None, trna_path=None, quast_path=None, groupby genome_summary_form = genome_summary_form.reset_index(drop=True) # make genome stats - genome_stats = make_genome_stats(annotations, rrna_frame, trna_frame, quast_frame=quast_frame, groupby_column=groupby_column) + genome_stats = make_genome_stats(annotations, rrna_frame, trna_frame, quast_frame, groupby_column=groupby_column) genome_stats.to_csv('genome_stats.tsv', sep='\t', index=None) logger.info('Calculated genome statistics') diff --git a/conf/base.config b/conf/base.config index 1e69ac57..460af8ca 100644 --- a/conf/base.config +++ b/conf/base.config @@ -59,8 +59,14 @@ process { withLabel:error_ignore { errorStrategy = 'ignore' } + withLabel:error_retry { errorStrategy = 'retry' maxRetries = 2 } + + withName: 'DRAM:ANNOTATE:CALL:.*|DRAM:ANNOTATE:DB_SEARCH:.*|DRAM:ANNOTATE:QC:COLLECT_RNA:.*' { + array = params.array_size + } + } diff --git a/conf/modules.config b/conf/modules.config index 3cdcf759..342b35f5 100644 --- a/conf/modules.config +++ b/conf/modules.config @@ -254,6 +254,7 @@ process { ] } withName: SUMMARIZE { + ext.args = { "--groupby_column ${params.CONSTANTS.FASTA_COLUMN}" } publishDir = [ [ path: "${params.outdir}/SUMMARIZE", diff --git a/modules/local/distill/distill.nf b/modules/local/distill/distill.nf index 3c772684..94a5b94e 100644 --- a/modules/local/distill/distill.nf +++ b/modules/local/distill/distill.nf @@ -7,26 +7,35 @@ process SUMMARIZE { container "community.wave.seqera.io/library/python_pandas_openpyxl_click_dram-viz:bd6f4fb065d73a68" input: - path( ch_combined_annotations, stageAs: "raw-annotations.tsv" ) - path( ch_rrna_collected, stageAs: "rrna_combined.tsv" ) - path( ch_trna_collected, stageAs: "trna_combined.tsv" ) - path( ch_quast_stats ) - val( distill_topic ) - val( distill_ecosystem ) - val( distill_custom ) + path combined_annotations + path rrna_collected + path trna_collected + path quast_stats + val distill_topic + val distill_ecosystem + val distill_custom output: - path( "metabolism_summary.xlsx" ), emit: distillate - path( "*.log" ), emit: log - path( "summarized_genomes.tsv" ), emit: summarized_genomes - path( "genome_stats.tsv" ), emit: genome_stats + path "metabolism_summary.xlsx", emit: distillate + path "*.log", emit: log + path "summarized_genomes.tsv", emit: summarized_genomes + path "genome_stats.tsv", emit: genome_stats script: - """ - # export constants for script - export FASTA_COLUMN="${params.CONSTANTS.FASTA_COLUMN}" - - distill.py -i ${ch_combined_annotations} --rrna_path '${ch_rrna_collected}' --trna_path '${ch_trna_collected}' --distil_topics "${distill_topic}" --distil_ecosystem "${distill_ecosystem}" --custom_distillate "${distill_custom}" --quast_path '${ch_quast_stats}' + def args = task.ext.args ?: "" + def rrna = rrna_collected ? "--rrna_path ${rrna_collected}" : "" + def trna = trna_collected ? "--trna_path ${trna_collected}" : "" + def quast = quast_stats ? "--quast_path ${quast_stats}" : "" """ + distill.py \ + -i ${combined_annotations} \ + ${rrna} \ + ${trna} \ + ${quast} \ + --distil_topics "${distill_topic}" \ + --distil_ecosystem "${distill_ecosystem}" \ + --custom_distillate "${distill_custom}" \ + ${args} + """ } diff --git a/nextflow.config b/nextflow.config index 39502368..d37f7708 100644 --- a/nextflow.config +++ b/nextflow.config @@ -163,7 +163,8 @@ params { /* Process Options */ - queue_size = 10 + array_size = 10 + // queue_size = 10 // This is the resource requirements for a single process // Not the limit to the total resources available to the pipeline // Up to queue_size processes can run in parallel, of various sizes diff --git a/subworkflows/local/annotate.nf b/subworkflows/local/annotate.nf index bd30a51b..b231ac73 100644 --- a/subworkflows/local/annotate.nf +++ b/subworkflows/local/annotate.nf @@ -34,8 +34,8 @@ workflow ANNOTATE { ch_combined_annotations = default_sheet if (params.rename || call) { - fasta_name = ch_fasta.map { it[0] } - fasta_files = ch_fasta.map { it[1] } + fasta_name = ch_fasta.map { it-> it[0] } + fasta_files = ch_fasta.map { it -> it[1] } n_fastas = file("$params.input_fasta/${params.fasta_fmt}").size() } @@ -49,10 +49,7 @@ workflow ANNOTATE { // we use flatten here to turn a list back into a channel renamed_fasta_paths = RENAME_FASTA.out.renamed_fasta_paths.flatten() // we need to recreate the fasta channel with the renamed fasta files - ch_fasta = renamed_fasta_paths.map { - fasta_name = it.getBaseName() - tuple(fasta_name, it) - } + ch_fasta = renamed_fasta_paths.map { it -> [ it.getBaseName(), it ] } } ch_quast_stats = default_sheet diff --git a/subworkflows/local/call.nf b/subworkflows/local/call.nf index 74ae33d5..5f7c5dc6 100644 --- a/subworkflows/local/call.nf +++ b/subworkflows/local/call.nf @@ -38,13 +38,13 @@ workflow CALL { .set { ch_collected_faa } // Set the resulting list to ch_collected_faa // Collect all individual fasta to pass to quast - Channel.empty() + channel.empty() .mix( ch_called_genes ) .collect() .set { ch_collected_fna } // Collect all individual fasta to pass to quast - Channel.empty() + channel.empty() .mix( ch_filtered_fasta, ch_gene_gff ) .collect() .set { ch_collected_fasta } diff --git a/subworkflows/local/collect_rna.nf b/subworkflows/local/collect_rna.nf index c9762dff..2c566b07 100644 --- a/subworkflows/local/collect_rna.nf +++ b/subworkflows/local/collect_rna.nf @@ -33,7 +33,7 @@ workflow COLLECT_RNA { // If we didn't run call if (!call) { if (params.rrnas) { - Channel.fromPath("${params.rrnas}/*.tsv", checkIfExists: true) + channel.fromPath("${params.rrnas}/*.tsv", checkIfExists: true) .ifEmpty { exit 1, "If you specify --distill_ without --call, you must provide individual rRNA files generated with barrnap. Cannot find any files at: ${params.rrnas}\nNB: Path needs to follow pattern: path/to/directory" } .collect() .set { ch_collected_rRNAs } @@ -42,7 +42,7 @@ workflow COLLECT_RNA { } if (params.trnas) { // the user provided rrnas or trnas - Channel.fromPath("${params.trnas}/*.tsv", checkIfExists: true) + channel.fromPath("${params.trnas}/*.tsv", checkIfExists: true) .ifEmpty { exit 1, "If you specify --distill_ without --call, you must provide individual tRNA files generated with tRNAscan-SE. Cannot find any files at: ${params.trnas}\nNB: Path needs to follow pattern: path/to/directory" } .collect() .set { ch_collected_tRNAs } @@ -53,14 +53,14 @@ workflow COLLECT_RNA { TRNA_SCAN( ch_fasta ) ch_trna_scan = TRNA_SCAN.out.trna_scan_out // Collect all input_fasta formatted tRNA files - Channel.empty() + channel.empty() .mix( ch_trna_scan ) .collect() .set { ch_collected_tRNAs } // Run barrnap on each fasta to identify rRNAs RRNA_SCAN( ch_fasta ) ch_rrna_scan = RRNA_SCAN.out.rrna_scan_out - Channel.empty() + channel.empty() .mix( ch_rrna_scan ) .collect() .set { ch_collected_rRNAs } @@ -96,4 +96,4 @@ workflow COLLECT_RNA { ch_trna_collected ch_trna_combined -} \ No newline at end of file +} diff --git a/subworkflows/local/db_search.nf b/subworkflows/local/db_search.nf index e93b3498..ea65cd01 100644 --- a/subworkflows/local/db_search.nf +++ b/subworkflows/local/db_search.nf @@ -83,7 +83,7 @@ workflow DB_SEARCH { main: - DB_CHANNEL_SETUP( + DB_channel_SETUP( use_kegg, use_kofam, use_dbcan, @@ -125,23 +125,20 @@ workflow DB_SEARCH { if (!call) { - ch_called_proteins = Channel + ch_called_proteins = channel .fromPath(file(params.input_genes) / params.genes_fmt, checkIfExists: true) .ifEmpty { exit 1, "If you specify --annotate without --call, you must provide a fasta file of called genes using --input_genes. Cannot find any called gene fasta files matching: ${params.input_genes}\nNB: Path needs to follow pattern: path/to/directory/" } - .map { - input_fastaName = it.getBaseName() - tuple(input_fastaName, it) - } - + .map { it -> [ it.getBaseName(), it ] } + GENE_LOCS( ch_called_proteins) ch_gene_locs = GENE_LOCS.out.prodigal_locs_tsv n_fastas = file("$params.input_genes/${params.genes_fmt}").size() } - def formattedOutputChannels = channel.of() + def formattedOutputchannels = channel.of() // Here we will create mmseqs2 index files for each of the inputs if we are going to do a mmseqs2 database - if (DB_CHANNEL_SETUP.out.index_mmseqs) { + if (DB_channel_SETUP.out.index_mmseqs) { // Use MMSEQS2 to index each called genes protein file MMSEQS_INDEX( ch_called_proteins ) ch_mmseqs_query = MMSEQS_INDEX.out.mmseqs_index_out @@ -150,17 +147,17 @@ workflow DB_SEARCH { // KEGG annotation if (use_kegg) { ch_combined_query_locs_kegg = ch_mmseqs_query.join(ch_gene_locs) - MMSEQS_SEARCH_KEGG( ch_combined_query_locs_kegg, DB_CHANNEL_SETUP.out.ch_kegg_db, params.bit_score_threshold, params.rbh_bit_score_threshold, default_sheet, kegg_name ) + MMSEQS_SEARCH_KEGG( ch_combined_query_locs_kegg, DB_channel_SETUP.out.ch_kegg_db, params.bit_score_threshold, params.rbh_bit_score_threshold, default_sheet, kegg_name ) ch_kegg_unformatted = MMSEQS_SEARCH_KEGG.out.mmseqs_search_formatted_out SQL_KEGG(ch_kegg_unformatted, kegg_name, ch_sql_descriptions_db) ch_kegg_formatted = SQL_KEGG.out.sql_formatted_hits - formattedOutputChannels = formattedOutputChannels.mix(ch_kegg_formatted) + formattedOutputchannels = formattedOutputchannels.mix(ch_kegg_formatted) } // KOFAM annotation if (use_kofam) { - HMM_SEARCH_KOFAM ( ch_called_proteins, params.kofam_e_value, DB_CHANNEL_SETUP.out.ch_kofam_db ) + HMM_SEARCH_KOFAM ( ch_called_proteins, params.kofam_e_value, DB_channel_SETUP.out.ch_kofam_db ) ch_kofam_hmms = HMM_SEARCH_KOFAM.out.hmm_search_out PARSE_HMM_KOFAM ( ch_kofam_hmms ) @@ -170,23 +167,23 @@ workflow DB_SEARCH { KOFAM_HMM_FORMATTER ( ch_combined_hits_locs_kofam, ch_kofam_list ) ch_kofam_formatted = KOFAM_HMM_FORMATTER.out.kofam_formatted_hits - formattedOutputChannels = formattedOutputChannels.mix(ch_kofam_formatted) + formattedOutputchannels = formattedOutputchannels.mix(ch_kofam_formatted) } // PFAM annotation if (use_pfam) { ch_combined_query_locs_pfam = ch_mmseqs_query.join(ch_gene_locs) - MMSEQS_SEARCH_PFAM( ch_combined_query_locs_pfam, DB_CHANNEL_SETUP.out.ch_pfam_mmseqs_db, params.bit_score_threshold, params.rbh_bit_score_threshold, default_sheet, pfam_name ) + MMSEQS_SEARCH_PFAM( ch_combined_query_locs_pfam, DB_channel_SETUP.out.ch_pfam_mmseqs_db, params.bit_score_threshold, params.rbh_bit_score_threshold, default_sheet, pfam_name ) ch_pfam_unformatted = MMSEQS_SEARCH_PFAM.out.mmseqs_search_formatted_out SQL_PFAM(ch_pfam_unformatted, pfam_name, ch_sql_descriptions_db) ch_pfam_formatted = SQL_PFAM.out.sql_formatted_hits - formattedOutputChannels = formattedOutputChannels.mix(ch_pfam_formatted) + formattedOutputchannels = formattedOutputchannels.mix(ch_pfam_formatted) } // dbCAN annotation if (use_dbcan) { - HMM_SEARCH_DBCAN ( ch_called_proteins, params.dbcan_e_value , DB_CHANNEL_SETUP.out.ch_dbcan_db) + HMM_SEARCH_DBCAN ( ch_called_proteins, params.dbcan_e_value , DB_channel_SETUP.out.ch_dbcan_db) ch_dbcan_hmms = HMM_SEARCH_DBCAN.out.hmm_search_out PARSE_HMM_DBCAN ( ch_dbcan_hmms ) @@ -196,12 +193,12 @@ workflow DB_SEARCH { DBCAN_HMM_FORMATTER ( ch_combined_hits_locs_dbcan, dbcan_name, ch_sql_descriptions_db ) ch_dbcan_formatted = DBCAN_HMM_FORMATTER.out.sql_formatted_hits - formattedOutputChannels = formattedOutputChannels.mix(ch_dbcan_formatted) + formattedOutputchannels = formattedOutputchannels.mix(ch_dbcan_formatted) } // CAMPER annotation if (use_camper) { // HMM - HMM_SEARCH_CAMPER ( ch_called_proteins, params.camper_e_value , DB_CHANNEL_SETUP.out.ch_camper_hmm_db) + HMM_SEARCH_CAMPER ( ch_called_proteins, params.camper_e_value , DB_channel_SETUP.out.ch_camper_hmm_db) ch_camper_hmms = HMM_SEARCH_CAMPER.out.hmm_search_out PARSE_HMM_CAMPER ( ch_camper_hmms ) @@ -211,18 +208,18 @@ workflow DB_SEARCH { CAMPER_HMM_FORMATTER ( ch_combined_hits_locs_camper, ch_camper_hmm_list ) ch_camper_hmm_formatted = CAMPER_HMM_FORMATTER.out.camper_formatted_hits - formattedOutputChannels = formattedOutputChannels.mix(ch_camper_hmm_formatted) + formattedOutputchannels = formattedOutputchannels.mix(ch_camper_hmm_formatted) // MMseqs ch_combined_query_locs_camper = ch_mmseqs_query.join(ch_gene_locs) - MMSEQS_SEARCH_CAMPER( ch_combined_query_locs_camper, DB_CHANNEL_SETUP.out.ch_camper_mmseqs_db, params.bit_score_threshold, params.rbh_bit_score_threshold, DB_CHANNEL_SETUP.out.ch_camper_mmseqs_list, camper_name ) + MMSEQS_SEARCH_CAMPER( ch_combined_query_locs_camper, DB_channel_SETUP.out.ch_camper_mmseqs_db, params.bit_score_threshold, params.rbh_bit_score_threshold, DB_channel_SETUP.out.ch_camper_mmseqs_list, camper_name ) ch_camper_mmseqs_formatted = MMSEQS_SEARCH_CAMPER.out.mmseqs_search_formatted_out - formattedOutputChannels = formattedOutputChannels.mix(ch_camper_mmseqs_formatted) + formattedOutputchannels = formattedOutputchannels.mix(ch_camper_mmseqs_formatted) } // FeGenie annotation if (use_fegenie) { - HMM_SEARCH_FEGENIE ( ch_called_proteins, params.fegenie_e_value, DB_CHANNEL_SETUP.out.ch_fegenie_db ) + HMM_SEARCH_FEGENIE ( ch_called_proteins, params.fegenie_e_value, DB_channel_SETUP.out.ch_fegenie_db ) ch_fegenie_hmms = HMM_SEARCH_FEGENIE.out.hmm_search_out PARSE_HMM_FEGENIE ( ch_fegenie_hmms ) @@ -231,27 +228,27 @@ workflow DB_SEARCH { ch_combined_hits_locs_fegenie = ch_fegenie_parsed.join(ch_gene_locs) FEGENIE_HMM_FORMATTER ( ch_combined_hits_locs_fegenie ) ch_fegenie_formatted = FEGENIE_HMM_FORMATTER.out.fegenie_formatted_hits - formattedOutputChannels = formattedOutputChannels.mix(ch_fegenie_formatted) + formattedOutputchannels = formattedOutputchannels.mix(ch_fegenie_formatted) } // Methyl annotation if (use_methyl) { ch_combined_query_locs_methyl = ch_mmseqs_query.join(ch_gene_locs) - MMSEQS_SEARCH_METHYL( ch_combined_query_locs_methyl, DB_CHANNEL_SETUP.out.ch_methyl_db, params.bit_score_threshold, params.rbh_bit_score_threshold, default_sheet, methyl_name ) + MMSEQS_SEARCH_METHYL( ch_combined_query_locs_methyl, DB_channel_SETUP.out.ch_methyl_db, params.bit_score_threshold, params.rbh_bit_score_threshold, default_sheet, methyl_name ) ch_methyl_mmseqs_formatted = MMSEQS_SEARCH_METHYL.out.mmseqs_search_formatted_out - formattedOutputChannels = formattedOutputChannels.mix(ch_methyl_mmseqs_formatted) + formattedOutputchannels = formattedOutputchannels.mix(ch_methyl_mmseqs_formatted) } // CANT-HYD annotation if (use_canthyd) { // MMseqs ch_combined_query_locs_canthyd = ch_mmseqs_query.join(ch_gene_locs) - MMSEQS_SEARCH_CANTHYD( ch_combined_query_locs_canthyd, DB_CHANNEL_SETUP.out.ch_canthyd_mmseqs_db, params.bit_score_threshold, params.rbh_bit_score_threshold, DB_CHANNEL_SETUP.out.ch_canthyd_mmseqs_list, canthyd_name ) + MMSEQS_SEARCH_CANTHYD( ch_combined_query_locs_canthyd, DB_channel_SETUP.out.ch_canthyd_mmseqs_db, params.bit_score_threshold, params.rbh_bit_score_threshold, DB_channel_SETUP.out.ch_canthyd_mmseqs_list, canthyd_name ) ch_canthyd_mmseqs_formatted = MMSEQS_SEARCH_CANTHYD.out.mmseqs_search_formatted_out - formattedOutputChannels = formattedOutputChannels.mix(ch_canthyd_mmseqs_formatted) + formattedOutputchannels = formattedOutputchannels.mix(ch_canthyd_mmseqs_formatted) //HMM - HMM_SEARCH_CANTHYD ( ch_called_proteins, params.canthyd_e_value , DB_CHANNEL_SETUP.out.ch_canthyd_hmm_db) + HMM_SEARCH_CANTHYD ( ch_called_proteins, params.canthyd_e_value , DB_channel_SETUP.out.ch_canthyd_hmm_db) ch_canthyd_hmms = HMM_SEARCH_CANTHYD.out.hmm_search_out PARSE_HMM_CANTHYD ( ch_canthyd_hmms ) @@ -261,12 +258,12 @@ workflow DB_SEARCH { CANTHYD_HMM_FORMATTER ( ch_combined_hits_locs_canthyd, ch_canthyd_hmm_list ) ch_canthyd_hmm_formatted = CANTHYD_HMM_FORMATTER.out.canthyd_formatted_hits - formattedOutputChannels = formattedOutputChannels.mix(ch_canthyd_hmm_formatted) + formattedOutputchannels = formattedOutputchannels.mix(ch_canthyd_hmm_formatted) } // Sulfur annotation if (use_sulfur) { - HMM_SEARCH_SULFUR ( ch_called_proteins, params.sulfur_e_value, DB_CHANNEL_SETUP.out.ch_sulfur_db ) + HMM_SEARCH_SULFUR ( ch_called_proteins, params.sulfur_e_value, DB_channel_SETUP.out.ch_sulfur_db ) ch_sulfur_hmms = HMM_SEARCH_SULFUR.out.hmm_search_out PARSE_HMM_SULFUR ( ch_sulfur_hmms ) @@ -276,33 +273,33 @@ workflow DB_SEARCH { SULFUR_HMM_FORMATTER ( ch_combined_hits_locs_sulfur ) ch_sulfur_formatted = SULFUR_HMM_FORMATTER.out.sulfur_formatted_hits - formattedOutputChannels = formattedOutputChannels.mix(ch_sulfur_formatted) + formattedOutputchannels = formattedOutputchannels.mix(ch_sulfur_formatted) } // MEROPS annotation if (use_merops) { ch_combined_query_locs_merops = ch_mmseqs_query.join(ch_gene_locs) - MMSEQS_SEARCH_MEROPS( ch_combined_query_locs_merops, DB_CHANNEL_SETUP.out.ch_merops_db, params.bit_score_threshold, params.rbh_bit_score_threshold, default_sheet, merops_name ) + MMSEQS_SEARCH_MEROPS( ch_combined_query_locs_merops, DB_channel_SETUP.out.ch_merops_db, params.bit_score_threshold, params.rbh_bit_score_threshold, default_sheet, merops_name ) ch_merops_unformatted = MMSEQS_SEARCH_MEROPS.out.mmseqs_search_formatted_out SQL_MEROPS(ch_merops_unformatted, merops_name, ch_sql_descriptions_db) ch_merops_formatted = SQL_MEROPS.out.sql_formatted_hits - formattedOutputChannels = formattedOutputChannels.mix(ch_merops_formatted) + formattedOutputchannels = formattedOutputchannels.mix(ch_merops_formatted) } // Uniref annotation if (use_uniref) { ch_combined_query_locs_uniref = ch_mmseqs_query.join(ch_gene_locs) - MMSEQS_SEARCH_UNIREF( ch_combined_query_locs_uniref, DB_CHANNEL_SETUP.out.ch_uniref_db, params.bit_score_threshold, params.rbh_bit_score_threshold, default_sheet, uniref_name ) + MMSEQS_SEARCH_UNIREF( ch_combined_query_locs_uniref, DB_channel_SETUP.out.ch_uniref_db, params.bit_score_threshold, params.rbh_bit_score_threshold, default_sheet, uniref_name ) ch_uniref_unformatted = MMSEQS_SEARCH_UNIREF.out.mmseqs_search_formatted_out SQL_UNIREF(ch_uniref_unformatted, uniref_name, ch_sql_descriptions_db) ch_uniref_formatted = SQL_UNIREF.out.sql_formatted_hits - formattedOutputChannels = formattedOutputChannels.mix(ch_uniref_formatted) + formattedOutputchannels = formattedOutputchannels.mix(ch_uniref_formatted) } // VOGdb annotation if (use_vog) { - HMM_SEARCH_VOG ( ch_called_proteins, params.vog_e_value , DB_CHANNEL_SETUP.out.ch_vogdb_db ) + HMM_SEARCH_VOG ( ch_called_proteins, params.vog_e_value , DB_channel_SETUP.out.ch_vogdb_db ) ch_vog_hmms = HMM_SEARCH_VOG.out.hmm_search_out PARSE_HMM_VOG ( ch_vog_hmms ) @@ -312,21 +309,21 @@ workflow DB_SEARCH { VOG_HMM_FORMATTER ( ch_combined_hits_locs_vog, vogdb_name, ch_sql_descriptions_db ) ch_vog_formatted = VOG_HMM_FORMATTER.out.vog_formatted_hits - formattedOutputChannels = formattedOutputChannels.mix(ch_vog_formatted) + formattedOutputchannels = formattedOutputchannels.mix(ch_vog_formatted) } // Viral annotation if (params.use_viral) { ch_combined_query_locs_viral = ch_mmseqs_query.join(ch_gene_locs) - MMSEQS_SEARCH_VIRAL( ch_combined_query_locs_viral, DB_CHANNEL_SETUP.out.ch_viral_db, params.bit_score_threshold, params.rbh_bit_score_threshold,default_sheet, viral_name ) + MMSEQS_SEARCH_VIRAL( ch_combined_query_locs_viral, DB_channel_SETUP.out.ch_viral_db, params.bit_score_threshold, params.rbh_bit_score_threshold,default_sheet, viral_name ) ch_viral_unformatted = MMSEQS_SEARCH_VIRAL.out.mmseqs_search_formatted_out SQL_VIRAL(ch_viral_unformatted, viral_name, ch_sql_descriptions_db) ch_viral_formatted = SQL_VIRAL.out.sql_formatted_hits - formattedOutputChannels = formattedOutputChannels.mix(ch_viral_formatted) + formattedOutputchannels = formattedOutputchannels.mix(ch_viral_formatted) } - fastas = formattedOutputChannels.map { it[1] }.collect() - genes = ch_called_proteins.map { it[1] }.collect() + fastas = formattedOutputchannels.map { it -> it[1] }.collect() + genes = ch_called_proteins.map { it -> it[1] }.collect() COMBINE_ANNOTATIONS( fastas, genes ) ch_combined_annotations = COMBINE_ANNOTATIONS.out.combined_annotations_out @@ -337,7 +334,7 @@ workflow DB_SEARCH { } -workflow DB_CHANNEL_SETUP { +workflow DB_channel_SETUP { take: use_kegg use_kofam @@ -356,24 +353,24 @@ workflow DB_CHANNEL_SETUP { main: index_mmseqs = false - ch_kegg_db = Channel.empty() - ch_kofam_db = Channel.empty() - ch_dbcan_db = Channel.empty() - ch_camper_hmm_db = Channel.empty() - ch_camper_mmseqs_db = Channel.empty() - ch_camper_mmseqs_list = Channel.empty() - ch_merops_db = Channel.empty() - ch_pfam_mmseqs_db = Channel.empty() - ch_heme_db = Channel.empty() - ch_sulfur_db = Channel.empty() - ch_uniref_db = Channel.empty() - ch_methyl_db = Channel.empty() - ch_fegenie_db = Channel.empty() - ch_canthyd_hmm_db = Channel.empty() - ch_canthyd_mmseqs_db = Channel.empty() - ch_canthyd_mmseqs_list = Channel.empty() - ch_vogdb_db = Channel.empty() - ch_viral_db = Channel.empty() + ch_kegg_db = channel.empty() + ch_kofam_db = channel.empty() + ch_dbcan_db = channel.empty() + ch_camper_hmm_db = channel.empty() + ch_camper_mmseqs_db = channel.empty() + ch_camper_mmseqs_list = channel.empty() + ch_merops_db = channel.empty() + ch_pfam_mmseqs_db = channel.empty() + ch_heme_db = channel.empty() + ch_sulfur_db = channel.empty() + ch_uniref_db = channel.empty() + ch_methyl_db = channel.empty() + ch_fegenie_db = channel.empty() + ch_canthyd_hmm_db = channel.empty() + ch_canthyd_mmseqs_db = channel.empty() + ch_canthyd_mmseqs_list = channel.empty() + ch_vogdb_db = channel.empty() + ch_viral_db = channel.empty() if (use_kegg) { ch_kegg_db = file(params.kegg_db).exists() ? file(params.kegg_db) : error("Error: If using --annotate, you must supply prebuilt databases. KEGG database file not found at ${params.kegg_db}") diff --git a/subworkflows/local/merge.nf b/subworkflows/local/merge.nf index 8b4f7ece..a14fa940 100644 --- a/subworkflows/local/merge.nf +++ b/subworkflows/local/merge.nf @@ -31,10 +31,10 @@ workflow MERGE { } // Create a channel with the paths to the .tsv files - Channel - .from(tsv_files.collect { annotations_dir.toString() + '/' + it }) + channel + .from(tsv_files.collect { it -> annotations_dir.toString() + '/' + it }) .set { ch_merge_annotations } - Channel.empty() + channel.empty() .mix( ch_merge_annotations ) .collect() .set { ch_merge_annotations_collected } diff --git a/subworkflows/local/qc.nf b/subworkflows/local/qc.nf index 298c4fff..fe7a5bf8 100644 --- a/subworkflows/local/qc.nf +++ b/subworkflows/local/qc.nf @@ -25,39 +25,37 @@ workflow QC { // Add Bin Quality to annotations - if( params.bin_quality ){ + if ( params.bin_quality ) { ch_bin_quality = file(params.bin_quality) ADD_BIN_QUALITY( ch_combined_annotations, ch_bin_quality ) ch_updated_annots = ADD_BIN_QUALITY.out.annots_bin_quality_out } - else{ + else { ch_updated_annots = ch_combined_annotations } // Add Taxonomy to annotations - if( params.taxa ){ + if ( params.taxa ) { ch_taxa = file(params.taxa) ADD_TAXA( ch_updated_annots, ch_taxa ) ch_updated_taxa_annots = ADD_TAXA.out.annots_taxa_out } - else{ + else { ch_updated_taxa_annots = ch_combined_annotations } ch_final_annots = ch_updated_taxa_annots - if( params.generate_gff || params.generate_gbk ){ + if ( params.generate_gff || params.generate_gbk ) { if (!call) { - ch_called_genes = Channel + ch_called_genes = channel .fromPath(file(params.input_genes) / params.genes_fna_fmt, checkIfExists: true) .ifEmpty { exit 1, "If you specify --generate_gff or --generate_gbk without --call, you must provide a fasta file of called genes using --input_genes and --genes_fna_fmt,. Cannot find any called gene fasta files matching: ${params.input_genes} and ${params.genes_fna_fmt}\nNB: Path needs to follow pattern: path/to/directory/" } - .map { - input_fastaName = it.getBaseName() - tuple(input_fastaName, it) - } + .map { it -> [ it.getBaseName(), it ] } + // Collect all individual fasta to pass to quast - Channel.empty() + channel.empty() .mix( ch_called_genes ) .collect() .set { ch_collected_fna } diff --git a/workflows/dram.nf b/workflows/dram.nf index fd4a5fc2..1dd909a0 100644 --- a/workflows/dram.nf +++ b/workflows/dram.nf @@ -37,9 +37,9 @@ workflow DRAM { // Setup // - ch_versions = Channel.empty() - ch_multiqc_files = Channel.empty() - ch_fasta = Channel.empty() + ch_versions = channel.empty() + ch_multiqc_files = channel.empty() + ch_fasta = channel.empty() default_sheet = file(params.distill_dummy_sheet) distill_flag = (params.summarize || params.distill_topic != "" || params.distill_ecosystem != "" || params.distill_custom != "" || params.sum_ecos != "") @@ -61,14 +61,11 @@ workflow DRAM { if (params.rename || call) { - ch_fasta = Channel + ch_fasta = channel .fromPath(file(params.input_fasta) / params.fasta_fmt, checkIfExists: true) .ifEmpty { exit 1, "Cannot find any fasta files matching: ${params.input_fasta}\nNB: Path needs to follow pattern: path/to/directory/" } - ch_fasta = ch_fasta.map { - fasta_name = it.getBaseName() - tuple(fasta_name, it) - } + ch_fasta = ch_fasta.map { it -> [ it.getBaseName(), it ] } } use_kegg = params.use_kegg @@ -85,7 +82,7 @@ workflow DRAM { use_vog = params.use_vog if (params.anno_dbs != "") { - anno_dbs = params.anno_dbs.tokenize(',').collect { it.trim().toLowerCase() } + anno_dbs = params.anno_dbs.tokenize(',').collect { it -> it.trim().toLowerCase() } value_for_all = 'all' use_kegg = getDBFlag(anno_dbs, 'kegg', value_for_all) use_kofam = getDBFlag(anno_dbs, 'kofam', value_for_all) @@ -241,7 +238,7 @@ workflow DRAM { ch_final_annots = ADD_ANNOTATIONS.out.combined_annots_out } } else { // If the user has not specified --annotate, use the provided annotations - ch_final_annots = Channel + ch_final_annots = channel .fromPath(params.annotations, checkIfExists: true) .ifEmpty { exit 1, "If you specify --distill_ without --annotate, you must provide an annotations TSV file (--annotations ) with approprite formatting. Cannot find any called gene files matching: ${params.annotations}\nNB: Path needs to follow pattern: path/to/directory/" } } @@ -263,7 +260,7 @@ workflow DRAM { } else if (params.annotations) { - ch_final_annots = Channel + ch_final_annots = channel .fromPath(params.annotations, checkIfExists: true) .ifEmpty { exit 1, "Parameter annotations problem: Cannot find any called gene files matching: ${params.annotations}\nNB: Path needs to follow pattern: path/to/directory/" } } @@ -293,24 +290,24 @@ workflow DRAM { // // MODULE: MultiQC // - ch_multiqc_config = Channel.fromPath( + ch_multiqc_config = channel.fromPath( "$projectDir/assets/multiqc_config.yml", checkIfExists: true) ch_multiqc_custom_config = params.multiqc_config ? - Channel.fromPath(params.multiqc_config, checkIfExists: true) : - Channel.empty() + channel.fromPath(params.multiqc_config, checkIfExists: true) : + channel.empty() ch_multiqc_logo = params.multiqc_logo ? - Channel.fromPath(params.multiqc_logo, checkIfExists: true) : - Channel.empty() + channel.fromPath(params.multiqc_logo, checkIfExists: true) : + channel.empty() summary_params = paramsSummaryMap( workflow, parameters_schema: "nextflow_schema.json") - ch_workflow_summary = Channel.value(paramsSummaryMultiqc(summary_params)) + ch_workflow_summary = channel.value(paramsSummaryMultiqc(summary_params)) ch_multiqc_files = ch_multiqc_files.mix( ch_workflow_summary.collectFile(name: 'workflow_summary_mqc.yaml')) ch_multiqc_custom_methods_description = params.multiqc_methods_description ? file(params.multiqc_methods_description, checkIfExists: true) : file("$projectDir/assets/methods_description_template.yml", checkIfExists: true) - ch_methods_description = Channel.value( + ch_methods_description = channel.value( methodsDescriptionText(ch_multiqc_custom_methods_description)) ch_multiqc_files = ch_multiqc_files.mix(ch_collated_versions)