'How to check the content of 2 or more values that can be contained in the resulting stream of elements - Flux<T> (spring WebFlux)

I have a method that validates containing some elements in Stream.

  • Task

For example, there is a sequence that has a series of numbers (the numbers are not repeated), and each number is larger than the other :

1, 20, 35, 39, 45, 43

... It is necessary to check whether there is a specified range in this stream, for example 35 ...49. If there is no such range, then you need to throw an Exception.

But since this is asynchronous processing, the usual methods do not work here. After all, we process elements in the stream and do not know what the next element will be.

The elements of this sequence need to be folded, the addition should be done gradually (as soon as the initial range of elements is found).

During the service, you need to check whether there is an endpoint in the generated sequence and when the entire flow of elements is received, but this point is not, then ask for an Exception, since the upper limit of the specified range is not received

Also, do not start calculations until the starting point of the specified range is found,

while we cannot block the stream, otherwise we will get the same Exstrong textception.

How can such a check be organized ?

When I work with a regular thread, it looks like this:


   private boolean isRangeValues() {

        List<BigInteger> sequence = Arrays
                .asList(
                        new BigInteger("11"),
                        new BigInteger("15"),
                        new BigInteger("23"),
                        new BigInteger("27"),
                        new BigInteger("30"));

           BigInteger startRange =   new BigInteger("15");
           BigInteger finishRange = new BigInteger("27");

        boolean isStartRangeMember = sequence.contains(startRange);
        boolean isFinishRangeMembe = sequence.contains(finishRange);


        return isStartRangeMember && isFinishRangeMember;
    }

But I have a task to process a stream of elements that are generated at some interval. To get the result, a reactive stack is used in Spring and I get the result in Flux.

Just convert to a list and process, - it will not work, there will be an Exception. After filtering these elements, the stream will continue to be processed.

But if I see an error in data validation at the time of filtering (in this case, there are no elements that are needed), then I will need to request an Exception, which will be processed globally and returned to the client.

    @GetMapping("v1/sequence/{startRange}/{endRange}")
    Mono<BigInteger> getSumSequence(
            @PathVariable BigInteger startRange,
            @PathVariable BigInteger endRange) {

        Flux<BigInteger> sequenceFlux = sequenceGenerated();
        
         validateSequence(sequenceFlux)
      
       return sum(sequenceFlux );
     }

     private Mono<BigInteger> sum (Flux<BigInteger> sequenceFlux ){
      .....
     }

       private void validateSequence(Flux<BigInteger> sequenceFlux){
         ... is wrong
         throw new RuntimeException();
     }
}

I came up with some solution (I published it in this topic).

    public void validateRangeSequence(sequenceDto dto) {

        Flux<BigInteger> sequenceFlux = dto.getSequenceFlux();
        BigInteger startRange = dto.getStartRange();
        BigInteger endRange = dto.getEndRange();

        Mono<Boolean> isStartRangeMember = sequenceFlux.hasElement(startRange);
        
        Mono<Boolean> isEndRangeMember = sequenceFlux.hasElement(endRange);

        if ( !isStartRangeMember.equals(isEndRangeMember) ){
            throw new RuntimeException("error");
        }

But it doesn't work as expected, even the correct results cause an exception.

Update

public void validateRangeSeq(RangeSequenceDto dto) {

        Flux<BigInteger> sequenceFlux = dto.getSequenceFlux();
        BigInteger startRange = dto.getStartRange();
        BigInteger endRange = dto.getEndRange();

        Mono<Boolean> isStartRangeMember = sequenceFlux.hasElement(startRange);
        Mono<Boolean> isEndRangeMember = sequenceFlux.hasElement(endRange);

        sequenceFlux
                .handle((number, sink) -> {
                    if (!isStartRangeMember.equals(isEndRangeMember) ){
                        sink.error(new RangeWrongSequenceExc("It is wrong given range!."));
                    } else {
                        sink.next(number);
                    }
                });
    }

  • Unfortunately , That decision also doesn't work.
        sequenceFlux
                .handle(((bigInteger, synchronousSink) -> {
                    if(!bigInteger.equals(startRange)){
                        synchronousSink.error(new RuntimeException("!!!!!!!!!!! ---- Wrong range!"));
                    } else {
                        synchronousSink.next(bigInteger);
                    }
                }));

It piece of code - It doesn't work. (does not react in any way)

Who thinks what about this ? Should this be done or are there other approaches ?

I am not familiar with Reactive stack in Spring and do not know how to handle such a situation here.

Maybe someone has ideas on how to organize such filtering and do not block the processing of elements in the stream.



Solution 1:[1]

You can try to do it like that

    Flux<Integer> yourStream = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).share();
Flux.zip(
        yourStream.filter(integer -> integer.equals(4)),
        yourStream.filter(integer -> integer.equals(6)),
        (integer, integer2) -> Tuple2.of(integer, integer2))
    .subscribe(System.out::println);

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Ricard Kollcaku