Why is Kafka Streams using the same state store for two unrelated tasks?
I'm using Kafka Streams to process data from a computer game (I'm exporting the in-game map).
- Kafka Streams 3.1.0
- Kotlin 1.6.21
I used kafka-streams-viz to visualize the topology, and I saw something that looks wrong. While these two tasks should be in the same topology, there is no reason for them to be grouped into the same sub-topology.
`server-map-data.tiles.reduce` is a task that I create; its result is sent to a KTable-KTable foreign-key (FK) join.
And `server-map-data.join-tiles-with-prototypes-subscription-response-resolver` is generated by Kafka Streams for this KTable-KTable FK join. The result of the join contains different data from the input, so I don't see why the two should be sharing state.
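A simplified model of how processors end up in the same sub-topology may help when reading the topology below. It assumes the grouping rule Kafka Streams applies when it builds sub-topologies: processors joined by a direct parent/child edge are grouped together, and processors connected to the same state store are also merged, because a store is owned by a single task. Hops through a Kafka topic (sink, topic, source) are not direct edges, which is why repartition topics split sub-topologies. The node names in the usage example are shortened, hypothetical stand-ins for the real ones.

```kotlin
// Simplified model (not Kafka's actual code) of sub-topology grouping.
// Assumption: nodes are merged if they share a direct edge OR are connected to the same store.
class UnionFind {
    private val parent = mutableMapOf<String, String>()

    fun find(x: String): String {
        val p = parent.getOrPut(x) { x }
        if (p == x) return x
        val root = find(p)
        parent[x] = root // path compression
        return root
    }

    fun union(a: String, b: String) {
        val ra = find(a)
        val rb = find(b)
        if (ra != rb) parent[ra] = rb
    }
}

fun groupSubTopologies(
    nodes: List<String>,
    edges: List<Pair<String, String>>,          // direct parent -> child edges only
    storesByProcessor: Map<String, Set<String>>, // processor name -> connected store names
): Set<Set<String>> {
    val uf = UnionFind()
    nodes.forEach { uf.find(it) }
    edges.forEach { (from, to) -> uf.union(from, to) }
    // Every processor using a store is merged with the first processor seen using that store.
    val firstUser = mutableMapOf<String, String>()
    for ((processor, stores) in storesByProcessor) {
        for (store in stores) {
            uf.union(firstUser.getOrPut(store) { processor }, processor)
        }
    }
    return nodes.groupBy { uf.find(it) }.values.map { it.toSet() }.toSet()
}
```

With the reduce chain and the FK-join response chain as two otherwise unconnected paths, sharing `reduce.store` is enough to collapse them into one group; without the shared store they stay separate.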
```kotlin
// ...
.groupByKey(
    groupedAs(
        "server-map-data.tiles.group",
        kxsBinary.serde<ServerMapChunkId>(),
        kxsBinary.serde<ServerMapChunkTiles<TileProtoHashCode>>(),
    )
)
// The reduced table is the left-hand side of the FK join further down.
.reduce(
    "server-map-data.tiles.reduce",
    materializedAs(
        "server-map-data.tiles.reduce.store",
        kxsBinary.serde<ServerMapChunkId>(),
        kxsBinary.serde<ServerMapChunkTiles<TileProtoHashCode>>(),
    )
) { chunkTiles, other -> chunkTiles + other }
// ...
```
```kotlin
val colourisedChunkTable: KTable<ServerMapChunkId, ServerMapChunkTiles<ColourHex>> =
    chunkedTilesTable // the KTable produced by server-map-data.tiles.reduce
        .join(
            other = tileProtoColourDict,
            tableJoined = tableJoined("server-map-data.join-tiles-with-prototypes"),
            materialized = materializedAs(
                // Stores.inMemoryKeyValueStore("server-map-data.join-tiles-with-prototypes.store"),
                "server-map-data.join-tiles-with-prototypes.store",
                kxsBinary.serde<ServerMapChunkId>(),
                kxsBinary.serde<ServerMapChunkTiles<ColourHex>>(),
            ),
            // Foreign key: each chunk is joined with the colour dictionary of its server.
            foreignKeyExtractor = { chunkTiles: ServerMapChunkTiles<TileProtoHashCode> -> chunkTiles.chunkId.serverId }
        ) { tiles: ServerMapChunkTiles<TileProtoHashCode>, colourDict: TileColourDict ->
            //...
            println("Set tile colours for chunk ${tiles.chunkId}")
            ServerMapChunkTiles(tiles.chunkId, tileColours)
        }
```
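For context on why the resolver might need the left-hand store: in the Kafka Streams 3.x FK-join implementation, the subscription-response resolver looks up the current value for the key in the left-hand table (here, the table backed by `server-map-data.tiles.reduce.store`) and compares a hash of it with the hash carried in the response, discarding responses that are stale because the left value changed in the meantime. The sketch below is a simplified, pure-Kotlin model of that check; `SubscriptionResponse` and `ResponseResolver` are hypothetical names, and the real implementation hashes the serialized value rather than calling `hashCode()`.

```kotlin
// Hypothetical, simplified model of the FK-join response resolver (not Kafka's actual code).
data class SubscriptionResponse<RV>(
    val leftKey: String,
    val originalLeftValueHash: Int, // hash of the left value when the subscription was sent
    val rightValue: RV?,
)

class ResponseResolver<LV, RV, R>(
    // Stands in for the left-hand table's store (in the question: the reduce store).
    private val leftTableStore: Map<String, LV>,
    private val joiner: (LV, RV) -> R,
) {
    /** Returns the joined value, or null if the response is stale or has no match (simplified). */
    fun resolve(response: SubscriptionResponse<RV>): R? {
        val current = leftTableStore[response.leftKey] ?: return null
        if (current.hashCode() != response.originalLeftValueHash) return null // stale: drop it
        val right = response.rightValue ?: return null
        return joiner(current, right)
    }
}
```

Because every response has to be checked against the left-hand store, a processor like this cannot live in a different task from the processor that writes that store.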
Full topology ([image](https://i.stack.imgur.com/Yjz8I.jpg)):
```text
Topologies:
   Sub-topology: 0
    Source: consume.MapChunkUpdate.input (topics: [kafkatorio.packet.map-chunk])
      --> consume.MapChunkUpdate.filter-is-instance
    Processor: consume.MapChunkUpdate.filter-is-instance (stores: [])
      --> consume.MapChunkUpdate.map-values
      <-- consume.MapChunkUpdate.input
    Processor: consume.MapChunkUpdate.map-values (stores: [])
      --> KSTREAM-FILTER-0000000013
      <-- consume.MapChunkUpdate.filter-is-instance
    Processor: KSTREAM-FILTER-0000000013 (stores: [])
      --> map-chunk-update-packets.convert-to-map-tiles
      <-- consume.MapChunkUpdate.map-values
    Processor: map-chunk-update-packets.convert-to-map-tiles (stores: [])
      --> map-chunk-update-packets.filter-out.no-tiles
      <-- KSTREAM-FILTER-0000000013
    Processor: map-chunk-update-packets.filter-out.no-tiles (stores: [])
      --> server-map-data.tiles.flatMapByChunk
      <-- map-chunk-update-packets.convert-to-map-tiles
    Processor: server-map-data.tiles.flatMapByChunk (stores: [])
      --> server-map-data.tiles.filter-not-empty
      <-- map-chunk-update-packets.filter-out.no-tiles
    Processor: server-map-data.tiles.filter-not-empty (stores: [])
      --> server-map-data.tiles.group-repartition-filter
      <-- server-map-data.tiles.flatMapByChunk
    Processor: server-map-data.tiles.group-repartition-filter (stores: [])
      --> server-map-data.tiles.group-repartition-sink
      <-- server-map-data.tiles.filter-not-empty
    Sink: server-map-data.tiles.group-repartition-sink (topic: server-map-data.tiles.group-repartition)
      <-- server-map-data.tiles.group-repartition-filter

  Sub-topology: 1
    Source: consume.PrototypesUpdate.input (topics: [kafkatorio.packet.prototypes])
      --> consume.PrototypesUpdate.filter-is-instance
    Processor: consume.PrototypesUpdate.filter-is-instance (stores: [])
      --> consume.PrototypesUpdate.map-values
      <-- consume.PrototypesUpdate.input
    Processor: consume.PrototypesUpdate.map-values (stores: [])
      --> tileProtoColourDictionary.map-values
      <-- consume.PrototypesUpdate.filter-is-instance
    Processor: tileProtoColourDictionary.map-values (stores: [])
      --> tileProtoColourDictionary.filterMapTileProtos
      <-- consume.PrototypesUpdate.map-values
    Processor: tileProtoColourDictionary.filterMapTileProtos (stores: [])
      --> KSTREAM-PEEK-0000000008
      <-- tileProtoColourDictionary.map-values
    Processor: KSTREAM-PEEK-0000000008 (stores: [])
      --> tileProtoColourDictionary.pre-table-repartition-filter
      <-- tileProtoColourDictionary.filterMapTileProtos
    Processor: tileProtoColourDictionary.pre-table-repartition-filter (stores: [])
      --> tileProtoColourDictionary.pre-table-repartition-sink
      <-- KSTREAM-PEEK-0000000008
    Sink: tileProtoColourDictionary.pre-table-repartition-sink (topic: tileProtoColourDictionary.pre-table-repartition)
      <-- tileProtoColourDictionary.pre-table-repartition-filter

  Sub-topology: 2
    Source: server-map-data.join-tiles-with-prototypes-subscription-registration-source (topics: [server-map-data.join-tiles-with-prototypes-subscription-registration-topic])
      --> server-map-data.join-tiles-with-prototypes-subscription-receive
    Source: tileProtoColourDictionary.pre-table-repartition-source (topics: [tileProtoColourDictionary.pre-table-repartition])
      --> tileProtoColourDictionary.create-table
    Processor: server-map-data.join-tiles-with-prototypes-subscription-receive (stores: [server-map-data.join-tiles-with-prototypes-subscription-store])
      --> server-map-data.join-tiles-with-prototypes-subscription-join-foreign
      <-- server-map-data.join-tiles-with-prototypes-subscription-registration-source
    Processor: tileProtoColourDictionary.create-table (stores: [server-map-data.tile-prototypes.store])
      --> server-map-data.join-tiles-with-prototypes-foreign-join-subscription
      <-- tileProtoColourDictionary.pre-table-repartition-source
    Processor: server-map-data.join-tiles-with-prototypes-foreign-join-subscription (stores: [server-map-data.join-tiles-with-prototypes-subscription-store])
      --> server-map-data.join-tiles-with-prototypes-subscription-response-sink
      <-- tileProtoColourDictionary.create-table
    Processor: server-map-data.join-tiles-with-prototypes-subscription-join-foreign (stores: [server-map-data.tile-prototypes.store])
      --> server-map-data.join-tiles-with-prototypes-subscription-response-sink
      <-- server-map-data.join-tiles-with-prototypes-subscription-receive
    Sink: server-map-data.join-tiles-with-prototypes-subscription-response-sink (topic: server-map-data.join-tiles-with-prototypes-subscription-response-topic)
      <-- server-map-data.join-tiles-with-prototypes-subscription-join-foreign, server-map-data.join-tiles-with-prototypes-foreign-join-subscription

  Sub-topology: 3
    Source: server-map-data.join-tiles-with-prototypes-subscription-response-source (topics: [server-map-data.join-tiles-with-prototypes-subscription-response-topic])
      --> server-map-data.join-tiles-with-prototypes-subscription-response-resolver
    Processor: server-map-data.join-tiles-with-prototypes-subscription-response-resolver (stores: [server-map-data.tiles.reduce.store])
      --> server-map-data.join-tiles-with-prototypes-result
      <-- server-map-data.join-tiles-with-prototypes-subscription-response-source
    Processor: server-map-data.join-tiles-with-prototypes-result (stores: [server-map-data.join-tiles-with-prototypes.store])
      --> grouped-map-chunk-tiles.output.to-stream
      <-- server-map-data.join-tiles-with-prototypes-subscription-response-resolver
    Processor: grouped-map-chunk-tiles.output.to-stream (stores: [])
      --> grouped-map-chunk-tiles.output.filter-tiles-not-empty
      <-- server-map-data.join-tiles-with-prototypes-result
    Processor: grouped-map-chunk-tiles.output.filter-tiles-not-empty (stores: [])
      --> grouped-map-chunk-tiles.output.map-not-null
      <-- grouped-map-chunk-tiles.output.to-stream
    Source: server-map-data.tiles.group-repartition-source (topics: [server-map-data.tiles.group-repartition])
      --> server-map-data.tiles.reduce
    Processor: grouped-map-chunk-tiles.output.map-not-null (stores: [])
      --> grouped-map-chunk-tiles.output.print-group-result
      <-- grouped-map-chunk-tiles.output.filter-tiles-not-empty
    Processor: server-map-data.tiles.reduce (stores: [server-map-data.tiles.reduce.store])
      --> server-map-data.join-tiles-with-prototypes-subscription-registration-processor
      <-- server-map-data.tiles.group-repartition-source
    Processor: grouped-map-chunk-tiles.output.print-group-result (stores: [])
      --> grouped-map-chunk-tiles.output.grouped-map-chunks
      <-- grouped-map-chunk-tiles.output.map-not-null
    Processor: server-map-data.join-tiles-with-prototypes-subscription-registration-processor (stores: [])
      --> server-map-data.join-tiles-with-prototypes-subscription-registration-sink
      <-- server-map-data.tiles.reduce
    Sink: grouped-map-chunk-tiles.output.grouped-map-chunks (topic: kafkatorio.state.map-chunks.grouped)
      <-- grouped-map-chunk-tiles.output.print-group-result
    Sink: server-map-data.join-tiles-with-prototypes-subscription-registration-sink (topic: server-map-data.join-tiles-with-prototypes-subscription-registration-topic)
      <-- server-map-data.join-tiles-with-prototypes-subscription-registration-processor
```
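Spotting shared stores by eye in a dump this size is error-prone. The sketch below is a hypothetical helper (not part of Kafka Streams) that parses the plain-text `Topology.describe()` format shown above and reports every store that more than one processor is connected to. It assumes the `Processor: <name> (stores: [a, b])` line layout used in this output.

```kotlin
// Hypothetical helper: find state stores connected to more than one processor
// in the plain-text output of Topology.describe().
val processorRegex = Regex("""^\s*Processor:\s*(\S+)\s*\(stores:\s*\[(.*)\]\)""")

fun sharedStores(description: String): Map<String, List<String>> {
    val usersByStore = linkedMapOf<String, MutableList<String>>()
    for (line in description.lines()) {
        val match = processorRegex.find(line) ?: continue // skip Source/Sink/arrow lines
        val processor = match.groupValues[1]
        val stores = match.groupValues[2].split(",").map { it.trim() }.filter { it.isNotEmpty() }
        for (store in stores) {
            usersByStore.getOrPut(store) { mutableListOf() }.add(processor)
        }
    }
    // Keep only stores with more than one connected processor.
    return usersByStore.filterValues { it.size > 1 }
}
```

Run against the full topology in this question, it should flag `server-map-data.tiles.reduce.store` (used by both `server-map-data.tiles.reduce` and the subscription-response resolver), as well as the two stores that are each shared by two FK-join processors in sub-topology 2.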
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow