Why is a conditional channel source causing a downstream process to not execute an instance for each value in a different channel?

I have a Nextflow DSL2 pipeline where an early process generally takes a very long time (~24 hours) and has intermediate products that occupy a lot of storage (~1 TB). Because of the length and resources required for this process, it would be desirable to be able to set a "checkpoint", i.e. save the (relatively small) final output to a safe location, and on subsequent pipeline executions retrieve the output from that location. This means that the intermediate data can be safely deleted without preventing resumption of the pipeline later.

However, I've found that when I implement this and use the checkpoint, a process further downstream that is supposed to run an instance for every value in a list only runs a single instance. Minimal working example and example outputs below:

// foobarbaz.nf
nextflow.enable.dsl=2

params.publish_dir = "$baseDir/output"
params.nofoo = false

xy = ['x', 'y']
xy_chan = Channel.fromList(xy)

process foo {
    publishDir "${params.publish_dir}/", mode: "copy"

    output:
        path "foo.out"
    
    """
    touch foo.out
    """
}

process bar {
    input:
        path foo_out

    output:
        path "bar.out"

    script:
        """
        touch bar.out
        """    
}

process baz {
    input:
        path bar_out
        val xy

    output:
        tuple val(xy), path("baz_${xy}.out")

    script:
        """
        touch baz_${xy}.out
        """
}

workflow {
    main:
        if( params.nofoo ) {
            foo_out = Channel.fromPath("${params.publish_dir}/foo.out")
        }
        else {
            foo_out = foo() // generally takes a long time and uses lots of storage
        }

        bar_out = bar(foo_out)
        baz_out = baz(bar_out, xy_chan)

        // ... continue to do things with baz_out ...
}

First execution with foo:

$ nextflow foobarbaz.nf 
N E X T F L O W  ~  version 21.10.6
Launching `foobarbaz.nf` [soggy_gautier] - revision: f4e70a5cd2
executor >  local (4)
[77/c65a9a] process > foo     [100%] 1 of 1 ✔
[23/846929] process > bar     [100%] 1 of 1 ✔
[18/1c4bb1] process > baz (2) [100%] 2 of 2 ✔

(note that baz successfully executes two instances: one where xy==x and one where xy==y)

Later execution using the checkpoint:

$ nextflow foobarbaz.nf --nofoo
N E X T F L O W  ~  version 21.10.6
Launching `foobarbaz.nf` [infallible_babbage] - revision: f4e70a5cd2
executor >  local (2)
[40/b42ed3] process > bar (1) [100%] 1 of 1 ✔
[d9/76888e] process > baz (1) [100%] 1 of 1 ✔

The checkpointing is successful (bar executes without needing foo), but now baz only executes a single instance where xy==x.

Why is this happening, and how can I get the intended behaviour? I see no reason why it should make any difference to how baz interprets the xy channel whether foo_out comes from foo or is retrieved directly from a file.



Solution 1:[1]

The problem is that the Channel.fromPath factory method creates a queue channel to provide a single value, whereas the output of process 'foo' implicitly produces a value channel:

A value channel is implicitly created by a process when an input specifies a simple value in the from clause. Moreover, a value channel is also implicitly created as output for a process whose inputs are only value channels.

So without --nofoo, 'foo_out' and 'bar_out' are both value channels. Since 'xy_chan' is a queue channel that provides two values, and a value channel can be read any number of times, process 'baz' gets executed twice. With --nofoo, 'foo_out' and 'bar_out' are both queue channels which provide a single value. Since there's only one complete input configuration (i.e. one value from each input channel), process 'baz' gets executed only once. See also: Understand how multiple input channels work.
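To make the difference concrete, here is a minimal sketch that is separate from the pipeline in the question. The process name 'pair' and the flag 'params.as_value' are hypothetical and exist only for this demonstration. Going by the documented channel semantics, running it as-is should launch a single 'pair' task, while running it with --as_value should launch one task per letter.

```nextflow
nextflow.enable.dsl=2

// Hypothetical flag, used only to switch between the two channel types
params.as_value = false

process pair {
    input:
        val single
        val letter

    output:
        stdout

    script:
        """
        echo "${single} ${letter}"
        """
}

workflow {
    // Queue channel emitting two items
    letters = Channel.fromList(['x', 'y'])

    // Value channel: can be read repeatedly, so it pairs with every letter.
    // Queue channel: its single item is consumed by the first task.
    single = params.as_value ? Channel.value('a') : Channel.of('a')

    pair(single, letters).view()
}
```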

The solution is to ensure that 'foo_out' is either always a queue channel or always a value channel. Given your 'foo' process declaration, you probably want the latter:

if( params.nofoo ) {
    foo_out = file( "${params.publish_dir}/foo.out" )
}
else {
    foo_out = foo()
}
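If you would rather keep the Channel.fromPath factory, one alternative is to convert its output to a value channel with the first operator, which returns a value channel. The sketch below is meant as a drop-in replacement for the conditional in the question's workflow block. The checkIfExists option is an addition of mine, not part of the original code; it is there so that a missing checkpoint file fails early.

```nextflow
// Replaces the if/else inside the workflow block of foobarbaz.nf
if( params.nofoo ) {
    // first() turns the one-item queue channel into a value channel
    foo_out = Channel
        .fromPath("${params.publish_dir}/foo.out", checkIfExists: true)
        .first()
}
else {
    // foo has no inputs, so its output is already a value channel
    foo_out = foo()
}
```

Either way, 'foo_out' and 'bar_out' are value channels in both branches, so 'baz' should run once per item in 'xy_chan'.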

Solution 2:[2]

In my experience, a process is executed as many times as the input channel with the fewest emissions allows (here, the single path emitted by bar). So to my mind the strange behaviour is actually the run without --nofoo. If you want baz executed twice, you could try joining the channels with the combine operator, something like baz_input_ch = bar.out.combine(xy_chan). Note that baz would then need to declare a single tuple input rather than two separate inputs.
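A minimal sketch of the combine approach, assuming baz is rewritten to accept one tuple input. To keep the example short and standalone, 'bar' is simplified here to take no input, which is not how it is declared in the question.

```nextflow
nextflow.enable.dsl=2

// Simplified stand-in for the question's bar process (no input here)
process bar {
    output:
        path "bar.out"

    script:
        """
        touch bar.out
        """
}

process baz {
    input:
        // One tuple per (bar_out, xy) combination
        tuple path(bar_out), val(xy)

    output:
        tuple val(xy), path("baz_${xy}.out")

    script:
        """
        touch baz_${xy}.out
        """
}

workflow {
    xy_chan = Channel.fromList(['x', 'y'])
    bar_out = bar()

    // combine emits every pairing of bar_out with an item of xy_chan
    baz_input_ch = bar_out.combine(xy_chan)
    baz(baz_input_ch).view()
}
```

Because every pairing is made explicit in the channel itself, this form should not depend on whether 'bar_out' happens to be a queue channel or a value channel.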

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Steve
Solution 2 Edoardo Giacopuzzi