How to implement a list of DB update queries in one call in a Spring Boot WebFlux + R2DBC application

The goal of my Spring Boot WebFlux R2DBC application is for a Controller to accept a request containing a list of DB UPDATE or INSERT details and to respond with a result summary.

I can write a ReactiveCrudRepository-based repository to implement each DB operation, but I don't know how to write the Service that groups the execution of the list of DB operations and composes a result summary response.

I am new to Java reactive programming. Thanks for any suggestions and help.

Chen



Solution 1:[1]

I got the hint from here: https://www.vinsguru.com/spring-webflux-aggregation/ . The idea is:

  1. From the request, create 3 Monos:
  • Mono<List<Long>> monoEndDateSet -- DB row ids touched by the update operations;
  • Mono<List<Long>> monoCreateList -- DB row ids of the newly inserted rows;
  • Mono<ChangeSupplyResponse> monoRespFilled -- the response with the already-known fields filled in;
  2. Use Mono.zip to aggregate the 3 Monos, then map the resulting Tuple3 into the Mono<ChangeSupplyResponse> to return.
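The two steps above can be tried in isolation with a small standalone sketch. It is not the service code: `ZipSketch`, `Summary`, `endDatedIds()` and `createdIds()` are hypothetical stand-ins that emit fixed ids instead of calling a repository, and it only assumes reactor-core is on the classpath.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple3;

public class ZipSketch {

    // Hypothetical summary type standing in for ChangeSupplyResponse.
    record Summary(String eventType, List<Long> endDatedIds, List<Long> createdIds) {}

    // Stand-in for the update path: emits the ids of the rows that were end-dated.
    static Mono<List<Long>> endDatedIds() {
        return Flux.just(1L, 2L).collectList();
    }

    // Stand-in for the insert path: emits the ids of the newly created rows.
    static Mono<List<Long>> createdIds() {
        return Flux.just(10L, 11L, 12L).collectList();
    }

    // Step 1: build the three Monos. Step 2: zip them and map the Tuple3 to one result.
    static Mono<Summary> summarize(String eventType) {
        return Mono.zip(Mono.just(eventType), endDatedIds(), createdIds())
                .map(ZipSketch::combine);
    }

    static Summary combine(Tuple3<String, List<Long>, List<Long>> tuple) {
        return new Summary(tuple.getT1(), tuple.getT2(), tuple.getT3());
    }

    public static void main(String[] args) {
        // block() is used only to run the demo; a WebFlux handler would return the Mono.
        System.out.println(summarize("demo").block());
    }
}
```

One detail worth knowing: Mono.zip completes empty if any of its sources completes empty. collectList() always emits a list (possibly an empty one), so the two id lists never cause that here.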

Below are the key parts of the code:

    public Mono<ChangeSupplyResponse> ChangeSupplies(ChangeSupplyRequest csr){
        
        ChangeSupplyResponse resp = ChangeSupplyResponse.builder().build();
        resp.setEventType(csr.getEventType());
        resp.setSupplyOperationId(csr.getSupplyOperationId());
        resp.setTeamMemberId(csr.getTeamMemberId());
        resp.setRequestTimeStamp(csr.getTimestamp());
        resp.setProcessStart(OffsetDateTime.now());
        resp.setUserId(csr.getUserId());

        Mono<List<Long>> monoEndDateSet = getEndDateIdList(csr);
        Mono<List<Long>> monoCreateList = getNewSupplyEntityList(csr);
        Mono<ChangeSupplyResponse> monoRespFilled = Mono.just(resp);

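        // Zip the three Monos, merge them into one response, and run the whole
        // pipeline in a transaction (operator is presumably a TransactionalOperator).
        return Mono.zip(monoRespFilled, monoEndDateSet, monoCreateList)
                .map(this::combine)
                .as(operator::transactional);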
    }


    private ChangeSupplyResponse combine(Tuple3<ChangeSupplyResponse, List<Long>, List<Long>> tuple){
        ChangeSupplyResponse resp = tuple.getT1().toBuilder().build();
        List<Long> endDateIds = tuple.getT2();
        resp.setEndDatedDemandStreamSupplyIds(endDateIds);

        List<Long> newIds = tuple.getT3();
        resp.setNewCreatedDemandStreamSupplyIds(newIds);
        
        resp.setSuccess(true);
        Duration span = Duration.between(resp.getProcessStart(), OffsetDateTime.now());
        resp.setProcessDurationMillis(span.toMillis());
        return resp;
    }

    private Mono<List<Long>> getNewSupplyEntityList(ChangeSupplyRequest csr) {
        // Start from an empty Flux and merge in one save() per createSupply operation.
        Flux<DemandStreamSupplyEntity> fluxNewCreated = Flux.empty();
        for (SrmOperation so : csr.getOperations()) {
            if (so.getType() == SrmOperationType.createSupply) {
                DemandStreamSupplyEntity e = buildEntity(so, csr);
                fluxNewCreated = fluxNewCreated.mergeWith(this.demandStreamSupplyRepository.save(e));
            }
        }

        // Collect the generated ids of all saved entities into a single list.
        return fluxNewCreated.map(DemandStreamSupplyEntity::getDemandStreamSupplyId).collectList();
    }

...
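As a possible alternative to building the Flux with mergeWith in a loop, the same list of ids can be produced with Flux.fromIterable plus filter and concatMap. The sketch below is standalone and only illustrates the shape: `BatchSaveSketch`, `OpType`, `Op`, `Row` and the in-memory `save` are hypothetical stand-ins for SrmOperationType, SrmOperation, the entity and the repository call.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class BatchSaveSketch {

    enum OpType { CREATE, END_DATE }           // hypothetical, mirrors SrmOperationType
    record Op(OpType type, String payload) {}  // hypothetical, mirrors SrmOperation
    record Row(long id, String payload) {}     // hypothetical saved entity

    // In-memory id generator; a real repository would get the id from the database.
    private final AtomicLong seq = new AtomicLong(100);

    // Stand-in for repository.save(entity): assigns an id lazily, on subscription.
    Mono<Row> save(String payload) {
        return Mono.fromSupplier(() -> new Row(seq.incrementAndGet(), payload));
    }

    Mono<List<Long>> createdIds(List<Op> ops) {
        return Flux.fromIterable(ops)
                .filter(op -> op.type() == OpType.CREATE)  // keep only the create operations
                .concatMap(op -> save(op.payload()))        // one save at a time, in request order
                .map(Row::id)
                .collectList();
    }

    public static void main(String[] args) {
        List<Op> ops = List.of(
                new Op(OpType.CREATE, "a"),
                new Op(OpType.END_DATE, "b"),
                new Op(OpType.CREATE, "c"));
        // block() is used only to run the demo outside of a WebFlux handler.
        System.out.println(new BatchSaveSketch().createdIds(ops).block());
    }
}
```

concatMap subscribes to each save only after the previous one completes, so the ids come back in request order. Swapping in flatMap would allow the saves to overlap, at the cost of an ordering that is no longer guaranteed.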

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
