Streaming multiple events of different types using Axon

I am building streaming APIs for client/server communication using Axon and ServerSentEvents, and I am not sure whether it is possible to stream and identify multiple different events using Axon's query update emitter and a subscription query.

I am using Axon's QueryUpdateEmitter.emit to emit updates from a projection in response to different events. The emitter runs in the projection, whereas the subscription query is issued in the REST API that is supposed to stream the server-sent events to the client.

For example, I want to emit 3 different events for a use case which creates, updates and deletes an entity.
I am wondering if we can emit different types of data from different events but still combine them in one stream, i.e. send the actual object upon entity create and update in the emitter. Since I don't have any entity/data to emit in the case of a delete, I am wondering whether to send a simple message for delete instead.

I also want a way to specify the type of event while emitting, so that when the ServerSentEvent is built from the subscription query, I can specify the type/action (for example, differentiate between a create and an update event) along with the data.

The main idea is to emit different events into one stream as part of one subscription query, even though not all events return exactly the same data (create and update vs. delete), and to be able to accurately identify each event and label it with the appropriate event type in the stream of ServerSentEvents.

Any ideas on how I can achieve this?

Here's how I am emitting an event upon creation using QueryUpdateEmitter:

    @EventHandler
    public void on(LibraryCreatedEvent event, @Timestamp Instant timestamp) {
        final LibrarySummaryEntity librarySummary = mapper.createdEventToLibrarySummaryEntity(event, timestamp);
        repository.save(librarySummary);
        log.debug("On {}: Saved the first summary of the library named {}", event.getClass().getSimpleName(), event.getName());

        queryUpdateEmitter.emit(
                AllLibrarySummariesQuery.class,
                query -> true,
                librarySummary
        );
        log.debug("emitted library summary: {}", librarySummary.getId());
    }

Since I need to distinguish between create and update, I tried using GenericSubscriptionQueryUpdateMessage.asUpdateMessage upon the update event and added some metadata to it. I am not sure this is the right direction, because I don't know how to retrieve that metadata on the subscription query side.

    Map<String, String> map = new HashMap<>();
    map.put("Book Updated", event.getLibraryId());

    queryUpdateEmitter.emit(
            AllLibrarySummariesQuery.class,
            query -> true,
            GenericSubscriptionQueryUpdateMessage.asUpdateMessage(librarySummary).withMetaData(map)
    );

Here's how I am creating the subscription query:

    SubscriptionQueryResult<List<LibrarySummaryEntity>, LibrarySummaryEntity> result = queryGateway.subscriptionQuery(
            new AllLibrarySummariesQuery(),
            ResponseTypes.multipleInstancesOf(LibrarySummaryEntity.class),
            ResponseTypes.instanceOf(LibrarySummaryEntity.class)
    );

And here is the part where I am building the server-sent event (.event is where I want to specify the type of event, create/update/delete, and send the applicable data accordingly):

    Flux<ServerSentEvent<LibrarySummaryResponseDto>> sseStream = result.initialResult()
            .flatMapMany(Flux::fromIterable)
            .map(value -> mapper.libraryEntityToResponseDto(value))
            .concatWith((streamingTimeout == -1)
                    ? result.updates().map(value -> mapper.libraryEntityToResponseDto(value))
                    : result.updates().take(Duration.ofMinutes(streamingTimeout)).map(value -> mapper.libraryEntityToResponseDto(value)))
            .log()
            .map(created -> ServerSentEvent.<LibrarySummaryResponseDto>builder()
                    .id(created.getId())
                    .event("library creation")
                    .data(created)
                    .build())
            .doOnComplete(() -> log.info("streaming completed"))
            .doFinally(signal -> result.close());


Solution 1:[1]

As long as the object you return matches the expected type when making the subscription query, you should be good!

Note that this means you will have to make a response object that can fit all of your scenarios (create, update and delete). Whether that response object is what you emit as the update (through the QueryUpdateEmitter) or the result of a map operation at the point where you return the subscription query is a different question, though.

Ideally, you'd decouple your internal messages from what you send outward, like with SSE. To move to a more specific solution, you could benefit from having a Flux response type. You can simply attach any mapping operations to adjust the responses emitted by the QueryUpdateEmitter to your desired SSE format.

In conclusion, the short answer is "yes you can," as long as the emitted response object matches the expected update type when dispatching the subscription query on the QueryGateway.
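
To make the "response object that fits your scenarios" idea concrete, here is a minimal sketch in plain Java, without Axon or Spring on the classpath. The names `LibraryUpdateSketch`, `LibraryUpdate`, `Action` and `toSse` are hypothetical and not part of either library. The assumption is that an envelope like this becomes the single update type: the projection would pass it to the same `queryUpdateEmitter.emit(...)` call shown in the question, and the REST side would request it with `ResponseTypes.instanceOf(LibraryUpdate.class)`. The sketch writes the SSE wire format by hand only to stay dependency-free; in the question's code the same values would go into `.id(...)`, `.event(...)` and `.data(...)` on the `ServerSentEvent` builder.

```java
import java.util.List;
import java.util.Locale;

class LibraryUpdateSketch {

    // Hypothetical action tag carried with every update.
    enum Action { CREATED, UPDATED, DELETED }

    // Hypothetical envelope: one update type that fits create, update and delete.
    static final class LibraryUpdate {
        final Action action;
        final String libraryId;
        final String name; // null for DELETED, since there is no entity left to send

        private LibraryUpdate(Action action, String libraryId, String name) {
            this.action = action;
            this.libraryId = libraryId;
            this.name = name;
        }

        static LibraryUpdate created(String libraryId, String name) {
            return new LibraryUpdate(Action.CREATED, libraryId, name);
        }

        static LibraryUpdate updated(String libraryId, String name) {
            return new LibraryUpdate(Action.UPDATED, libraryId, name);
        }

        static LibraryUpdate deleted(String libraryId) {
            return new LibraryUpdate(Action.DELETED, libraryId, null);
        }
    }

    // Renders one update as a text/event-stream frame: id, event type, data, blank line.
    static String toSse(LibraryUpdate update) {
        String eventType = "library-" + update.action.name().toLowerCase(Locale.ROOT);
        // Browsers' EventSource drops frames whose data is empty,
        // so a delete still carries the identifier as its data.
        String data = (update.name != null) ? update.name : update.libraryId;
        return "id: " + update.libraryId + "\n"
                + "event: " + eventType + "\n"
                + "data: " + data + "\n"
                + "\n";
    }

    public static void main(String[] args) {
        List<LibraryUpdate> updates = List.of(
                LibraryUpdate.created("42", "City Library"),
                LibraryUpdate.updated("42", "Central City Library"),
                LibraryUpdate.deleted("42"));
        // Each frame is labelled with its own event type on the same stream.
        updates.forEach(update -> System.out.print(toSse(update)));
    }
}
```

The design choice here is to put the action inside the payload rather than in message metadata, so it survives the `result.updates()` flux returned by the QueryGateway and can be read directly in the `.map(...)` step that builds each ServerSentEvent.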

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Steven