115 changes: 115 additions & 0 deletions adr/20250825-workflow-params.md
@@ -39,6 +39,8 @@ This approach has several limitations:

- Coerce CLI parameter values based on declared types, rather than relying on heuristics.

- Support collection-type parameters that can be loaded from structured files (CSV, JSON, YAML).

## Non-goals

- Removing the legacy `params.foo = bar` syntax -- legacy parameters must continue to work without modification.
@@ -103,6 +105,42 @@ When a parameter is supplied on the command line, Nextflow converts the string v

This replaces the heuristic type detection used for legacy parameters.

### Samplesheets as collection-type parameters

A parameter with a collection type (`List`, `Set`, `Bag`) can be supplied as a file path. Nextflow parses the file and assigns the resulting collection to the parameter. Supported formats are CSV, JSON, and YAML:

```groovy
params {
    samples: List<Sample> // can be supplied as a CSV, JSON, or YAML file path
}

record Sample {
    id: String
    fastq_1: Path
    fastq_2: Path
}
```

The file contents must be compatible with the declared element type; an error is thrown if they are not. CSV files must include a header row and use a comma as the column separator.
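
For example, given the `Sample` record above, either of the following files would be a valid value for the `samples` parameter (contents and paths are hypothetical):

```csv
id,fastq_1,fastq_2
sample1,/data/sample1_1.fastq,/data/sample1_2.fastq
```

```json
[
    { "id": "sample1", "fastq_1": "/data/sample1_1.fastq", "fastq_2": "/data/sample1_2.fastq" }
]
```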

The element type of a collection parameter can be a generic type such as `Map` or `Record`, or a custom record type that enables further validation. In the above example, using the `Sample` type ensures that each samplesheet row is validated against the record fields and that the `fastq_1` and `fastq_2` columns are treated as file paths.

This feature allows collection-type parameters to serve as *samplesheet inputs*, which simplifies the workflow logic and makes it agnostic to the input format:

```groovy
// before (CSV only)
ch_samples = channel.fromPath(params.samples)
    .flatMap { csv ->
        csv.splitCsv(header: true, sep: ',')
    }
    .map { r ->
        record(id: r.id, fastq_1: file(r.fastq_1), fastq_2: file(r.fastq_2))
    }

// after (CSV, JSON, or YAML)
ch_samples = channel.fromList(params.samples)
```
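
The same pipeline can then accept any supported format at the command line; for example (illustrative invocations, assuming the `samples` parameter above):

```
nextflow run main.nf --samples samples.csv
nextflow run main.nf --samples samples.yaml
```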

### Compile-time validation

Legacy parameters can be accessed globally by all scripts in the pipeline. While this approach is flexible, it prevents compile-time validation and breaks modularity.
@@ -227,3 +265,80 @@ For example, when loading a JSON file as a collection of records, Nextflow uses
- If a JSON object is missing a record field that is marked as nullable, it is considered valid

While type annotations are used only at compile-time in all other contexts, they are needed at runtime for pipeline parameters in order to validate and convert external input data to the expected type.

### Standard library functions for loading structured files

The automatic loading of samplesheet inputs is supported only in the `params` block. It could also be useful to load structured files with functions. For example:

```groovy
samples = fromJson('samples.json')
```

Here, `fromJson` is a function that loads arbitrary data from a JSON file.

A function is more flexible because it can be used anywhere in pipeline code, whereas the automatic samplesheet loading can only be used for pipeline-level inputs. For example, a process might produce a JSON file that needs to be read in workflow logic and processed by downstream tasks in parallel.

However, data-loading functions like `fromJson` cannot be statically typed, since the data file could contain anything (number, string, list, map, etc). Primitive values can be coerced using a Groovy-style cast (e.g. `fromJson('...') as Map`), but this approach does not support parameterized types or Nextflow record types. A function like `fromJson` also assumes a specific file format, which is overly restrictive for a pipeline-level input that could be supplied in a variety of formats.
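
For example (illustrative only, assuming the hypothetical `fromJson` function above):

```groovy
// primitive and container values can be coerced with a Groovy-style cast
config = fromJson('config.json') as Map

// but a cast cannot express a parameterized or record type -- the element
// type is never checked, so the result is not actually validated as List<Sample>
samples = fromJson('samples.json') as List<Sample>
```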

Loading structured files with a typed parameter such as `samples: List<Sample>` allows the input to have a well-defined type at compile-time and allows it to be sourced from any data format. While currently only CSV, JSON, and YAML are supported, the format can be made extensible in the future so that users can integrate their own data formats (e.g. Parquet) via plugins. Data-loading functions like `fromJson` can also be implemented for other use cases in the future, but they are not sufficient for handling pipeline-level inputs.
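
One possible shape for such an extension point, sketched purely for illustration (the interface name and signatures are assumptions, not part of this ADR):

```groovy
import java.nio.file.Path

/**
 * Hypothetical plugin interface for loading collection-type
 * parameters from a custom file format (e.g. Parquet).
 */
interface SamplesheetLoader {
    /** File extensions handled by this loader, e.g. ['parquet'] */
    List<String> getExtensions()

    /** Parse the file into a collection of raw values, to be coerced to the declared element type */
    Collection load(Path file)
}
```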

### Typed parameters and schemas

While this ADR does not specify any native integration with JSON schema, it is worth addressing how typed parameters are expected to interact with schemas in the future.

A common approach for Nextflow pipelines is to define a JSON schema for the pipeline parameters. A samplesheet param is typically defined as a file input with its own *samplesheet schema*. Developers typically load and validate samplesheet params using the `samplesheetToList` function from the `nf-schema` plugin:

```groovy
include { samplesheetToList } from 'plugin/nf-schema'

params.input = null

workflow {
    samples = samplesheetToList(params.input, "assets/schema_input.json")
    channel.fromList(samples).view()
}
```

The parameter schema provides validation and type coercion similar to that of a typed parameter. For example, the following record type:

```groovy
record Sample {
    id: String
    fastq_1: Path
    fastq_2: Path?
}
```

is equivalent to the following JSON schema:

```json
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"type": "array",
"items": {
"type": "object",
"properties": {
"id": { "type": "string" },
"fastq_1": { "type": "string", "format": "file-path", "exists": true },
"fastq_2": { "type": "string", "format": "file-path", "exists": true }
},
"required": ["id", "fastq_1"]
}
}
```

The `samplesheetToList` function has the same limitation as `fromJson` described above -- it is not statically typed. Even with the schema file, the type checker cannot guarantee a specific return type at compile-time because there is no type annotation. The above example must be rewritten as follows in order to be statically typed:

```groovy
params {
    input: List<Sample>
}

workflow {
    channel.fromList(params.input).view()
}
```

On the other hand, the samplesheet schema can specify additional validations that cannot be expressed with Nextflow types, such as min/max constraints for numbers and pattern constraints for strings.
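
For example, a schema fragment like the following (illustrative) has no direct equivalent in the type system:

```json
{
    "type": "object",
    "properties": {
        "id": { "type": "string", "pattern": "^[A-Za-z0-9_]+$" },
        "read_length": { "type": "integer", "minimum": 1, "maximum": 500 }
    }
}
```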

It is not yet clear whether it is better for Nextflow to enforce these schema properties at runtime, or for users to implement the equivalent validation in their pipeline code. If needed, Nextflow should be able to augment the `params` block with the parameter schema to provide this extra validation. The above example would work without any additional code changes.
66 changes: 66 additions & 0 deletions adr/20251020-workflow-outputs.md
@@ -191,6 +191,72 @@ While this approach is less verbose, it breaks the modularity of processes and s

On the other hand, propagating all workflow outputs to the top will make pipelines more verbose, especially when using "skinny tuple" channels. This issue will be alleviated by migrating from tuples to records -- for this reason, it is recommended that large pipelines be migrated to records before being migrated to workflow outputs.

### Inferring params and outputs from a named workflow

Consider the following entry workflow which simply wraps a named workflow:

```groovy
params {
    samples: List<Sample>
    index: Path
}

workflow {
    main:
    ch_samples = channel.fromList(params.samples)
    rnaseq = RNASEQ(ch_samples, params.index)

    publish:
    aligned = rnaseq.aligned
    multiqc_report = rnaseq.multiqc_report
}

output {
    aligned: Channel<AlignedSample> {
        path { s -> /* ... */ }
        index { path 'aligned.json' }
    }
    multiqc_report: Path {
        path '.'
    }
}
```

Where the `RNASEQ` workflow is defined as follows:

```groovy
workflow RNASEQ {
    take:
    ch_samples: Channel<Sample>
    index: Path

    main:
    ch_aligned = ALIGN(ch_samples, index)
    multiqc_report = MULTIQC(ch_aligned.collect())

    emit:
    aligned: Channel<AlignedSample> = ch_aligned
    multiqc_report: Path = multiqc_report
}

record Sample { /* ... */ }
record AlignedSample { /* ... */ }
```

This example demonstrates that most of the `params` / `workflow` / `output` trio can be equivalently expressed by a named workflow: the `params` block mirrors the `take:` section, and the `output` block and `publish:` section together mirror the `emit:` section.

Named workflows typically consume and produce channels so that they can be composed into larger pipelines. But this prevents them from being directly executable -- the purpose of an entry workflow is to translate between dataflow logic and the external world. If this translation could be inferred automatically, it would allow a named workflow to be both executable and composable, eliminating the need to define explicit entry workflows.

Given a named workflow with dataflow inputs and outputs, the following capabilities would be needed to execute it directly (a hypothetical invocation is sketched after this list):

- Loading an input channel (e.g. channel of records) from an index file (e.g. CSV, JSON, or YAML file)
- Saving an output channel (e.g. channel of records) as an index file
- Publishing output files to a permanent location
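
For instance, the `RNASEQ` workflow above might then be invoked directly as follows (hypothetical invocation -- only the `-entry` option exists today; mapping `take:` inputs to CLI parameters is an assumption):

```
nextflow run main.nf -entry RNASEQ --ch_samples samples.csv --index /data/index
```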

Channels can be automatically translated to/from index files using record types. However, the output directory structure cannot be automatically inferred. It is normally specified by the output `path` directive, and need not correspond at all to the structure of output channels.

One solution is to not create an output directory at all. The workflow outputs provide a structured view of the output files, so this can be used by an external system (e.g. Seqera Platform) to provide a user interface. The output files can simply remain where they are produced, instead of being copied to a separate location. The work directory will likely need to be a global persistent data store, which implies global caching, automatic cleanup, and global search.

## Links

- Community issues: [#4042](https://github.com/nextflow-io/nextflow/issues/4042), [#4661](https://github.com/nextflow-io/nextflow/issues/4661), [#4670](https://github.com/nextflow-io/nextflow/issues/4670)
69 changes: 46 additions & 23 deletions docs/tutorials/static-types.md
@@ -166,6 +166,47 @@ read_pairs_ch = channel.of(params.reads)
}
```

You can simplify the code further by modeling `params.reads` as a collection of records instead of a file path.

Add a header row to the samplesheet:

```
id,fastq_1,fastq_2
gut,...
liver,...
lung,...
spleen,...
```

Refactor `params.reads` as a collection of records:

```nextflow
params {
    // The input samplesheet of paired-end reads
    reads: List<Sample> = "${projectDir}/data/allreads.csv"

    // ...
}

record Sample {
    id: String
    fastq_1: Path
    fastq_2: Path
}
```

In the above, `Sample` is a *record type* based on the samplesheet structure. When a file path is supplied for a collection-type parameter (e.g. `List<Sample>`), the file is automatically loaded and parsed into a collection.

Refactor `read_pairs_ch` to load the collection into a channel:

```nextflow
read_pairs_ch = channel.fromList(params.reads)
```

:::{note}
Collection-type params can also be loaded from JSON and YAML samplesheets. See {ref}`workflow-typed-params` for more information.
:::
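
For reference, a YAML samplesheet equivalent to the CSV above might look like this (file contents abbreviated to match the CSV):

```yaml
- id: gut
  fastq_1: ...
  fastq_2: ...
- id: liver
  fastq_1: ...
  fastq_2: ...
```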

### Migrating processes

See {ref}`process-typed-page` for an overview of typed processes.
@@ -393,11 +434,7 @@ You can infer the type of each workflow input by examining how the workflow is c

```nextflow
workflow {
-    read_pairs_ch = channel.of(params.reads)
-        .flatMap { csv -> csv.splitCsv() }
-        .map { row ->
-            record(id: row[0], fastq_1: file(row[1]), fastq_2: file(row[2]))
-        }
+    read_pairs_ch = channel.fromList(params.reads)

    RNASEQ(read_pairs_ch, params.transcriptome)

@@ -407,7 +444,7 @@

You can determine the type of each input as follows:

-- The channel `read_pairs_ch` has type `Channel<Record>`, where each record contains the fields `id`, `fastq_1`, `fastq_2`.
+- The channel `read_pairs_ch` has type `Channel<E>`, where `E` is the type of each value in the channel. It is loaded from `params.reads`, which has type `List<Sample>`. Therefore `read_pairs_ch` has type `Channel<Sample>`.

- The parameter `params.transcriptome` has type `Path` as defined in the `params` block.

@@ -423,12 +460,6 @@ workflow RNASEQ {

    // ...
}
-
-record Sample {
-    id: String
-    fastq_1: Path
-    fastq_2: Path
-}
```

The `read_pairs_ch` channel also needs to provide all of the record fields required by downstream processes. It is used by `FASTQC` and `QUANT`, which both declare the following record input:
@@ -518,11 +549,7 @@ The entry workflow is defined as follows:

```nextflow
workflow {
-    read_pairs_ch = channel.of(params.reads)
-        .flatMap { csv -> csv.splitCsv() }
-        .map { row ->
-            record(id: row[0], fastq_1: file(row[1]), fastq_2: file(row[2]))
-        }
+    read_pairs_ch = channel.fromFilePairs(params.reads, checkIfExists: true, flat: true)

    (fastqc_ch, quant_ch) = RNASEQ(read_pairs_ch, params.transcriptome)

@@ -538,11 +565,7 @@ Rewrite this workflow based on the updated params, processes, and subworkflows:
nextflow.enable.types = true

workflow {
-    read_pairs_ch = channel.of(params.reads)
-        .flatMap { csv -> csv.splitCsv() }
-        .map { row ->
-            record(id: row[0], fastq_1: file(row[1]), fastq_2: file(row[2]))
-        }
+    read_pairs_ch = channel.fromList(params.reads)

    samples_ch = RNASEQ(read_pairs_ch, params.transcriptome)

@@ -554,7 +577,7 @@
}
```

-The `reads` param was refactored as a `Path`, so it is loaded into a channel of records using `splitCsv`. It is compatible with the records expected by `RNASEQ`.
+The `reads` param was refactored as a collection of records, so it is loaded into a channel using `channel.fromList`. It is compatible with the records expected by `RNASEQ`.

The `RNASEQ` workflow now returns a single combined channel, so the `mix` operation is no longer needed. The `flatMap` operator is used to extract the files from each record in `samples_ch`.
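
That last step might look something like the following sketch (the record field names here are assumptions, not taken from the tutorial):

```nextflow
// collect the per-sample output files from each record
files_ch = samples_ch.flatMap { s -> [s.fastqc_html, s.quant_dir] }
```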

30 changes: 30 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/script/ParamsDsl.groovy
@@ -19,12 +19,16 @@ package nextflow.script
import java.lang.reflect.Type
import java.nio.file.Path

import groovy.json.JsonSlurper
import groovy.transform.Canonical
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import groovy.yaml.YamlSlurper
import nextflow.Session
import nextflow.exception.ScriptRuntimeException
import nextflow.file.FileHelper
import nextflow.script.dsl.Types
import nextflow.splitter.CsvSplitter
import nextflow.util.Duration
import nextflow.util.MemoryUnit
import nextflow.util.TypeHelper
@@ -135,6 +139,10 @@ class ParamsDsl {
            return MemoryUnit.of(str)
        }

        if( TypeHelper.isCollectionType(decl.type) ) {
            return resolveFromFile(decl, FileHelper.asPath(str))
        }

        if( decl.type == Path ) {
            return TypeHelper.asPathType(str)
        }
Expand All @@ -154,6 +162,9 @@ class ParamsDsl {

        final str = value.toString()

        if( TypeHelper.isCollectionType(decl.type) )
            return resolveFromFile(decl, FileHelper.asPath(str))

        if( decl.type == Path )
            return TypeHelper.asPathType(str)

@@ -170,6 +181,25 @@
        }
    }

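    /**
     * Load a collection-type parameter from a structured file (CSV, JSON, or YAML)
     * and coerce the parsed contents to the declared collection type.
     */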
    private static Object resolveFromFile(Param decl, Path file) {
        final ext = file.getExtension()
        final value = switch( ext ) {
            case 'csv' -> new CsvSplitter().options(header: true, sep: ',').target(file).list()
            case 'json' -> new JsonSlurper().parse(file)
            case 'yaml' -> new YamlSlurper().parse(file)
            case 'yml' -> new YamlSlurper().parse(file)
            default -> throw new ScriptRuntimeException("Unrecognized file format '${ext}' for input file '${file}' supplied for parameter `${decl.name}` -- should be CSV, JSON, or YAML")
        }

        try {
            return TypeHelper.asCollectionType(value as Collection, decl.type)
        }
        catch( GroovyCastException | UnsupportedOperationException e ) {
            final actualType = value.getClass()
            throw new ScriptRuntimeException("Parameter `${decl.name}` with type ${Types.getName(decl.type)} cannot be assigned to contents of '${file}' [${Types.getName(actualType)}]")
        }
    }

    private static boolean isAssignableFrom(Class target, Class source) {
        if( target == Float.class )
            return Number.class.isAssignableFrom(source)