Rx.NET Buffer subscribers called with zero elements on timespan expiration
With the sequence below, I am buffering into blocks every 3 seconds. In the real-world use case, the observable source could have many items within the buffer timespan (3 seconds below), or sometimes no items at all within that time frame.
In those cases of zero items I would not want the subscriber to be called.
    var numbers = Observable
        .Interval(TimeSpan.FromSeconds(4))
        .Select(i => (int) i + 1)
        .Do(i => Console.WriteLine($"Producing {i}"));

    numbers.Buffer(TimeSpan.FromSeconds(3))
        .Subscribe(buffer => Console.WriteLine("Buffer of {1} @ {0}", DateTime.Now, buffer.Count));
With the output below, note the Buffer of 0 where subscribe has been called with zero items.
Buffer of 0 @ 19/05/2022 21:43:27
Producing 1
Buffer of 1 @ 19/05/2022 21:43:30
Producing 2
Buffer of 1 @ 19/05/2022 21:43:33
Buffer of 0 @ 19/05/2022 21:43:36
Producing 3
Buffer of 1 @ 19/05/2022 21:43:39
Producing 4
Buffer of 1 @ 19/05/2022 21:43:42
Producing 5
Buffer of 1 @ 19/05/2022 21:43:45
Buffer of 0 @ 19/05/2022 21:43:48
Producing 6
Buffer of 1 @ 19/05/2022 21:43:51
Producing 7
Buffer of 1 @ 19/05/2022 21:43:54
Producing 8
Buffer of 1 @ 19/05/2022 21:43:57
Buffer of 0 @ 19/05/2022 21:44:00
Producing 9
As a hack, I could modify the subscriber to ignore zero-element buffers:
    numbers.Buffer(TimeSpan.FromSeconds(3))
        .Subscribe(buffer =>
        {
            if (buffer.Count == 0) return;
            Console.WriteLine("Buffer of {1} @ {0}", DateTime.Now, buffer.Count);
        });
Questions please:
1. Avoiding hacks, is there another operator that I could use (I am looking at Window but am unsure of its usage) so that downstream subscriber methods are only called when we have a block of data with more than 0 elements?
2. What is the purpose of allowing a zero-length buffer to be passed?
3. How would one expand this example to group buffers by an identifier GroupId, for an example sequence Observable.Interval(timespan).Select(i => (GroupId: random.Next(0, 3), Value: (int) i + 1))?
Solution 1:[1]
You have to read the whole file (file:///path) and then apply a .where() filter.
This will also "push down" the filter to the I/O-level and read only the partitions that are required.
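Relating the filter idea back to the Rx.NET sequence in the question, here is a minimal sketch, not a definitive implementation. It assumes the System.Reactive NuGet package and C# 8 or later. The 1-second interval for the grouped source and the tuple field name `Items` are assumptions made for illustration. `Where` is placed after `Buffer` so empty buffers are dropped before they reach the subscriber, and `GroupBy` followed by `SelectMany` buffers each `GroupId` separately.

```csharp
using System;
using System.Reactive.Linq;

class Program
{
    static void Main()
    {
        // Same source as in the question: one item every 4 seconds.
        var numbers = Observable
            .Interval(TimeSpan.FromSeconds(4))
            .Select(i => (int)i + 1);

        // Drop empty buffers before they reach the subscriber.
        using var plain = numbers
            .Buffer(TimeSpan.FromSeconds(3))
            .Where(buffer => buffer.Count > 0)
            .Subscribe(buffer =>
                Console.WriteLine($"Buffer of {buffer.Count} @ {DateTime.Now}"));

        // Grouped variant: the 1-second interval is an assumption for illustration.
        var random = new Random();
        using var grouped = Observable
            .Interval(TimeSpan.FromSeconds(1))
            .Select(i => (GroupId: random.Next(0, 3), Value: (int)i + 1))
            .GroupBy(x => x.GroupId)
            // Buffer each group on its own, skip empty buffers, and tag each with its key.
            .SelectMany(g => g
                .Buffer(TimeSpan.FromSeconds(3))
                .Where(buffer => buffer.Count > 0)
                .Select(buffer => (GroupId: g.Key, Items: buffer)))
            .Subscribe(b =>
                Console.WriteLine($"Group {b.GroupId}: {b.Items.Count} item(s) @ {DateTime.Now}"));

        Console.ReadLine(); // keep the process alive while the timers fire
    }
}
```

The time-based `Buffer` overload closes a buffer on every tick whether or not anything arrived, which is why the `Count > 0` check is still needed; the sketch only moves that check out of the subscriber and into the pipeline.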
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Til Piffl |
