Conditional delay+throttle operator
I'm writing a custom Rx operator that combines features from both Throttle and Delay, with the following signature:
public static IObservable<T> DelayWhen<T>(this IObservable<T> self, TimeSpan delay, Func<T, bool> condition);
The rules are as follows:
- If condition(t) returns false, emit immediately.
- If condition(t) returns true, delay the emission for the duration given by delay.
- If self emits a value during a delay, then do the following:
  - If condition(t) returns false, cancel/skip the value scheduled for delayed emission and emit the new value.
  - If condition(t) returns true, then skip/ignore this new value (i.e. the delayed value will emit if self does not emit any more values during the delay).
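To make the rules concrete, here is a minimal sketch of them as a plain, non-reactive reference model. It is not an Rx operator: DelayWhenModel and Simulate are hypothetical names introduced only for illustration, time is a plain long in arbitrary units, and the model assumes that a new delayable value arriving during a running delay is simply dropped.

```csharp
using System;
using System.Collections.Generic;

static class DelayWhenModel
{
    // Hypothetical reference model (not part of Rx): replays timestamped inputs,
    // assumed sorted by time, and returns the timestamped outputs the rules imply.
    public static List<(long Time, T Value)> Simulate<T>(
        IEnumerable<(long Time, T Value)> input, long delay, Func<T, bool> condition)
    {
        var output = new List<(long Time, T Value)>();
        (long Due, T Value)? pending = null; // at most one value awaits delayed emission

        foreach (var (time, value) in input)
        {
            // A pending value whose delay has already elapsed is emitted first.
            if (pending is { } elapsed && elapsed.Due <= time)
            {
                output.Add((elapsed.Due, elapsed.Value));
                pending = null;
            }

            if (!condition(value))
            {
                pending = null;            // cancel any value scheduled for delayed emission
                output.Add((time, value)); // emit the new value immediately
            }
            else if (pending is null)
            {
                pending = (time + delay, value); // start a new delay
            }
            // Otherwise a delay is already running and the new value is ignored.
        }

        // No further input arrives, so a remaining delayed value is emitted when due.
        if (pending is { } last)
            output.Add((last.Due, last.Value));

        return output;
    }

    static void Main()
    {
        var input = new[] { (0L, 1), (0L, 2), (500L, 4), (1000L, 6), (1300L, 3) };
        foreach (var (time, value) in Simulate(input, 1000, x => x % 2 == 0))
            Console.WriteLine($"{time}: {value}");
    }
}
```

With even numbers treated as delayable, this input yields 1 at time 0, 2 at time 1000 and 3 at time 1300: the value 4 is ignored because 2 is still pending, and 6 is cancelled by the arrival of 3.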
As you can tell from the rules, there is some behavior reminiscent of throttling going on here.
My various attempts at solving this issue include some async approaches that just grew too complex. I really feel this should be solvable using existing operators. E.g. see https://stackoverflow.com/a/16290788/2149075, which uses Amb quite neatly and which I feel is really close to what I want to achieve.
Solution 1:[1]
Here is an implementation of the DelayWhen operator that is based on the built-in Window operator:
Update: My original implementation (Revision 1) did not satisfy the requirements of the question, so I changed it by replacing the Delay operator with a custom-made delaying/throttling operator.
/// <summary>
/// Either delays the emission of the elements that satisfy the condition, by the
/// specified time duration, or ignores them, in case they are produced before
/// the emission of previously delayed element. Elements that don't satisfy the
/// condition are emitted immediately, and they also cancel any pending emission of
/// all previously delayed elements.
/// </summary>
public static IObservable<T> DelayWhen<T>(this IObservable<T> source,
    TimeSpan delay, Func<T, bool> condition, IScheduler scheduler = null)
{
    // Arguments validation omitted
    scheduler ??= DefaultScheduler.Instance;
    return source
        .Select(x => (Item: x, WithDelay: condition(x)))
        .Publish(published => published.Window(published.Where(e => !e.WithDelay)))
        .Select(w => Observable.Merge(
            DelayThrottleSpecial(w.Where(e => e.WithDelay), delay, scheduler),
            w.Where(e => !e.WithDelay)
        ))
        .Switch()
        .Select(e => e.Item);

    /// <summary>
    /// Time shifts the observable sequence by the specified time duration, ignoring
    /// elements that are produced while a previous element is scheduled for emission.
    /// </summary>
    static IObservable<T2> DelayThrottleSpecial<T2>(IObservable<T2> source,
        TimeSpan dueTime, IScheduler scheduler)
    {
        int mutex = 0; // 0: not acquired, 1: acquired
        return source.SelectMany(x =>
        {
            if (Interlocked.CompareExchange(ref mutex, 1, 0) == 0)
                return Observable.Return(x)
                    .DelaySubscription(dueTime, scheduler)
                    .Finally(() => Volatile.Write(ref mutex, 0));
            return Observable.Empty<T2>();
        });
    }
}
The source sequence is partitioned into consecutive windows (subsequences), with each window ending with a false (non-delayed) element. Each window is then projected to a new window that has its true (delayed) elements delayed/throttled according to the requirements. Finally, the projected windows are merged back into a single sequence by using the Switch operator, so that all pending elements of a window are discarded every time a new window is emitted.
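A rough way to try the operator out is to drive it with virtual time. The sketch below assumes that the DelayWhen extension method shown above is compiled into a static class in the same project, and that the Microsoft.Reactive.Testing package (which provides TestScheduler) is referenced. The choice of "even numbers are delayed" and the timings are illustrative only, and the comments describe what the rules in the question lead one to expect rather than verified output.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Microsoft.Reactive.Testing;

// Assumption: the DelayWhen extension method above is in scope.
var scheduler = new TestScheduler();   // virtual clock, starts at 0 ticks
var source = new Subject<int>();

source
    .DelayWhen(TimeSpan.FromSeconds(1), x => x % 2 == 0, scheduler)
    .Subscribe(x => Console.WriteLine(
        $"{scheduler.Clock / TimeSpan.TicksPerMillisecond} ms: {x}"));

source.OnNext(1);   // odd: condition is false, expected to be emitted immediately
source.OnNext(2);   // even: expected to be scheduled for emission after 1 second
scheduler.AdvanceBy(TimeSpan.FromMilliseconds(500).Ticks);
source.OnNext(4);   // even while 2 is still pending: expected to be ignored
scheduler.AdvanceBy(TimeSpan.FromMilliseconds(500).Ticks);   // the delay of 2 elapses here
source.OnNext(6);   // even: expected to start a new delay
scheduler.AdvanceBy(TimeSpan.FromMilliseconds(300).Ticks);
source.OnNext(3);   // odd: expected to cancel the pending 6 and be emitted immediately
scheduler.AdvanceBy(TimeSpan.FromSeconds(1).Ticks);          // 6 should never appear
```

Passing the scheduler explicitly is what makes this kind of check deterministic; with the default scheduler the same sequence of calls would depend on wall-clock timing.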
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 |
