Can I send, through a websocket, the value of a variable that is updated from a Kafka topic?

Problem: I have an object attribute, .status, that is updated from a Kafka topic and then sent through a websocket. Each time the client (JavaScript) asks for it, the server (websockets + asyncio in Python) starts a new Kafka consumer that reads the topic from the beginning.

Question: Is it possible to have the Kafka loop (for msg in consumer:) keep updating my custom_obj object, and send its .status value only when the client asks for it?

What I've tried

This is what I have so far on the server side:

import asyncio
import websockets
from kafka import KafkaConsumer
from Custom_obj import Custom_obj  # assumes the class lives in a module of the same name

async def test(websocket):
    consumer = KafkaConsumer(
        'kafka-topic',
        bootstrap_servers=['kafka.server.com:1234'],
        auto_offset_reset='earliest',  # Must start from the beginning to build the object correctly
        enable_auto_commit=True,
    )
    custom_obj = Custom_obj()
    for msg in consumer:
        msg_dec = msg.value.decode()
        custom_obj.update(msg_dec)
        await websocket.send(custom_obj.status) 

async def main():
    async with websockets.serve(test, "localhost", 1234):
        await asyncio.Future()  # run forever

if __name__ == "__main__":
    asyncio.run(main())

Client side (javascript in Vue component) code:

created() {
  const ws = new WebSocket('ws://localhost:1234');
  // Arrow functions keep `this` bound to the Vue component.
  ws.onopen = () => {
    ws.send('Got here!');
    this.connectionStatus = 'Connected.';
  };
  ws.onerror = () => {
    ws.close();
  };
  ws.onclose = () => {
    this.connectionStatus = 'Disconnected.';
  };
  ws.onmessage = (e) => {
    console.log(1);
  };
}


Solution 1:[1]

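If you want to track progress in the Kafka topic, you'll need to pass a group_id parameter to the KafkaConsumer constructor. Without a group_id, kafka-python does not commit offsets, so every new consumer falls back to auto_offset_reset and, with 'earliest', reads the topic from the beginning again.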
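
As a rough illustration of the group_id suggestion, the sketch below builds the same consumer as in the question with a consumer group added. The group name "status-builder" and the helper names consumer_settings and make_consumer are hypothetical; the topic and broker address are copied from the question.

```python
def consumer_settings(group_id):
    """Build KafkaConsumer keyword arguments for a named consumer group."""
    return {
        "bootstrap_servers": ["kafka.server.com:1234"],
        "group_id": group_id,             # offsets are committed under this name
        "auto_offset_reset": "earliest",  # only used when the group has no committed offset
        "enable_auto_commit": True,
    }


def make_consumer(group_id="status-builder"):  # hypothetical group name
    # Imported lazily so the settings can be inspected without kafka-python installed.
    from kafka import KafkaConsumer

    return KafkaConsumer("kafka-topic", **consumer_settings(group_id))
```

Note the trade-off: once the group has committed offsets, a new consumer in that group resumes after them rather than replaying the topic, so this only fits if the object does not have to be rebuilt from the first message each time.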

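You might also want to look at aiokafka for async support: the blocking for msg in consumer: loop in kafka-python stalls the asyncio event loop, whereas an aiokafka consumer can be iterated with async for.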
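
One possible way to combine the two ideas is a minimal sketch in which a single background task reads the topic once and keeps a shared object up to date, while each websocket connection only replies with the current status when the client sends a message. StatusHolder is a hypothetical stand-in for the question's Custom_obj, and the helper names consume_into and handle_client are assumptions made for illustration, not part of either library.

```python
import asyncio


class StatusHolder:
    """Hypothetical stand-in for the question's Custom_obj."""

    def __init__(self):
        self.status = "empty"

    def update(self, text):
        self.status = text  # the real update logic would go here


async def consume_into(holder, messages):
    """Apply every message from an async iterable to the shared holder."""
    async for msg in messages:
        holder.update(msg.value.decode())


async def handle_client(websocket, holder):
    """Reply with the current status each time the client sends anything."""
    async for _request in websocket:
        await websocket.send(holder.status)


async def main():
    # Imported here so the helpers above can be exercised without a broker.
    import websockets
    from aiokafka import AIOKafkaConsumer

    holder = StatusHolder()
    consumer = AIOKafkaConsumer(
        "kafka-topic",
        bootstrap_servers="kafka.server.com:1234",
        auto_offset_reset="earliest",  # no group_id, so each server start replays the topic
    )
    await consumer.start()
    # One background task reads the topic once for all clients.
    reader = asyncio.create_task(consume_into(holder, consumer))
    try:
        async def handler(websocket):
            await handle_client(websocket, holder)

        async with websockets.serve(handler, "localhost", 1234):
            await asyncio.Future()  # run forever
    finally:
        reader.cancel()
        await consumer.stop()

# To start the server: asyncio.run(main())
```

With this layout the topic is read once per server start rather than once per client, and the client from the question would receive a status only after calling ws.send(...).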

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 OneCricketeer