WebFlux async responses from controllers

Hello everyone

My task is simply to write a controller that returns results as soon as they are ready (a simplified example is below).

Imagine:

I want to get a fixed number of Strings (for example, 1000 Strings, each of which takes 1 second to produce). In reality I need the results of a function, but Strings keep the example simple.

When my controller receives a request, I want it to send each answer as soon as it is ready, without buffering the results.

What I want is:

1 second

"some string" -> (send response to my frontend)

1 second

"another one" -> (send response to my frontend)

1 second

"third one" -> (send response to my frontend) ....

But what I get is:

1000 seconds

"some string"

.....

"thousand strings"

Here is my code:

@GetMapping(value = "/3", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> get3() {
    System.out.println("get3 start");
    Flux<String> result = Flux.fromStream(IntStream.range(1, 10).mapToObj(i -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "flux data--" + i;
    }));
    System.out.println("get3 end");
    return result;
}

In my console I see "get3 start" and "get3 end" immediately, but the response is only sent after all the strings are ready.

My actual service for this task is similar, except that it merges two Flux instances, each driven by an interval, so I want it to deliver results as soon as they appear:

public Flux<AnswerCalculationDto> calculate(CalculationDto calculationDto){
        String checkMsg = checkCalculationDto(calculationDto);
        if(checkMsg.equals("Success")){//valid
            Long quantity = Long.parseLong(calculationDto.getQuantity());

            Flux<AnswerCalculationDto> firstFunc =  Flux.interval(interval)//func 1
                    .onBackpressureDrop()
                    .takeWhile((i)-> i < quantity)
                    .map((i)->new AnswerCalculationDto(i,1,translateToJava(calculationDto.getFunc1(),i)))
                    ;
            Flux<AnswerCalculationDto> secondFunc = Flux.interval(interval) //func 2
                    .onBackpressureDrop()
                    .takeUntil((i)-> i > quantity-2)
                    .map((i)->new AnswerCalculationDto(i,2,translateToJava(calculationDto.getFunc2(),i)) )
                    ;
            return Flux.merge(firstFunc,secondFunc);
        }
        else {//invalid data from client
            return Flux.just(new AnswerCalculationDto("",checkMsg));
        }

    }


Solution 1:[1]

There are several options to stream data from the server using WebFlux:

  • Server-sent events pushing individual events (media type: text/event-stream)
  • Streaming events separated by newlines (media type: application/x-ndjson)

Here is a complete example that exposes both text/event-stream and application/x-ndjson endpoints and returns data in JSON format. If you need plain-text content, use text/event-stream.

@RestController
public class StreamingController {

    @GetMapping(produces = TEXT_EVENT_STREAM_VALUE)
    Flux<DataEntry> sse() {
        return stream();
    }

    @GetMapping(produces = APPLICATION_NDJSON_VALUE)
    Flux<DataEntry> ndjson() {
        return stream();
    }

    private Flux<DataEntry> stream() {
        return Flux.range(1, 1000)
                .delayElements(Duration.ofSeconds(1))
                .map(i -> new DataEntry(i, Instant.now()));
    }

    @Value
    @Builder
    private static class DataEntry {
        long index;
        Instant timestamp;
    }
}

To test text/event-stream use:

curl -v -H "Accept: text/event-stream" http://localhost:8080

> GET / HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.64.1
> Accept: text/event-stream
> 
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: text/event-stream;charset=UTF-8
< 
data:{"index":1,"timestamp":"2022-04-08T14:41:06.513352Z"}

data:{"index":2,"timestamp":"2022-04-08T14:41:07.527817Z"}

data:{"index":3,"timestamp":"2022-04-08T14:41:08.541706Z"}

data:{"index":4,"timestamp":"2022-04-08T14:41:09.553329Z"}

To test application/x-ndjson use:

curl -v -H "Accept: application/x-ndjson" http://localhost:8080

> GET / HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.64.1
> Accept: application/x-ndjson
>
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: application/x-ndjson
< 
{"index":1,"timestamp":"2022-04-08T14:42:36.081269Z"}
{"index":2,"timestamp":"2022-04-08T14:42:37.094928Z"}
{"index":3,"timestamp":"2022-04-08T14:42:38.109378Z"}
{"index":4,"timestamp":"2022-04-08T14:42:39.121315Z"}

The controller above produces 1000 records at 1-second intervals. You can also generate an unbounded stream with something like:

private Flux<DataEntry> stream() {
    return Flux.interval(Duration.ofSeconds(1))
            .map(i -> new DataEntry(i, Instant.now()));
}
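
The two curl responses above carry the same JSON and differ only in framing: each server-sent event is a data: line followed by a blank line, while NDJSON puts one JSON document per line. Here is a minimal, JDK-only sketch of that framing, for illustration only. The names WireFraming, sseFrame, ndjsonLine, sseBody and ndjsonBody are hypothetical and not part of WebFlux, which does this encoding for you based on the produces media type.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper that mimics the two wire formats shown in the curl output.
final class WireFraming {

    // One server-sent event: a "data:" field line, then a blank line that ends the event.
    static String sseFrame(String json) {
        return "data:" + json + "\n\n";
    }

    // One NDJSON record: the JSON document followed by a single newline.
    static String ndjsonLine(String json) {
        return json + "\n";
    }

    // Concatenates events the way they would appear in a text/event-stream body.
    static String sseBody(List<String> jsonDocs) {
        return jsonDocs.stream().map(WireFraming::sseFrame).collect(Collectors.joining());
    }

    // Concatenates records the way they would appear in an application/x-ndjson body.
    static String ndjsonBody(List<String> jsonDocs) {
        return jsonDocs.stream().map(WireFraming::ndjsonLine).collect(Collectors.joining());
    }

    public static void main(String[] args) {
        List<String> docs = List.of("{\"index\":1}", "{\"index\":2}");
        System.out.print(sseBody(docs));
        System.out.print(ndjsonBody(docs));
    }
}
```

In both formats every record is self-delimiting, which is why a client can handle each one as it arrives instead of waiting for the end of the response.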

Solution 2:[2]

What I was looking for is HTTP streaming. Note that Safari, Postman, and axios (the JS library I used in my frontend) don't support HTTP streaming, so you can't see each result appear as soon as it is ready; you only get all the results in one response. Try it in Chrome.

Also, if you struggle with the frontend part as I did, search for SSE (server-sent events), for example: https://turkogluc.com/server-sent-events-with-spring-boot-and-reactjs/
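
If you want to check the stream without a browser, plain Java should also work. The sketch below is my own illustration, not something from the linked article. It assumes JDK 11+ and uses java.net.http.HttpClient with BodyHandlers.ofLines(), which hands over lines as they arrive rather than buffering the whole body. SseLines and dataOf are hypothetical names. The URL is the one from the curl examples in Solution 1, so adjust it to your own endpoint.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Optional;
import java.util.stream.Stream;

// Hypothetical client-side helper for reading a text/event-stream response.
final class SseLines {

    // Returns the payload of a "data:" line, or empty for blank lines and other fields.
    static Optional<String> dataOf(String line) {
        if (!line.startsWith("data:")) {
            return Optional.empty();
        }
        String payload = line.substring("data:".length());
        // The SSE format allows one optional space after the colon.
        return Optional.of(payload.startsWith(" ") ? payload.substring(1) : payload);
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        // Assumed endpoint: the same URL as in the curl examples.
        HttpRequest request = HttpRequest.newBuilder(URI.create("http://localhost:8080"))
                .header("Accept", "text/event-stream")
                .GET()
                .build();

        // ofLines() exposes the body as a lazy Stream<String>, consumed while data is still arriving.
        HttpResponse<Stream<String>> response =
                client.send(request, HttpResponse.BodyHandlers.ofLines());
        try (Stream<String> lines = response.body()) {
            lines.map(SseLines::dataOf)
                 .flatMap(Optional::stream)
                 .forEach(System.out::println);
        }
    }
}
```

Run it while the application is up. If the server is streaming, the payloads should print one at a time instead of all at once at the end.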

Hope this helps.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1
Solution 2 lol