Starting a new thread and returning the main thread in Spring WebFlux with subscribeOn

I have a Spring WebFlux application in which I want to spawn a new thread for a time-consuming operation and, as soon as the new thread is spawned, return from the main thread so that the client is not kept waiting.

Please note that the time-consuming method is not a WebClient call but a service-layer function call that downloads files from a content storage. I have defined a CallableTask class as below:

public class DocumentProcessorCallableTask<T> {

    public Mono<T> execute(Callable<T> task) {
        return Mono.defer(
            () -> Mono.fromCallable(task)
                      .doOnError(throwable ->
                          Mono.error(new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR))));
    }
}

The method in which I want to spawn a new thread is below. downloadAndUploadSaveLandsDocuments is the method that calls the content storage to get the documents.

private Mono<MarsTxnResponse> subscribeOnScheduler(MarsTxnResponse rsp, Long clientId) {
    return new DocumentProcessorCallableTask<Mono<MarsTxnResponse>>()
        .execute(() -> downloadAndUploadSaveLandsDocuments(rsp, clientId))
        .subscribeOn(Schedulers.boundedElastic())
        .thenReturn(rsp);
}

The problem is that a new thread is created, but the WebFlux code inside downloadAndUploadSaveLandsDocuments is not executed.

It does get executed when I change the above code to the version below, BUT then the main thread waits until the newly spawned thread completes, which I don't want. Do subscribeOn and publishOn work only in the context of a WebClient call?

return new DocumentProcessorCallableTask<Mono<MarsTxnResponse>>()
        .execute(() -> downloadAndUploadSaveLandsDocuments(rsp, clientId))
        .flatMap(marsTxnResponseMono -> marsTxnResponseMono)
        .subscribeOn(Schedulers.boundedElastic());

The method that calls the content storage is below. Execution does not enter Flux.fromIterable in the first case but does enter it in the second case.

public Mono<MarsTxnResponse> downloadAndUploadSaveLandsDocuments(
      MarsTxnResponse rsp, 
      Long clientId) {
    return Flux.fromIterable(rsp.getExpandedWebLtLandList())
                .flatMap(webLtLandFromRequest -> {
                      return downloadDocuments(rsp, webLtLandFromRequest, clientId);
                }).collectList()
                .flatMap(objects -> {
                    return documentAsyncUploadService.uploadFilesIfExists(rsp);
                });
}

I am using spring-boot 2.4.13 and spring-boot-starter-webflux version 2.4.13



Solution 1:[1]

Finally, this is the code that worked:

    return Mono.just(rsp)
        .flatMap(
            marsTxnResponse -> {
              new DocumentProcessorCallableTask<Mono<MarsTxnResponse>>()
                  .execute(() -> downloadAndUploadSaveLandsDocuments(rsp, clientId))
                  .flatMap(marsTxnResponseMono -> marsTxnResponseMono)
                  .subscribeOn(Schedulers.boundedElastic())
                  .subscribe();
              return Mono.just(rsp);
            });

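The most important change was adding subscribe() after subscribeOn(Schedulers.boundedElastic()). Nothing in a Reactor pipeline runs until something subscribes to it. In the first attempt the Callable only returned the inner Mono, and thenReturn(rsp) discarded that value without ever subscribing to it, so the download code never ran. Calling subscribe() explicitly starts the work on a boundedElastic thread, while the outer Mono.just(rsp) completes right away without waiting for it.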
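
To make the difference concrete, here is a minimal sketch that reproduces both variants outside of Spring. It assumes reactor-core is on the classpath. FireAndForgetDemo, slowWork, and the CountDownLatch are hypothetical names that exist only for this illustration: slowWork stands in for downloadAndUploadSaveLandsDocuments, and the latch is there only so the demo can observe the background work.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class FireAndForgetDemo {

    // Hypothetical stand-in for downloadAndUploadSaveLandsDocuments:
    // a lazy Mono whose body runs only when something subscribes to it.
    static Mono<String> slowWork(AtomicBoolean ran, CountDownLatch done) {
        return Mono.fromCallable(() -> {
                    ran.set(true);
                    return "uploaded";
                })
                .doFinally(signal -> done.countDown());
    }

    // First attempt from the question: the Callable only builds the inner Mono.
    // thenReturn discards it, so the inner pipeline is never subscribed.
    static boolean innerRunsWithThenReturn() {
        AtomicBoolean ran = new AtomicBoolean(false);
        CountDownLatch done = new CountDownLatch(1);
        Mono.fromCallable(() -> slowWork(ran, done))
                .subscribeOn(Schedulers.boundedElastic())
                .thenReturn("response")
                .block(Duration.ofSeconds(5));
        return ran.get();
    }

    // Working variant: flatten the inner Mono and subscribe explicitly,
    // without making the caller wait for the result.
    static boolean innerRunsWithSubscribe() {
        AtomicBoolean ran = new AtomicBoolean(false);
        CountDownLatch done = new CountDownLatch(1);
        Mono.fromCallable(() -> slowWork(ran, done))
                .flatMap(inner -> inner)
                .subscribeOn(Schedulers.boundedElastic())
                .subscribe(
                        value -> { },
                        // Errors never reach the HTTP client here, so log them.
                        error -> System.err.println("Background task failed: " + error));
        try {
            // Demo only: wait so that the background work can be observed.
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("thenReturn variant ran inner work: " + innerRunsWithThenReturn());
        System.out.println("subscribe variant ran inner work: " + innerRunsWithSubscribe());
    }
}
```

In a real handler there would be no latch: the method would simply return Mono.just(rsp) after calling subscribe(). Because a fire-and-forget subscription is detached from the request, any failure in the background task is not reported to the client, which is why the sketch passes an error consumer to subscribe().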

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Amol Kshirsagar