Reactor Flux<String> one to many Flux<T>

I currently have a flux which results from some async operations and produces Flux<String>. I would like to then use those strings as params in a legacy callback API that will result in multiple events emitted per input String from the primary flux.

The following code works as expected, but I can't find a way to terminate the second flux successfully without resorting to seemingly hacky things like keeping counters. Is there a more idiomatic way to do that?

    public Flux<Account> getAccounts(UUID userId) {
        var tokens = tokenRepo.findAllActiveByAccountUUID(userId);

        return tokens.flatMap(p -> Flux.create(e -> {
            var r = new AccountsGetRequest().accessToken(p);
            c.accountsGet(r).enqueue(new Callback<>() {
                @Override
                public void onResponse(@NotNull Call<AccountsGetResponse> call,
                                       @NotNull Response<AccountsGetResponse> response) {
                    if (response.isSuccessful() && response.body() != null) {
                        (response.body()).forEach(e::next);
                        e.complete();
                    } else {
                        log.debug(response.toString());
                        // error() is a terminal signal, so no complete() call is needed after it
                        e.error(new RuntimeException("getAccounts: " + response.code()));
                    }
                }

                @Override
                public void onFailure(@NotNull Call<AccountsGetResponse> call, @NotNull Throwable t) {
                    // error() already terminates the sink; a following complete() would be ignored
                    e.error(t);
                }
            });
        }));
    }


Solution 1:[1]

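You can simply call e.complete() after the forEach. Once every element has been passed to e.next, that call signals normal termination of the inner flux, so no counters are needed.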

I think it would be simpler to use Flux.fromIterable(response.body()) than to push each element by hand through Flux.create.
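One way to combine the two points above is to bridge the callback with Mono.create (one response, one terminal signal) and then expand the result with Flux.fromIterable. This is only a minimal sketch: LegacyCallback, LegacyClient and the String element type are hypothetical stand-ins for the real client and Account type in the question, and it assumes the response body can be treated as an Iterable.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CallbackBridge {

    // Hypothetical stand-ins for the legacy callback API (not from the question).
    interface LegacyCallback<T> {
        void onResponse(T body);
        void onFailure(Throwable t);
    }

    interface LegacyClient {
        void accountsGet(String token, LegacyCallback<List<String>> callback);
    }

    // One callback invocation yields one value, so a Mono fits.
    // success(value) emits and completes; error(t) is terminal on its own.
    static Mono<List<String>> accountsFor(LegacyClient client, String token) {
        return Mono.create(sink -> client.accountsGet(token, new LegacyCallback<>() {
            @Override
            public void onResponse(List<String> body) {
                sink.success(body);
            }

            @Override
            public void onFailure(Throwable t) {
                sink.error(t);
            }
        }));
    }

    // Each token fans out into many elements; fromIterable completes the
    // inner flux once the list is exhausted.
    static Flux<String> accounts(LegacyClient client, Flux<String> tokens) {
        return tokens.flatMap(token ->
                accountsFor(client, token).flatMapMany(Flux::fromIterable));
    }

    public static void main(String[] args) {
        // Fake client that answers synchronously with two accounts per token.
        LegacyClient fake = (token, cb) ->
                cb.onResponse(List.of(token + "-checking", token + "-savings"));

        List<String> result = accounts(fake, Flux.just("a", "b")).collectList().block();
        System.out.println(result);
    }
}
```

With this shape there is no manual e.next loop and no explicit complete() call; the outer flux completes when the token flux and all inner publishers have completed.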

Note that if c.accountsGet(r) is a blocking call, you should probably move it onto another scheduler, for example with subscribeOn(Schedulers.boundedElastic()) on the inner publisher (or publishOn before the flatMap), to prevent blocking the calling thread.
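If the client call were blocking, a sketch along these lines might apply. BlockingClient is a hypothetical interface invented for illustration, and the sleep only simulates a slow call; the pattern itself (Mono.fromCallable plus subscribeOn with the bounded elastic scheduler) is the usual Reactor approach for wrapping blocking work.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class BlockingBridge {

    // Hypothetical blocking client (an assumption, not the API from the question).
    interface BlockingClient {
        List<String> accountsGet(String token) throws Exception;
    }

    static Flux<String> accounts(BlockingClient client, Flux<String> tokens) {
        return tokens.flatMap(token ->
                // fromCallable defers the blocking call until subscription
                Mono.fromCallable(() -> client.accountsGet(token))
                        // run that subscription on a scheduler meant for blocking work
                        .subscribeOn(Schedulers.boundedElastic())
                        // expand the single response into its elements
                        .flatMapMany(Flux::fromIterable));
    }

    public static void main(String[] args) {
        BlockingClient fake = token -> {
            Thread.sleep(50); // simulate a slow, blocking request
            return List.of(token + "-1", token + "-2");
        };

        List<String> result = accounts(fake, Flux.just("a", "b")).collectList().block();
        System.out.println(result);
    }
}
```

Because the inner publishers run concurrently on different threads, the elements of different tokens may interleave; use concatMap or flatMapSequential instead of flatMap if the order matters.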

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1