'rx.net buffer subscribers called with zero elements on timespan expiration

With the below sequence, I am buffering into blocks every 3 seconds - In the real world use case, the observerable source could have many items within the buffer timespan (of 3 seconds below), or sometimes no items within that time frame.

In those cases of zero items I would not want the subscriber to be called.

var numbers = Observable
    .Interval(TimeSpan.FromSeconds(4))
    .Select(i => (int) i + 1)
    .Do(i => Console.WriteLine($"Producing {i}"));

numbers.Buffer(TimeSpan.FromSeconds(3))
    .Subscribe(buffer => Console.WriteLine("Buffer of {1} @ {0}", DateTime.Now, buffer.Count));

With the output below, note the Buffer of 0 where subscribe has been called with zero items.

Buffer of 0 @ 19/05/2022 21:43:27
Producing 1
Buffer of 1 @ 19/05/2022 21:43:30
Producing 2
Buffer of 1 @ 19/05/2022 21:43:33
Buffer of 0 @ 19/05/2022 21:43:36
Producing 3
Buffer of 1 @ 19/05/2022 21:43:39
Producing 4
Buffer of 1 @ 19/05/2022 21:43:42
Producing 5
Buffer of 1 @ 19/05/2022 21:43:45
Buffer of 0 @ 19/05/2022 21:43:48
Producing 6
Buffer of 1 @ 19/05/2022 21:43:51
Producing 7
Buffer of 1 @ 19/05/2022 21:43:54
Producing 8
Buffer of 1 @ 19/05/2022 21:43:57
Buffer of 0 @ 19/05/2022 21:44:00
Producing 9

As a hack I could modify to ignore zero element sequences:

numbers.Buffer(TimeSpan.FromSeconds(3))
    .Subscribe( buffer =>
        {
             if(buffer.Count == 0) return;
             Console.WriteLine("Buffer of {1} @ {0}", DateTime.Now, buffer.Count);
        });

Questions please:

  1. Avoiding hacks, is there another operator that I could use (I am looking at Window but unsure of its usage) so that we only call downstream subscriber methods when we have a block of data > 0 elements?

  2. What is the purpose of the potential for a zero length buffer to be pased?

  3. How would one expand this example to grouping buffers by an identifer GroupId, for an example sequence Observable.Interval(timespan).Select(i => (GroupId: random.Next(0, 3), Value: (int) i + 1))?



Solution 1:[1]

You have to read the whole file (file:///path) and then apply a .where() filter.

This will also "push down" the filter to the I/O-level and read only the partitions that are required.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Til Piffl