Mapping JSON to DTO from Akka Stream

Using the code below, I run a flow named requestFlow, which maps JSON to a Java DTO.

The class StreamManager gathers the sources and runs the flows from its method runStreams. The class StreamLauncher starts StreamManager.runStreams.

Class TestStream tests requestFlow.

StreamFlowsTest tests the flows contained in StreamFlows

I’ve used the Lombok builder annotation in StreamFlows :

@Builder
@ToString
public class StreamFlows {

I’ve not seen a Lombok builder used in the Akka docs, but I think it is a good pattern to follow. Is there any reason not to use it in this case?

Overall, is this a good design for mapping JSON to a DTO using Akka Streams? Is there a design pattern I could follow?

src :

package qs.q4;

import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.jackson.Jacksonized;

import java.util.Date;

/**
 * Data-transfer object that the JSON request strings are deserialized into.
 *
 * <p>The class declares no methods of its own; accessors, the builder and
 * {@code toString} come from the Lombok annotations below, and {@code @Jacksonized}
 * is combined with {@code @Builder} for Jackson deserialization.
 *
 * <p>NOTE(review): {@code datePurchased} is {@code final}, so presumably {@code @Setter}
 * only produces a setter for {@code someOtherField} — confirm against the Lombok docs.
 */
@Getter
@Setter
@Builder
@ToString
@Jacksonized
public class RequestDto {
    // The seconds part of the pattern is three letters wide ("sss"); the sample payloads
    // in this file use a matching three-digit seconds value (e.g. "21:32:017").
    // NOTE(review): if milliseconds were intended this should probably be "ss.SSS" — confirm.
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:sss")
    private final Date datePurchased;

    // Optional free-form field; none of the sample payloads in this file populate it.
    private String someOtherField;

}


package qs.q4;

import akka.NotUsed;
import akka.stream.javadsl.Flow;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Builder;
import lombok.ToString;

/**
 * Holds the Akka Streams flows that turn raw JSON strings into DTOs.
 *
 * <p>Instances are obtained through the Lombok-generated builder, i.e.
 * {@code StreamFlows.builder().build()}.
 */
@Builder
@ToString
public class StreamFlows {

    /**
     * JSON mapper shared by every {@code StreamFlows} instance.
     *
     * <p>Kept static so that building a new {@code StreamFlows} per stream does not
     * allocate a new {@link ObjectMapper} each time; the mapper is never reconfigured
     * after construction.
     */
    static final ObjectMapper mapper = new ObjectMapper();

    /**
     * Flow that deserializes each incoming JSON string into a {@link RequestDto} and
     * then passes the element through a log stage named {@code "error"}.
     *
     * <p>{@code readValue} throws when the input cannot be mapped; how that failure is
     * handled depends on the supervision strategy of the stream the flow is run in.
     */
    public final Flow<String, RequestDto, NotUsed> requestFlow = Flow.of(String.class)
            .map(input -> mapper.readValue(input, RequestDto.class))
            .log("error");
}

package qs.q4;

import akka.Done;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.javadsl.Sink;

import java.util.concurrent.CompletionStage;

/**
 * Command-line entry point: creates the actor system and starts the streams defined in
 * {@link StreamManager}, printing every mapped {@link RequestDto} to standard output.
 */
public class StreamLauncher {

    /**
     * Creates the actor system and the printing sink, then runs the streams.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {

        // The guardian uses Behaviors.empty(); the system is only passed on to run the streams.
        final ActorSystem<Void> actorSystem = ActorSystem.create(Behaviors.empty(), "actorSystem");
        final Sink<RequestDto, CompletionStage<Done>> printSink = Sink.foreach(System.out::println);

        StreamManager.runStreams(printSink, actorSystem);

        // NOTE(review): the actor system is never terminated here, so the process
        // presumably keeps running after the streams have finished — confirm whether
        // that is intended.
    }

}

package qs.q4;

import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.stream.Graph;
import akka.stream.SinkShape;
import akka.stream.javadsl.Merge;
import akka.stream.javadsl.Source;
import java.util.Collections;

/**
 * Builds the sample JSON sources and runs each of them through
 * {@code StreamFlows.requestFlow} into a caller-supplied sink.
 */
public class StreamManager {

    /** JSON payload that appears as the second/third sub-source of both data sources. */
    private static final String JSON_WITH_TRAILING_OBJECT =
            "{\"datePurchased\":\"2022-02-03 21:32:017\"},{\"unknownField\":\"test\"}";

