Aggregator in Spring Cloud Stream with timeout
I want to build an application that receives messages, stores them in a list, and then releases them on a schedule every x amount of time. I know Spring Cloud Stream has an aggregator that already does this, but I think I need to do it manually, because I need to keep a single message per key and replace the old message only if a specific condition is met (I think of it as a Set aggregator with conditions). Here is what I have tried so far; the code is also available at https://github.com/chalimbu/AggregatorQuestionStack
Processor.
import org.springframework.cloud.stream.annotation.EnableBinding
import org.springframework.cloud.stream.annotation.Input
import org.springframework.cloud.stream.annotation.Output
import org.springframework.cloud.stream.messaging.Processor
import org.springframework.scheduling.annotation.Scheduled

@EnableBinding(Processor::class)
class SetAggregatorProcessor(val storageService: StorageService) {

    @Input
    public fun inputMessage(input: Map<String, Any>) {
        storageService.messages.add(input)
    }

    @Output
    @Scheduled(fixedDelay = 20000)
    public fun produceOutput(): List<Map<String, Any>> {
        val message = storageService.messages
        storageService.messages.clear()
        return message;
    }
}
Memory storage.
import org.springframework.stereotype.Service

@Service
class StorageService {
    public var messages: MutableList<Map<String, Any>> = mutableListOf()
}
This code generates the following error when I start pushing messages:
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139) ~[spring-integration-core-5.5.8.jar:5.5.8]
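Separately from the dispatcher error, `produceOutput` returns the same list instance it has just cleared, so even with working bindings it would release an empty list. A minimal sketch of a drain that copies before clearing is shown here. `MessageBuffer`, `add`, and `drain` are hypothetical names that are not part of the original code, and the lock is an assumption that the input handler and the scheduler run on different threads.

```kotlin
// Hypothetical replacement for StorageService: drain() returns a copy,
// so clearing the buffer does not empty the list that was handed out.
class MessageBuffer {
    private val lock = Any()
    private val messages = mutableListOf<Map<String, Any>>()

    // Called by the input handler, possibly from another thread.
    fun add(message: Map<String, Any>) {
        synchronized(lock) { messages.add(message) }
    }

    // Called by the scheduler: copy first, then clear, under the same lock.
    fun drain(): List<Map<String, Any>> = synchronized(lock) {
        val snapshot = messages.toList()
        messages.clear()
        snapshot
    }
}

fun main() {
    val buffer = MessageBuffer()
    buffer.add(mapOf("id" to "a"))
    buffer.add(mapOf("id" to "b"))
    println(buffer.drain().size) // 2
    println(buffer.drain().size) // 0
}
```

This only addresses the aliasing problem; it does not by itself wire the method to an output channel.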
The idea is to deploy this app as part of a Spring Cloud Stream (Data Flow) pipeline.
I prefer the declarative, annotation-based approach over the functional approach, but if somebody knows how to do it the Reactor way, I could settle for that.
Thanks for any help or advice.
Solution 1:[1]
Thanks to this example (https://github.com/spring-cloud/spring-cloud-stream-samples/blob/main/processor-samples/sensor-average-reactive-kafka/src/main/java/sample/sensor/average/SensorAverageProcessorApplication.java), I was able to work something out using Flux, in case someone else needs it:
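import java.time.Duration
import java.util.function.BiFunction
import java.util.function.Function
import org.springframework.context.annotation.Configuration
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

@Configuration
class SetAggregatorProcessor : Function<Flux<Map<String, Any>>, Flux<MutableList<Map<String, Any>>>> {

    // Split the incoming stream into 20-second windows and emit one list per window.
    override fun apply(data: Flux<Map<String, Any>>): Flux<MutableList<Map<String, Any>>> {
        return data.window(Duration.ofSeconds(20)).flatMap { window: Flux<Map<String, Any>> ->
            this.aggregateList(window)
        }
    }

    // Collect every element of a single window into a mutable list.
    private fun aggregateList(group: Flux<Map<String, Any>>): Mono<MutableList<Map<String, Any>>> {
        return group.reduce(
            mutableListOf(),
            BiFunction<MutableList<Map<String, Any>>, Map<String, Any>, MutableList<Map<String, Any>>> {
                accumulator: MutableList<Map<String, Any>>, element: Map<String, Any> ->
                accumulator.add(element)
                accumulator
            }
        )
    }
}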
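The answer's code emits every message of a window, while the question asks for one message per key with conditional replacement. One possible way to get there is to post-process each window's list with a plain function, for example by calling `.map { ... }` on the `Mono` returned by `aggregateList`. The sketch below is only an illustration: `dedupeByKey`, the `id` key, and the `version` comparison are hypothetical and stand in for whatever key and condition the real messages use.

```kotlin
// Hypothetical helper: keep one message per key and replace the stored
// message only when shouldReplace(current, incoming) returns true.
fun dedupeByKey(
    messages: List<Map<String, Any>>,
    keyOf: (Map<String, Any>) -> Any?,
    shouldReplace: (current: Map<String, Any>, incoming: Map<String, Any>) -> Boolean
): List<Map<String, Any>> {
    // LinkedHashMap keeps keys in first-seen order, even after a replacement.
    val byKey = LinkedHashMap<Any?, Map<String, Any>>()
    for (message in messages) {
        val key = keyOf(message)
        val existing = byKey[key]
        if (existing == null || shouldReplace(existing, message)) {
            byKey[key] = message
        }
    }
    return byKey.values.toList()
}

fun main() {
    val window: List<Map<String, Any>> = listOf(
        mapOf("id" to "a", "version" to 1),
        mapOf("id" to "b", "version" to 5),
        mapOf("id" to "a", "version" to 3), // newer: replaces the first "a"
        mapOf("id" to "b", "version" to 2)  // older: ignored
    )
    val result = dedupeByKey(
        window,
        keyOf = { it["id"] },
        shouldReplace = { current, incoming ->
            (incoming["version"] as Int) > (current["version"] as Int)
        }
    )
    println(result) // [{id=a, version=3}, {id=b, version=5}]
}
```

Keeping the replacement rule in a separate function means the windowing logic stays unchanged and the condition can be tested without a running binder.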
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Sebastian Zapata |
