Why is Kafka Streams using the same state store for two unrelated tasks?

I'm using Kafka Streams to process data from a computer game (I'm exporting the in-game map).

  • Kafka Streams 3.1.0
  • Kotlin 1.6.21

I used kafka-streams-viz to visualize the topology, and I saw something that looks wrong. While these two tasks should be in the same topology, there is no reason for them to be grouped into the same sub-topology.

server-map-data.tiles.reduce is a task that I create, the result of which is sent to a KTable-KTable foreign key join.

server-map-data.join-tiles-with-prototypes-subscription-response-resolver is generated by Kafka Streams after this KTable-KTable FK join. The data in the result of the join is different to the input data, so the two tasks shouldn't be sharing a state store.

// ...
.groupByKey(
  groupedAs(
    "server-map-data.tiles.group",
    kxsBinary.serde<ServerMapChunkId>(),
    kxsBinary.serde<ServerMapChunkTiles<TileProtoHashCode>>(),
  )
)
.reduce(
  "server-map-data.tiles.reduce",
  materializedAs(
    "server-map-data.tiles.reduce.store",
    kxsBinary.serde<ServerMapChunkId>(),
    kxsBinary.serde<ServerMapChunkTiles<TileProtoHashCode>>(),
  )
) { chunkTiles, other -> chunkTiles + other }

// ...

val colourisedChunkTable: KTable<ServerMapChunkId, ServerMapChunkTiles<ColourHex>> =
  chunkedTilesTable
    .join(
      other = tileProtoColourDict,
      tableJoined = tableJoined("server-map-data.join-tiles-with-prototypes"),
      materialized = materializedAs(
//          Stores.inMemoryKeyValueStore("server-map-data.join-tiles-with-prototypes.store"),
        "server-map-data.join-tiles-with-prototypes.store",
        kxsBinary.serde<ServerMapChunkId>(),
        kxsBinary.serde<ServerMapChunkTiles<ColourHex>>(),
      ),
      foreignKeyExtractor = { chunkTiles: ServerMapChunkTiles<TileProtoHashCode> -> chunkTiles.chunkId.serverId }
    ) { tiles: ServerMapChunkTiles<TileProtoHashCode>, colourDict: TileColourDict ->

      //...

      println("Set tile colours for chunk ${tiles.chunkId}")

      ServerMapChunkTiles(tiles.chunkId, tileColours)
    }

Full topology

https://i.stack.imgur.com/Yjz8I.jpg

Topologies:
   Sub-topology: 0
    Source: consume.MapChunkUpdate.input (topics: [kafkatorio.packet.map-chunk])
      --> consume.MapChunkUpdate.filter-is-instance
    Processor: consume.MapChunkUpdate.filter-is-instance (stores: [])
      --> consume.MapChunkUpdate.map-values
      <-- consume.MapChunkUpdate.input
    Processor: consume.MapChunkUpdate.map-values (stores: [])
      --> KSTREAM-FILTER-0000000013
      <-- consume.MapChunkUpdate.filter-is-instance
    Processor: KSTREAM-FILTER-0000000013 (stores: [])
      --> map-chunk-update-packets.convert-to-map-tiles
      <-- consume.MapChunkUpdate.map-values
    Processor: map-chunk-update-packets.convert-to-map-tiles (stores: [])
      --> map-chunk-update-packets.filter-out.no-tiles
      <-- KSTREAM-FILTER-0000000013
    Processor: map-chunk-update-packets.filter-out.no-tiles (stores: [])
      --> server-map-data.tiles.flatMapByChunk
      <-- map-chunk-update-packets.convert-to-map-tiles
    Processor: server-map-data.tiles.flatMapByChunk (stores: [])
      --> server-map-data.tiles.filter-not-empty
      <-- map-chunk-update-packets.filter-out.no-tiles
    Processor: server-map-data.tiles.filter-not-empty (stores: [])
      --> server-map-data.tiles.group-repartition-filter
      <-- server-map-data.tiles.flatMapByChunk
    Processor: server-map-data.tiles.group-repartition-filter (stores: [])
      --> server-map-data.tiles.group-repartition-sink
      <-- server-map-data.tiles.filter-not-empty
    Sink: server-map-data.tiles.group-repartition-sink (topic: server-map-data.tiles.group-repartition)
      <-- server-map-data.tiles.group-repartition-filter

  Sub-topology: 1
    Source: consume.PrototypesUpdate.input (topics: [kafkatorio.packet.prototypes])
      --> consume.PrototypesUpdate.filter-is-instance
    Processor: consume.PrototypesUpdate.filter-is-instance (stores: [])
      --> consume.PrototypesUpdate.map-values
      <-- consume.PrototypesUpdate.input
    Processor: consume.PrototypesUpdate.map-values (stores: [])
      --> tileProtoColourDictionary.map-values
      <-- consume.PrototypesUpdate.filter-is-instance
    Processor: tileProtoColourDictionary.map-values (stores: [])
      --> tileProtoColourDictionary.filterMapTileProtos
      <-- consume.PrototypesUpdate.map-values
    Processor: tileProtoColourDictionary.filterMapTileProtos (stores: [])
      --> KSTREAM-PEEK-0000000008
      <-- tileProtoColourDictionary.map-values
    Processor: KSTREAM-PEEK-0000000008 (stores: [])
      --> tileProtoColourDictionary.pre-table-repartition-filter
      <-- tileProtoColourDictionary.filterMapTileProtos
    Processor: tileProtoColourDictionary.pre-table-repartition-filter (stores: [])
      --> tileProtoColourDictionary.pre-table-repartition-sink
      <-- KSTREAM-PEEK-0000000008
    Sink: tileProtoColourDictionary.pre-table-repartition-sink (topic: tileProtoColourDictionary.pre-table-repartition)
      <-- tileProtoColourDictionary.pre-table-repartition-filter

  Sub-topology: 2
    Source: server-map-data.join-tiles-with-prototypes-subscription-registration-source (topics: [server-map-data.join-tiles-with-prototypes-subscription-registration-topic])
      --> server-map-data.join-tiles-with-prototypes-subscription-receive
    Source: tileProtoColourDictionary.pre-table-repartition-source (topics: [tileProtoColourDictionary.pre-table-repartition])
      --> tileProtoColourDictionary.create-table
    Processor: server-map-data.join-tiles-with-prototypes-subscription-receive (stores: [server-map-data.join-tiles-with-prototypes-subscription-store])
      --> server-map-data.join-tiles-with-prototypes-subscription-join-foreign
      <-- server-map-data.join-tiles-with-prototypes-subscription-registration-source
    Processor: tileProtoColourDictionary.create-table (stores: [server-map-data.tile-prototypes.store])
      --> server-map-data.join-tiles-with-prototypes-foreign-join-subscription
      <-- tileProtoColourDictionary.pre-table-repartition-source
    Processor: server-map-data.join-tiles-with-prototypes-foreign-join-subscription (stores: [server-map-data.join-tiles-with-prototypes-subscription-store])
      --> server-map-data.join-tiles-with-prototypes-subscription-response-sink
      <-- tileProtoColourDictionary.create-table
    Processor: server-map-data.join-tiles-with-prototypes-subscription-join-foreign (stores: [server-map-data.tile-prototypes.store])
      --> server-map-data.join-tiles-with-prototypes-subscription-response-sink
      <-- server-map-data.join-tiles-with-prototypes-subscription-receive
    Sink: server-map-data.join-tiles-with-prototypes-subscription-response-sink (topic: server-map-data.join-tiles-with-prototypes-subscription-response-topic)
      <-- server-map-data.join-tiles-with-prototypes-subscription-join-foreign, server-map-data.join-tiles-with-prototypes-foreign-join-subscription

  Sub-topology: 3
    Source: server-map-data.join-tiles-with-prototypes-subscription-response-source (topics: [server-map-data.join-tiles-with-prototypes-subscription-response-topic])
      --> server-map-data.join-tiles-with-prototypes-subscription-response-resolver
    Processor: server-map-data.join-tiles-with-prototypes-subscription-response-resolver (stores: [server-map-data.tiles.reduce.store])
      --> server-map-data.join-tiles-with-prototypes-result
      <-- server-map-data.join-tiles-with-prototypes-subscription-response-source
    Processor: server-map-data.join-tiles-with-prototypes-result (stores: [server-map-data.join-tiles-with-prototypes.store])
      --> grouped-map-chunk-tiles.output.to-stream
      <-- server-map-data.join-tiles-with-prototypes-subscription-response-resolver
    Processor: grouped-map-chunk-tiles.output.to-stream (stores: [])
      --> grouped-map-chunk-tiles.output.filter-tiles-not-empty
      <-- server-map-data.join-tiles-with-prototypes-result
    Processor: grouped-map-chunk-tiles.output.filter-tiles-not-empty (stores: [])
      --> grouped-map-chunk-tiles.output.map-not-null
      <-- grouped-map-chunk-tiles.output.to-stream
    Source: server-map-data.tiles.group-repartition-source (topics: [server-map-data.tiles.group-repartition])
      --> server-map-data.tiles.reduce
    Processor: grouped-map-chunk-tiles.output.map-not-null (stores: [])
      --> grouped-map-chunk-tiles.output.print-group-result
      <-- grouped-map-chunk-tiles.output.filter-tiles-not-empty
    Processor: server-map-data.tiles.reduce (stores: [server-map-data.tiles.reduce.store])
      --> server-map-data.join-tiles-with-prototypes-subscription-registration-processor
      <-- server-map-data.tiles.group-repartition-source
    Processor: grouped-map-chunk-tiles.output.print-group-result (stores: [])
      --> grouped-map-chunk-tiles.output.grouped-map-chunks
      <-- grouped-map-chunk-tiles.output.map-not-null
    Processor: server-map-data.join-tiles-with-prototypes-subscription-registration-processor (stores: [])
      --> server-map-data.join-tiles-with-prototypes-subscription-registration-sink
      <-- server-map-data.tiles.reduce
    Sink: grouped-map-chunk-tiles.output.grouped-map-chunks (topic: kafkatorio.state.map-chunks.grouped)
      <-- grouped-map-chunk-tiles.output.print-group-result
    Sink: server-map-data.join-tiles-with-prototypes-subscription-registration-sink (topic: server-map-data.join-tiles-with-prototypes-subscription-registration-topic)
      <-- server-map-data.join-tiles-with-prototypes-subscription-registration-processor
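
To check which processors are attached to the same state store, the description text above can be scanned mechanically. The following is a minimal Kotlin sketch, not part of the original application: the function names `storeUsers` and `sharedStores` are hypothetical, and the regular expression assumes the `Processor: <name> (stores: [...])` line format printed above. It only reports the stores that more than one processor lists; it does not explain why Kafka Streams wired them that way.

```kotlin
// Sketch: find state stores that are listed by more than one processor
// in the text form of a Kafka Streams topology description.

/** Maps each store name to the processors that list it in `(stores: [...])`. */
fun storeUsers(description: String): Map<String, List<String>> {
  // Matches lines such as:  Processor: my-processor (stores: [store-a, store-b])
  val processorLine = Regex("""Processor:\s+(\S+)\s+\(stores:\s+\[(.*?)\]\)""")
  val users = mutableMapOf<String, MutableList<String>>()
  for (match in processorLine.findAll(description)) {
    val (processor, stores) = match.destructured
    stores.split(",")
      .map { it.trim() }
      .filter { it.isNotEmpty() } // "(stores: [])" yields one empty entry
      .forEach { store -> users.getOrPut(store) { mutableListOf() }.add(processor) }
  }
  return users
}

/** Keeps only the stores that are listed by two or more processors. */
fun sharedStores(description: String): Map<String, List<String>> =
  storeUsers(description).filterValues { it.size > 1 }

fun main() {
  // Three processor lines copied from sub-topology 3 of the description above.
  val description = """
    Processor: server-map-data.join-tiles-with-prototypes-subscription-response-resolver (stores: [server-map-data.tiles.reduce.store])
    Processor: server-map-data.join-tiles-with-prototypes-result (stores: [server-map-data.join-tiles-with-prototypes.store])
    Processor: server-map-data.tiles.reduce (stores: [server-map-data.tiles.reduce.store])
  """.trimIndent()

  sharedStores(description).forEach { (store, processors) ->
    println("$store is used by: ${processors.joinToString()}")
  }
}
```

Run against the three excerpted lines, this should report that server-map-data.tiles.reduce.store is listed by both the subscription-response-resolver and the reduce processor, which matches the grouping in sub-topology 3.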


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow