'TPL DataFlow not behaving as expected

Consider the following pipeline:

public class Program
{
    private static int firstBlockactualDOP = 0;
    private static int lastBlockactualDOP = 0;
    private static ActionBlock<int> lastBlock;
    private static TransformBlock<int, int> firstBlock;
    private static System.Timers.Timer timer;
    public static void Main()
    {
        Do(-1, 32, -1, 100).GetAwaiter().GetResult();
    }

    public static async Task Do(int firstBlockBc, int firstBlockDOP, int secondBlockBc, int secondBlockDOP)
    {
        timer = new System.Timers.Timer
        {
            AutoReset = true,
            Enabled = true,
            Interval = 1000
        };

        firstBlock = new TransformBlock<int, int>(s => 
        {
            Interlocked.Increment(ref firstBlockactualDOP);
            Thread.Sleep(50);
            Interlocked.Decrement(ref firstBlockactualDOP);
            return s;
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = firstBlockDOP, BoundedCapacity = firstBlockBc });
        
        lastBlock = new ActionBlock<int>(s =>
        {
            Interlocked.Increment(ref lastBlockactualDOP);
            Thread.Sleep(2);
            Interlocked.Decrement(ref lastBlockactualDOP);
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = secondBlockDOP, BoundedCapacity = secondBlockBc });
        
        timer.Elapsed += MonitorEvent;

        firstBlock.LinkTo(lastBlock, new DataflowLinkOptions { PropagateCompletion = true });
        var stopwatch = Stopwatch.StartNew();
        for (int i = 0; i < 10000; i++)
        {
            if (i % 32 == 0)
            {
             //   Thread.Sleep(200);
            }
            await firstBlock.SendAsync(i);
        }   
        

        firstBlock.Complete();
        await lastBlock.Completion;
        stopwatch.Stop();
        Console.WriteLine($"Total seconds: {stopwatch.Elapsed.TotalSeconds}.");
    }

    private static void MonitorEvent(object _, System.Timers.ElapsedEventArgs __)
    {
        Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} firstBlock: {{Input: {firstBlock.InputCount}, DOP: {firstBlockactualDOP}, Output: {firstBlock.OutputCount}}}" +
            $" lastBlock: {{Input: {lastBlock.InputCount}, DOP: {lastBlockactualDOP}}}");
    }
}

I was expected to see "lastBlock" running as soon as "firstBlock" finished processing his first element.

This is the observed output I got:

45:39.707 firstBlock: {Input: 9728, DOP: 16, Output: 256} lastBlock: {Input: 0, DOP: 0}
45:40.718 firstBlock: {Input: 9455, DOP: 17, Output: 528} lastBlock: {Input: 0, DOP: 0}
45:42.716 firstBlock: {Input: 8861, DOP: 19, Output: 1120} lastBlock: {Input: 0, DOP: 0}
45:44.705 firstBlock: {Input: 8203, DOP: 21, Output: 1776} lastBlock: {Input: 0, DOP: 0}
45:46.717 firstBlock: {Input: 7481, DOP: 23, Output: 2496} lastBlock: {Input: 0, DOP: 0}
45:48.714 firstBlock: {Input: 6695, DOP: 25, Output: 3280} lastBlock: {Input: 0, DOP: 0}
45:50.713 firstBlock: {Input: 5845, DOP: 27, Output: 4128} lastBlock: {Input: 0, DOP: 0}
45:52.714 firstBlock: {Input: 4936, DOP: 29, Output: 5035} lastBlock: {Input: 0, DOP: 0}
45:53.718 firstBlock: {Input: 4455, DOP: 30, Output: 5515} lastBlock: {Input: 0, DOP: 0}
45:55.715 firstBlock: {Input: 3454, DOP: 32, Output: 6514} lastBlock: {Input: 0, DOP: 0}
45:57.716 firstBlock: {Input: 2448, DOP: 32, Output: 512} lastBlock: {Input: 6815, DOP: 2}
45:59.708 firstBlock: {Input: 1424, DOP: 32, Output: 1024} lastBlock: {Input: 6880, DOP: 4}
46:00.715 firstBlock: {Input: 912, DOP: 32, Output: 512} lastBlock: {Input: 7583, DOP: 5}
46:02.514 firstBlock: {Input: 0, DOP: 30, Output: 407} lastBlock: {Input: 7865, DOP: 7}
46:04.206 firstBlock: {Input: 0, DOP: 0, Output: 0} lastBlock: {Input: 4076, DOP: 40}
46:05.211 firstBlock: {Input: 0, DOP: 0, Output: 0} lastBlock: {Input: 1451, DOP: 41}
Total seconds: 27.0824484.

Uncommenting //Thread.Sleep(200); "released" the lastBlock and it got it to work. Also, changing the method call to Do(64,32, -1, 100); Made it also play together much better

My question is how I can really balance the work between the 2 blocks I have



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source