Scala KStreams moving from many filters to split and branch
Assume that all undefined methods here have valid signatures. I have the following code, which works perfectly:
// Add validation tags to the stream
val validated: KStream[K, V] = stream.mapValues(x => validate(x))
// Predicate to separate valid and invalid regions
val isInvalid: (K, V) => Boolean = (_: K, v: V) => !v.isValid
// Branch the rejects out of the main stream and into a rejects topic
val incAndReject: KStream[K, V] => Unit =
  _.mapValues(f1)
    .peek(incMetric1)
    .to("bad-topic")
Now I have the following working snippet:
validated.peek((_, v) => incMetric2(v))
val temp = validated.filter(isInvalid)
incAndReject(temp)
validated.filter((_, v) => v.isValid)
I am trying to change it to what I think is the equivalent code, using split and branch instead of filter, but it doesn't work:
// Branched names are for the topology, they do not affect topic names (?)
val rejectsBranch: Branched[K, V] = Branched.withConsumer[K, V](incAndReject, "invalid-region")
validatedEnrichedStream
  .peek((_, v) => incMetric2(v))
  .split()
  .branch(isInvalid, rejectsBranch)
  .defaultBranch()
  .head
  ._2
Am I doing it right? The documentation about KStreams branched streams is abysmal and only in Java.
Solution 1:[1]
The issue was the keys of the map returned by defaultBranch(): they differ slightly from the branch names, so .head._2 worked while .get("name") didn't.
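For illustration, here is a minimal sketch of one way to make the lookup by key predictable, assuming Kafka Streams 2.8 or later with the kafka-streams-scala module. As far as the Java documentation for BranchedKStream describes it, each map key is the prefix passed to split(Named) (empty by default) followed by the branch name, or by the branch position when no name is given ("0" for an unnamed default branch), and a branch handled by a consumer gets no map entry at all. The helper keepValid, the prefix "region-", and the branch names "invalid" and "valid" are hypothetical names chosen for this example, not part of the original code.

```scala
import org.apache.kafka.streams.kstream.Named
import org.apache.kafka.streams.scala.kstream.{Branched, KStream}

object BranchExample {
  // Hypothetical helper: sends invalid records to `onInvalid`
  // and returns the stream of remaining (valid) records.
  def keepValid[K, V](
      validated: KStream[K, V],
      isInvalid: (K, V) => Boolean,
      onInvalid: KStream[K, V] => Unit
  ): KStream[K, V] = {
    val branches: Map[String, KStream[K, V]] =
      validated
        // Assumed prefix; it is prepended to every key of the returned map.
        .split(Named.as("region-"))
        // A branch handled by a consumer is not added to the returned map.
        .branch(isInvalid, Branched.withConsumer[K, V](onInvalid, "invalid"))
        // Naming the default branch yields the key "region-valid"
        // rather than the positional default "region-0".
        .defaultBranch(Branched.as[K, V]("valid"))

    // Look the branch up by its full key: prefix + branch name.
    branches("region-valid")
  }
}
```

With the code from the question this could be called as BranchExample.keepValid(validated, isInvalid, incAndReject). If a lookup still fails, printing branches.keySet is a quick way to see which keys the map actually contains.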
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Peter Csala |