    /**
     * First sample source: three sub-sources of three elements each, merged.
     *
     * @return the merged source of JSON strings
     */
    private static Source<String, NotUsed> getDataSource1() {
        final String json1 = "{\"datePurchased\":\"2022-02-03 21:32:017\"}";
        return mergedSource(json1, 3, JSON_WITH_TRAILING_OBJECT, 3, 3);
    }

    /**
     * Second sample source: sub-sources of 3, 5 and 13 elements, merged.
     *
     * @return the merged source of JSON strings
     */
    private static Source<String, NotUsed> getDataSource2() {
        final String json1 = "{\"datePurchased\":\"2022-03-03 21:32:017\"}";
        return mergedSource(json1, 3, JSON_WITH_TRAILING_OBJECT, 5, 13);
    }

    /**
     * Merges three repeating sources: one emitting {@code firstJson} and two emitting
     * {@code secondJson}, each limited to the given number of elements.
     *
     * @param firstJson   payload repeated by the first sub-source
     * @param firstCount  number of elements taken from the first sub-source
     * @param secondJson  payload repeated by the second and third sub-sources
     * @param secondCount number of elements taken from the second sub-source
     * @param thirdCount  number of elements taken from the third sub-source
     * @return the three sub-sources combined with {@code Merge}
     */
    private static Source<String, NotUsed> mergedSource(final String firstJson,
                                                        final int firstCount,
                                                        final String secondJson,
                                                        final int secondCount,
                                                        final int thirdCount) {
        final Source<String, NotUsed> source1 = Source.repeat(firstJson).take(firstCount);
        final Source<String, NotUsed> source2 = Source.repeat(secondJson).take(secondCount);
        final Source<String, NotUsed> source3 = Source.repeat(secondJson).take(thirdCount);

        return Source.combine(source1, source2, Collections.singletonList(source3), Merge::create);
    }

    /**
     * Runs both sample sources through the request flow into {@code sink}.
     *
     * <p>The two streams are started one after the other with {@code to(sink).run(...)},
     * so the sink's materialized value is not returned to the caller.
     *
     * @param sink        sink receiving every mapped {@link RequestDto}
     * @param actorSystem actor system used to run the streams
     */
    public static void runStreams(final Graph<SinkShape<RequestDto>, ?> sink, final ActorSystem<?> actorSystem){
        // One StreamFlows instance is enough: the same flow definition is used for both runs.
        final StreamFlows flows = StreamFlows.builder().build();

        getDataSource1().via(flows.requestFlow).to(sink).run(actorSystem);
        getDataSource2().via(flows.requestFlow).to(sink).run(actorSystem);
    }
}


package qs.q4;

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.testkit.javadsl.TestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.junit.Assert.assertEquals;

/**
 * JUnit test for {@code StreamFlows.requestFlow}: feeds JSON strings through the flow
 * and checks the resulting {@link RequestDto} instances.
 */
public class TestStream {

    /** Actor system shared by all tests in this class. */
    static ActorSystem system;

    /** Creates the shared actor system before any test runs. */
    @BeforeClass
    public static void setup() {
        system = ActorSystem.create("StreamTestKitDocTest");
    }

    /** Shuts the shared actor system down after the last test. */
    @AfterClass
    public static void tear_down() {
        TestKit.shutdownActorSystem(system);
        system = null;
    }

    /**
     * Runs a fixed number of identical JSON strings through the flow and verifies that
     * every element was mapped to a DTO carrying the expected purchase date.
     */
    @Test
    public void test_stream_flow() throws ExecutionException, InterruptedException, TimeoutException {

        final int STREAM_SIZE = 3;

        // 1643923937000 ms since the epoch is 2022-02-03 21:32:17 UTC, i.e. the timestamp
        // in json1 below with its seconds field "017" read as 17.
        final Date datePurchasedDate = new Date(1643923937000L);
        final String json1 = "{\"datePurchased\":\"2022-02-03 21:32:017\"}";
        final Source<String, NotUsed> source = Source.repeat(json1).take(STREAM_SIZE);

        final Flow<String, RequestDto, NotUsed> flowUnderTest = StreamFlows.builder().build().requestFlow;
        final CompletionStage<List<RequestDto>> result = source.via(flowUnderTest).runWith(Sink.seq(), system);
        final List<RequestDto> requestDtoList = result.toCompletableFuture().get(3, TimeUnit.SECONDS);

        assertEquals(STREAM_SIZE, requestDtoList.size());
        // Check every element rather than a hard-coded set of indices.
        for (final RequestDto requestDto : requestDtoList) {
            assertEquals(datePurchasedDate, requestDto.getDatePurchased());
        }
    }

}


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source