'Using "timed()" to find out how much time a Reactor operator is taking
I'm sure this is just a great opportunity for me to learn more - I'm looking forward to the answers - thanks in advance!
I've been playing with timed() to try and determine which part of my reactive flow is taking a lot of time (not sure if this is the best way or whether to write my own subscriber - but I always look for existing solutions from really bright people first).
I am seeing some really interesting things. Here's a simple flow:
Flux.range(0, 10)
.log("range")
.map(String::valueOf)
.timed()
.log("map")
.map(t -> "value-" + t.get())
.timed()
.log("concat")
.subscribe();
If I understand this correctly, I expect that the "map" log would show
- the elapsed time between subscription to onNext for the first item
- the elapsed time between onNext(s) for subsequent items
Then, the "concat" log would show the same for the 2nd map operator, which should include the amount of time taken to perform the 1st map operation (plus some).
The scenario doesn't involve threading (although I admit that it could and would be ok by the specification), and the logs seem to indicate that there is a linear flow of
range->map->map for onNext operations.
There is variability in the fact that the entire flow must be completed - upstream and downstream of an operator I'm timing - before the next onNext() is invoked - so the multiple timed() such as I am doing here isn't really great - and is probably not a good fit for trying to find expensive "sub flows". Where there is no loop - such as when dealing with a web request flow, timed() is useful.
I can't use general metrics to troubleshoot the real issue because I need to see the behavior for a particular condition (I need a trace-span that triggers on a predicate for a flow where each operator has its own span).
But with this particular solution, I'm seeing something I didn't expect.
Looking at the logs, it seems that there are some odd durations. It appears that every other (or something like that) onNext(), the "concat" operation is shorter in duration than the "map" operation. See the log entries for values 1, 3, 6 ...
The Scheduler used to obtain the nanotime doesn't change. The subscribers seem to be behaving as expected. I must be overlooking something and there's something cool to learn about.
Looking at FluxTimed and FluxMapFuseable isn't yielding the answer to me right away (but I will keep looking). Or is it just as simple as the fact that sometimes "circling around" to the next range->map is actually quite a bit faster than other times?
Any ideas? Other than that oddity, everything else seems to be working as expected.
The logs from the provided test (supplemented with some additional logging I added in FluxTimed) are:
{"@timestamp":"2022-01-13T20:33:43.61Z","thread_name":"main","level":"INFO","logger_name":"range","arg0":"ON_SUBSCRIBE","arg1":"[Fuseable] FluxOnAssembly.OnAssemblySubscriber","message":"| onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)"}
{"@timestamp":"2022-01-13T20:33:43.63Z","thread_name":"main","level":"INFO","logger_name":"map","arg0":"ON_SUBSCRIBE","arg1":"[Fuseable] FluxOnAssembly.OnAssemblySubscriber","message":"| onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)"}
{"@timestamp":"2022-01-13T20:33:43.63Z","thread_name":"main","level":"INFO","logger_name":"concat","arg0":"ON_SUBSCRIBE","arg1":"[Fuseable] FluxOnAssembly.OnAssemblySubscriber","message":"| onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)"}
{"@timestamp":"2022-01-13T20:33:43.631Z","thread_name":"main","level":"INFO","logger_name":"concat","arg0":"REQUEST","arg1":"unbounded","message":"| request(unbounded)"}
{"@timestamp":"2022-01-13T20:33:43.631Z","thread_name":"main","level":"INFO","logger_name":"map","arg0":"REQUEST","arg1":"unbounded","message":"| request(unbounded)"}
{"@timestamp":"2022-01-13T20:33:43.632Z","thread_name":"main","level":"INFO","logger_name":"range","arg0":"REQUEST","arg1":"unbounded","message":"| request(unbounded)"}
{"@timestamp":"2022-01-13T20:33:43.632Z","thread_name":"main","level":"INFO","logger_name":"range","arg0":"ON_NEXT","arg1":0,"message":"| onNext(0)"}
Current thread is: Thread[main,5,main] for value 0
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 lastEventNanos is 99684449908952 for value 0
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 clock 1309713184 has now time of 99684454327472 for value 0
{"@timestamp":"2022-01-13T20:33:43.635Z","thread_name":"main","level":"INFO","logger_name":"map","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(0){eventElapsedNanos=4418520, eventElapsedSinceSubscriptionNanos=4418520, eventTimestampEpochMillis=1642106023635})"}
Current thread is: Thread[main,5,main] for value value-0
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 lastEventNanos is 99684450236435 for value value-0
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 clock 1309713184 has now time of 99684469141337 for value value-0
{"@timestamp":"2022-01-13T20:33:43.649Z","thread_name":"main","level":"INFO","logger_name":"concat","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(value-0){eventElapsedNanos=18904902, eventElapsedSinceSubscriptionNanos=18904902, eventTimestampEpochMillis=1642106023649})"}
{"@timestamp":"2022-01-13T20:33:43.65Z","thread_name":"main","level":"INFO","logger_name":"range","arg0":"ON_NEXT","arg1":1,"message":"| onNext(1)"}
Current thread is: Thread[main,5,main] for value 1
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 lastEventNanos is 99684454327472 for value 1
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 clock 1309713184 has now time of 99684469639473 for value 1
{"@timestamp":"2022-01-13T20:33:43.65Z","thread_name":"main","level":"INFO","logger_name":"map","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(1){eventElapsedNanos=15312001, eventElapsedSinceSubscriptionNanos=19730521, eventTimestampEpochMillis=1642106023650})"}
Current thread is: Thread[main,5,main] for value value-1
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 lastEventNanos is 99684469141337 for value value-1
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 clock 1309713184 has now time of 99684469857807 for value value-1
{"@timestamp":"2022-01-13T20:33:43.65Z","thread_name":"main","level":"INFO","logger_name":"concat","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(value-1){eventElapsedNanos=716470, eventElapsedSinceSubscriptionNanos=19621372, eventTimestampEpochMillis=1642106023650})"}
{"@timestamp":"2022-01-13T20:33:43.65Z","thread_name":"main","level":"INFO","logger_name":"range","arg0":"ON_NEXT","arg1":2,"message":"| onNext(2)"}
Current thread is: Thread[main,5,main] for value 2
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 lastEventNanos is 99684469639473 for value 2
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 clock 1309713184 has now time of 99684470234342 for value 2
{"@timestamp":"2022-01-13T20:33:43.65Z","thread_name":"main","level":"INFO","logger_name":"map","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(2){eventElapsedNanos=594869, eventElapsedSinceSubscriptionNanos=20325390, eventTimestampEpochMillis=1642106023650})"}
Current thread is: Thread[main,5,main] for value value-2
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 lastEventNanos is 99684469857807 for value value-2
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 clock 1309713184 has now time of 99684470457218 for value value-2
{"@timestamp":"2022-01-13T20:33:43.651Z","thread_name":"main","level":"INFO","logger_name":"concat","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(value-2){eventElapsedNanos=599411, eventElapsedSinceSubscriptionNanos=20220783, eventTimestampEpochMillis=1642106023651})"}
{"@timestamp":"2022-01-13T20:33:43.651Z","thread_name":"main","level":"INFO","logger_name":"range","arg0":"ON_NEXT","arg1":3,"message":"| onNext(3)"}
Current thread is: Thread[main,5,main] for value 3
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 lastEventNanos is 99684470234342 for value 3
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 clock 1309713184 has now time of 99684470833983 for value 3
{"@timestamp":"2022-01-13T20:33:43.651Z","thread_name":"main","level":"INFO","logger_name":"map","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(3){eventElapsedNanos=599641, eventElapsedSinceSubscriptionNanos=20925031, eventTimestampEpochMillis=1642106023651})"}
Current thread is: Thread[main,5,main] for value value-3
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 lastEventNanos is 99684470457218 for value value-3
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 clock 1309713184 has now time of 99684471033203 for value value-3
{"@timestamp":"2022-01-13T20:33:43.651Z","thread_name":"main","level":"INFO","logger_name":"concat","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(value-3){eventElapsedNanos=575985, eventElapsedSinceSubscriptionNanos=20796768, eventTimestampEpochMillis=1642106023651})"}
{"@timestamp":"2022-01-13T20:33:43.651Z","thread_name":"main","level":"INFO","logger_name":"range","arg0":"ON_NEXT","arg1":4,"message":"| onNext(4)"}
Current thread is: Thread[main,5,main] for value 4
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 lastEventNanos is 99684470833983 for value 4
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 clock 1309713184 has now time of 99684471372015 for value 4
{"@timestamp":"2022-01-13T20:33:43.652Z","thread_name":"main","level":"INFO","logger_name":"map","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(4){eventElapsedNanos=538032, eventElapsedSinceSubscriptionNanos=21463063, eventTimestampEpochMillis=1642106023652})"}
Current thread is: Thread[main,5,main] for value value-4
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 lastEventNanos is 99684471033203 for value value-4
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 clock 1309713184 has now time of 99684471576037 for value value-4
{"@timestamp":"2022-01-13T20:33:43.652Z","thread_name":"main","level":"INFO","logger_name":"concat","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(value-4){eventElapsedNanos=542834, eventElapsedSinceSubscriptionNanos=21339602, eventTimestampEpochMillis=1642106023652})"}
{"@timestamp":"2022-01-13T20:33:43.652Z","thread_name":"main","level":"INFO","logger_name":"range","arg0":"ON_NEXT","arg1":5,"message":"| onNext(5)"}
Current thread is: Thread[main,5,main] for value 5
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 lastEventNanos is 99684471372015 for value 5
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 clock 1309713184 has now time of 99684471912756 for value 5
{"@timestamp":"2022-01-13T20:33:43.652Z","thread_name":"main","level":"INFO","logger_name":"map","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(5){eventElapsedNanos=540741, eventElapsedSinceSubscriptionNanos=22003804, eventTimestampEpochMillis=1642106023652})"}
Current thread is: Thread[main,5,main] for value value-5
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 lastEventNanos is 99684471576037 for value value-5
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 clock 1309713184 has now time of 99684472149299 for value value-5
{"@timestamp":"2022-01-13T20:33:43.652Z","thread_name":"main","level":"INFO","logger_name":"concat","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(value-5){eventElapsedNanos=573262, eventElapsedSinceSubscriptionNanos=21912864, eventTimestampEpochMillis=1642106023652})"}
{"@timestamp":"2022-01-13T20:33:43.653Z","thread_name":"main","level":"INFO","logger_name":"range","arg0":"ON_NEXT","arg1":6,"message":"| onNext(6)"}
Current thread is: Thread[main,5,main] for value 6
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 lastEventNanos is 99684471912756 for value 6
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 clock 1309713184 has now time of 99684472505546 for value 6
{"@timestamp":"2022-01-13T20:33:43.653Z","thread_name":"main","level":"INFO","logger_name":"map","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(6){eventElapsedNanos=592790, eventElapsedSinceSubscriptionNanos=22596594, eventTimestampEpochMillis=1642106023653})"}
Current thread is: Thread[main,5,main] for value value-6
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 lastEventNanos is 99684472149299 for value value-6
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 clock 1309713184 has now time of 99684472720603 for value value-6
{"@timestamp":"2022-01-13T20:33:43.653Z","thread_name":"main","level":"INFO","logger_name":"concat","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(value-6){eventElapsedNanos=571304, eventElapsedSinceSubscriptionNanos=22484168, eventTimestampEpochMillis=1642106023653})"}
{"@timestamp":"2022-01-13T20:33:43.653Z","thread_name":"main","level":"INFO","logger_name":"range","arg0":"ON_NEXT","arg1":7,"message":"| onNext(7)"}
Current thread is: Thread[main,5,main] for value 7
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 lastEventNanos is 99684472505546 for value 7
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 clock 1309713184 has now time of 99684473196354 for value 7
{"@timestamp":"2022-01-13T20:33:43.653Z","thread_name":"main","level":"INFO","logger_name":"map","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(7){eventElapsedNanos=690808, eventElapsedSinceSubscriptionNanos=23287402, eventTimestampEpochMillis=1642106023653})"}
Current thread is: Thread[main,5,main] for value value-7
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 lastEventNanos is 99684472720603 for value value-7
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 clock 1309713184 has now time of 99684473426400 for value value-7
{"@timestamp":"2022-01-13T20:33:43.654Z","thread_name":"main","level":"INFO","logger_name":"concat","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(value-7){eventElapsedNanos=705797, eventElapsedSinceSubscriptionNanos=23189965, eventTimestampEpochMillis=1642106023654})"}
{"@timestamp":"2022-01-13T20:33:43.654Z","thread_name":"main","level":"INFO","logger_name":"range","arg0":"ON_NEXT","arg1":8,"message":"| onNext(8)"}
Current thread is: Thread[main,5,main] for value 8
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 lastEventNanos is 99684473196354 for value 8
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 clock 1309713184 has now time of 99684473798044 for value 8
{"@timestamp":"2022-01-13T20:33:43.654Z","thread_name":"main","level":"INFO","logger_name":"map","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(8){eventElapsedNanos=601690, eventElapsedSinceSubscriptionNanos=23889092, eventTimestampEpochMillis=1642106023654})"}
Current thread is: Thread[main,5,main] for value value-8
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 lastEventNanos is 99684473426400 for value value-8
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 clock 1309713184 has now time of 99684474036903 for value value-8
{"@timestamp":"2022-01-13T20:33:43.654Z","thread_name":"main","level":"INFO","logger_name":"concat","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(value-8){eventElapsedNanos=610503, eventElapsedSinceSubscriptionNanos=23800468, eventTimestampEpochMillis=1642106023654})"}
{"@timestamp":"2022-01-13T20:33:43.654Z","thread_name":"main","level":"INFO","logger_name":"range","arg0":"ON_NEXT","arg1":9,"message":"| onNext(9)"}
Current thread is: Thread[main,5,main] for value 9
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 lastEventNanos is 99684473798044 for value 9
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 clock 1309713184 has now time of 99684474393764 for value 9
{"@timestamp":"2022-01-13T20:33:43.655Z","thread_name":"main","level":"INFO","logger_name":"map","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(9){eventElapsedNanos=595720, eventElapsedSinceSubscriptionNanos=24484812, eventTimestampEpochMillis=1642106023655})"}
Current thread is: Thread[main,5,main] for value value-9
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 lastEventNanos is 99684474036903 for value value-9
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 clock 1309713184 has now time of 99684474594071 for value value-9
{"@timestamp":"2022-01-13T20:33:43.655Z","thread_name":"main","level":"INFO","logger_name":"concat","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(value-9){eventElapsedNanos=557168, eventElapsedSinceSubscriptionNanos=24357636, eventTimestampEpochMillis=1642106023655})"}
{"@timestamp":"2022-01-13T20:33:43.656Z","thread_name":"main","level":"INFO","logger_name":"range","arg0":"ON_COMPLETE","arg1":"","message":"| onComplete()"}
{"@timestamp":"2022-01-13T20:33:43.656Z","thread_name":"main","level":"INFO","logger_name":"map","arg0":"ON_COMPLETE","arg1":"","message":"| onComplete()"}
{"@timestamp":"2022-01-13T20:33:43.656Z","thread_name":"main","level":"INFO","logger_name":"concat","arg0":"ON_COMPLETE","arg1":"","message":"| onComplete()"}
Solution 1:[1]
The use of timed() works great when the flow is not iterative. When the flow is iterative - as in the example, then you need to consider the "mechanics" - that the duration measured is the time between signals.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Alan Moffet |
