'Scala KStreams moving from many filters to split and branch

Assuming that all undefined methods here have valid signatures. I have the following code that works perfectly:

  // Add validation tags to the stream
  val validated: KStream[K, V] = stream.mapValues(x => validate(x))

  // Predicate to separate valid and invalid regions
  val isInvalid: (K, V) => Boolean = (_: K, v: V) => !v.isValid

  // Branch the rejects out of the main stream and into a rejects topic
  val incAndReject: KStream[K, V] => Unit =
    _.mapValues(f1)
      .peek(incMetric1)
      .to("bad-topic")

Now I have the following working snippet:

  validated.peek((_, v) => incMetric2(v))
  val temp = validated.filter(isInvalid)
  incAndReject(temp)
  validated.filter((_, v) => v.isValid)

And I am trying to change it to the, what I think is, equivalent code using split and branch instead of filter, and it doesn't work:

  // Branched names are for the topology, they do not affect topic names (?)
  val rejectsBranch: Branched[K, V] = Branched.withConsumer[K, V](incAndReject, "invalid-region")

  validatedEnrichedStream
    .peek((_, v) => incMetric2(v))
    .split()
    .branch(isInvalid, rejectsBranch)
    .defaultBranch()
    .head
    ._2

Am I doing it right? Documentation about KStreams branched streams is abysmal and only in Java



Solution 1:[1]

The key was on the keys of the map elements, that change slightly from the branch names, so .head._2 worked while .get("name") didn't

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Peter Csala