How does the TestScheduler in Reactive Extensions v2 work?

I have been reading this: Testing Rx Queries using Virtual Time Scheduling

I got to the part "Using Unit Test projects" (about halfway down the page) and tried it myself (using VS2012 & MSTest) but my results are not the same as in the document. Specifically, my assertions fail.

Here is my test code:

[TestMethod]
public void TestMethod1()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateColdObservable(
        OnNext(10, 42),
        OnCompleted<int>(20));

    var observer1 = scheduler.CreateObserver<int>();
    scheduler.Schedule(TimeSpan.FromTicks(190), 
                       (s, t) => xs.Subscribe(observer1));

    var observer2 = scheduler.CreateObserver<int>();
    scheduler.Schedule(TimeSpan.FromTicks(220), 
                       (s, t) => xs.Subscribe(observer2));

    scheduler.Start();

    observer1.Messages.AssertEqual(
        OnNext(200, 42),
        OnCompleted<int>(210));

    observer2.Messages.AssertEqual(
        OnNext(230, 42),
        OnCompleted<int>(240));
}

One change I had to make was this:

scheduler.Schedule(TimeSpan.FromTicks(190),
                   () => xs.Subscribe(observer1));

became:

scheduler.Schedule(TimeSpan.FromTicks(190), 
                   (s, t) => xs.Subscribe(observer1));

To be perfectly honest I don't grasp the impact of this change (I haven't been coding in C# that long!).

And here are the results:

Assert.Fail failed. 
Expected: [OnNext(42)@200, OnCompleted()@210]
Actual..: [OnNext(42)@11, OnCompleted()@21]

The way I understand it, I am creating a cold observable that produces the value 42 on the 10th tick and completes on the 20th tick. I then create an observer that subscribes to this observable, indicating that I want the subscription to be scheduled at tick 190. Hence the test expects the value 42 at tick 200 (190 + 10) and completion at tick 210. However, I get ticks 11 and 21 respectively.

The document is based on a pre-RTM release of v2, so I am not sure whether the behavior of the TestScheduler changed in the RTM or whether I made a mistake somewhere. I'd appreciate it if someone could explain what is happening.



Solution 1:[1]

It seems you've forgotten to import the System.Reactive.Concurrency namespace, which defines the Schedule overload you want to use. This explains why you had to change the code: without the following using directive, the convenience extension methods likely didn't show up:

using System.Reactive.Concurrency;

The overload of Schedule you ended up calling is the one used for immediate scheduling; the TimeSpan value is passed as the TState argument rather than as a due time:

public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)

Hence, the subscriptions to xs will be carried out immediately when the scheduler starts at time 0. Each work item increments the clock (no events can happen concurrently on the test scheduler, which mimics a single thread of execution), hence you're seeing one sequence subscription at time 1 and another at time 2.
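To make the overload difference concrete, here is a minimal sketch, assuming the Rx testing library (Microsoft.Reactive.Testing) is referenced. The class name OverloadDemo and the console output are illustrative only and not part of the original question. The first call spells out the type argument that the compiler infers for the question's (s, t) lambda; the second call uses the extension method that becomes visible once System.Reactive.Concurrency is imported.

```csharp
using System;
using System.Reactive.Concurrency;   // brings the Schedule(TimeSpan, Action) extension into scope
using System.Reactive.Disposables;
using Microsoft.Reactive.Testing;

public static class OverloadDemo   // hypothetical name, for illustration
{
    public static void Main()
    {
        var scheduler = new TestScheduler();

        // Equivalent to the call in the question: TState is inferred as TimeSpan,
        // so the TimeSpan is only a state value handed to the action, not a due time.
        scheduler.Schedule<TimeSpan>(TimeSpan.FromTicks(190), (s, state) =>
        {
            Console.WriteLine("state overload:    state = {0} ticks, clock = {1}",
                              state.Ticks, scheduler.Clock);
            return Disposable.Empty;
        });

        // Extension method overload: here the TimeSpan is a relative due time.
        scheduler.Schedule(TimeSpan.FromTicks(190), () =>
            Console.WriteLine("due-time overload: clock = {0}", scheduler.Clock));

        // Run all scheduled work in virtual time.
        scheduler.Start();
    }
}
```

The first work item should run right at the start of virtual time no matter what TimeSpan is passed, while the second should run once the virtual clock reaches 190, because its due time is relative to a clock that starts at 0.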

One possible fix is to use the following:

var observer1 = scheduler.CreateObserver<int>();
scheduler.ScheduleAbsolute(190, () => xs.Subscribe(observer1));

var observer2 = scheduler.CreateObserver<int>();
scheduler.ScheduleAbsolute(220, () => xs.Subscribe(observer2));

Make sure to import the System.Reactive.Concurrency namespace to find those overloads. You could use ScheduleRelative here as well: because the scheduler hasn't started yet, the due time will be the scheduler's current Clock value (= 0) plus the specified relative time.
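As a sketch of the ScheduleRelative variant mentioned above, the whole test could look roughly like this. It assumes MSTest and the Rx testing library are referenced; the class and method names are made up for illustration, and deriving from ReactiveTest is one way to bring the OnNext and OnCompleted helpers into scope.

```csharp
using System.Reactive.Concurrency;   // ScheduleAbsolute / ScheduleRelative extension methods
using Microsoft.Reactive.Testing;
using Microsoft.VisualStudio.TestTools.UnitTesting;

[TestClass]
public class ColdObservableTests : ReactiveTest   // hypothetical class name
{
    [TestMethod]
    public void SubscriptionsStartAtTheirRelativeDueTimes()
    {
        var scheduler = new TestScheduler();

        // Cold sequence: every subscriber sees 42 ten ticks after subscribing
        // and completion twenty ticks after subscribing.
        var xs = scheduler.CreateColdObservable(
            OnNext(10, 42),
            OnCompleted<int>(20));

        // The clock is still 0 before Start(), so relative 190 means absolute 190.
        var observer1 = scheduler.CreateObserver<int>();
        scheduler.ScheduleRelative(190, () => xs.Subscribe(observer1));

        var observer2 = scheduler.CreateObserver<int>();
        scheduler.ScheduleRelative(220, () => xs.Subscribe(observer2));

        scheduler.Start();

        // Subscription time plus the offsets recorded in the cold sequence.
        observer1.Messages.AssertEqual(
            OnNext(200, 42),
            OnCompleted<int>(210));

        observer2.Messages.AssertEqual(
            OnNext(230, 42),
            OnCompleted<int>(240));
    }
}
```

Once the scheduler has started and the clock has advanced, ScheduleRelative and ScheduleAbsolute no longer coincide, so ScheduleAbsolute is probably the clearer choice when a test reasons in absolute tick values.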

Alternatively, you can go through the IScheduler API surface as well, using DateTimeOffset or TimeSpan overloads. Just be careful to pick the right overload (use F12 - go to definition - to see which overload you've picked).

var observer1 = scheduler.CreateObserver<int>();
scheduler.Schedule(TimeSpan.FromTicks(190), () => xs.Subscribe(observer1));

var observer2 = scheduler.CreateObserver<int>();
scheduler.Schedule(TimeSpan.FromTicks(220), () => xs.Subscribe(observer2));

Again, make sure to import the System.Reactive.Concurrency namespace with a using directive in order to see this overload :-).

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Bart De Smet