FluxSink seems to be queuing things

For testing purposes, I want to send only one thing at a time, but the thing(s) that FluxSink sends to the other side do not match the thing that I literally just passed to the FluxSink.next method. What it sends over are things that were "nexted" a while ago. Is there any way to prevent FluxSink from doing any kind of queuing/batching, or to set the queue/batch size to 1, just like I'm setting my batch size to one for my test?



Solution 1:[1]

I don't clearly understand what you're trying to achieve, so this is my guess based on the title:

You can use FluxSink.OverflowStrategy, but there is no strategy that blocks FluxSink.next, because reactive programming is NOT blocking programming. If the producer is faster than the consumer, the reactive library takes care of the excess according to the chosen strategy (BUFFER, DROP, etc.). So you have to either speed up your consumer or choose an appropriate FluxSink.OverflowStrategy.
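To illustrate how the strategy changes what the other side sees, here is a minimal sketch. It assumes reactor-core is on the classpath; the class name OverflowStrategySketch, the run method and the item names are made up for the illustration. Three items are pushed before the subscriber requests anything, then exactly one item is requested.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

// Hypothetical demo class, not part of Reactor.
class OverflowStrategySketch {

    // Pushes three items while there is no demand, then requests one item.
    public static List<String> run(FluxSink.OverflowStrategy strategy) {
        AtomicReference<FluxSink<String>> sinkRef = new AtomicReference<>();
        List<String> received = new ArrayList<>();

        // The emitter callback runs at subscription time and hands us the sink.
        Flux<String> flux = Flux.create(sinkRef::set, strategy);

        BaseSubscriber<String> subscriber = new BaseSubscriber<String>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                // Deliberately request nothing yet (simulates a slow consumer).
            }

            @Override
            protected void hookOnNext(String value) {
                received.add(value);
            }
        };
        flux.subscribe(subscriber);

        FluxSink<String> sink = sinkRef.get();
        sink.next("old-1");
        sink.next("old-2");
        sink.next("newest");

        // The consumer finally asks for a single item.
        subscriber.request(1);
        subscriber.dispose();
        return received;
    }
}
```

In this setup, BUFFER should hand over the oldest queued item ("old-1"), which matches the behaviour described in the question, while LATEST should hand over only the most recent one ("newest"). Keep in mind that operators further down the chain may prefetch on their own, so the sink is not necessarily the only place where items can queue up.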

Think about how you would implement this without reactive programming. If your producer is faster than your consumer, you would probably either queue the data from the producer or throw an error because the data is too old.
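As a rough illustration of those two options in plain Java (no Reactor involved), the sketch below uses a queue of capacity 1. The class HandoffSketch and its method names are hypothetical; it is only meant to show the trade-off, not a recommended design.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class: a one-element hand-off between producer and consumer.
class HandoffSketch {

    // Capacity 1: at most one item waits for the consumer.
    private final BlockingQueue<String> slot = new ArrayBlockingQueue<>(1);

    // Option 1: fail fast when the consumer has not taken the previous item.
    public void publishOrFail(String item) {
        if (!slot.offer(item)) {
            throw new IllegalStateException("consumer is behind, rejected: " + item);
        }
    }

    // Option 2: throw away the stale item so that only the newest one waits.
    public void publishLatest(String item) {
        while (!slot.offer(item)) {
            slot.poll(); // discard whatever is still waiting
        }
    }

    // Returns the waiting item, or null if nothing is waiting.
    public String consume() {
        return slot.poll();
    }
}
```

Either way, a decision about the surplus data has to be made somewhere; an overflow strategy just makes that decision explicit.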

Anyway, the best choice in your case is probably Flux.create or Flux.generate (see "Difference Between Flux.create and Flux.generate").

Remember: Flux.generate is designed to calculate and emit values on demand, so you could put BlockingQueue.poll() inside it. It's not what I recommend, but it is probably what you are looking for.
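A minimal sketch of that idea, assuming reactor-core is on the classpath. The class GenerateFromQueueSketch, the method fromQueue and the 200 ms timeout are my own choices for the illustration, and completing the Flux when the queue stays empty is just one possible policy.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of Reactor.
class GenerateFromQueueSketch {

    // The generator is invoked once per requested item and may emit at most one item per call.
    public static Flux<String> fromQueue(BlockingQueue<String> queue) {
        return Flux.<String>generate(sink -> {
            try {
                // Blocks the subscribing thread until an item arrives or the timeout expires.
                String item = queue.poll(200, TimeUnit.MILLISECONDS);
                if (item == null) {
                    // Assumption: treat a quiet queue as the end of the stream.
                    sink.complete();
                } else {
                    sink.next(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                sink.error(e);
            }
        });
    }
}
```

The producer then calls queue.offer(...) or queue.put(...) instead of FluxSink.next, and nothing is pulled from the queue until the downstream requests it. Because poll blocks, you would probably want to subscribe on a dedicated scheduler, for example with subscribeOn(Schedulers.boundedElastic()).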

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1