'Node.js WebSocket Server: How to handle individual subscriptions from clients

I want to build a WebSocket server (node.js, express, ws) that can handle individual subscriptions for each client.

The ideas is that the client (socket.io) sends a message with a subscription request to the server and the server then sends back the corresponding data stream from a kafka consumer (kafkajs) to the client.

The basic data pipeline is already working: kafka broker -> consumer -> WebSocket client

Unfortunately the server is currently sending all data from all consumers to all ws-clients.

How can I manage multiple clients with individual data subscriptions?

Below I have included a simplified representation of my current ws-server.

async function create_consumer(topic) {
    const consumer = kafka.consumer({ groupId: topic });
    await consumer.connect();
    await consumer.subscribe({ topic: topic, fromBeginning: false });
    return consumer;
};

async function send_consumer_to_client(ws, consumer) {
    await consumer.run({
        eachMessage: async ({ topic, partition, message }) => {
            wss.clients.forEach(function each(client) {
                if (client.readyState === WebSocket.OPEN) {
                    client.send(JSON.stringify({
                        "value": message,
                    }));
                }
            });
        },
    });
}

const main = async () => {
    consumer_1 = await create_consumer("topic_1");
    consumer_2 = await create_consumer("topic_2");

    server.listen(PORT, () => console.log(`Lisening on port ${server.address().port}`));

    wss.on("connection", async function connection(ws) {
        console.log("A client has connected!");

        ws.on("message", async function message(message) {
            await send_consumer_to_client(ws, consumer_1);
        });

        ws.on("close", function close() {
            console.log("A client has disconnected");
        });
    });
};

main().catch(console.error);

UPDATE 1

I have figured out a solution to handle individual subscriptions for multiple clients by using:

  • a Map() of clients and topics
  • a Map() of topics and consumers

Each time a client subscribes to a particular topic, the group of clients updates accordingly for the wss.clients.forEach send-event.

I have no idea if this is a scalable solution for many clients and topics, but at least it's working. Does anyone know if this is a legit and scalable approach?

async function run_consumer(consumer) {
    await consumer.run({
        eachMessage: async ({ topic, partition, message }) => {
            await send_to_clients(clients, topic, message);
        }
    });
};

async function send_to_clients(clients, topic, message) {
    wss.clients.forEach(function each(client) {
        const client_topic = clients.get(client).topic;
        if (client.readyState === WebSocket.OPEN, client_topic == topic) {
            client.send(message);
        }
    });
};

let clients = new Map();
let consumers = new Map();

const main = async () => {
    server.listen(PORT, HOST, () => console.log(`Lisening on port : ${server.address().port} of address: ${server.address().address}`))

    const consumer_groups = {
        'gid_1': 'topic_1',
        'gid_2': 'topic_2',
    };

    Object.entries(consumer_groups).forEach(async ([groupId, topic]) => {
        const consumer = await create_consumer(groupId, topic);
        consumers.set(topic, consumer);
        await run_consumer(consumer);
    });

    wss.on('connection', async function connection(ws) {
        const id = uuidv4();
        const metadata = { 'id': id };
        clients.set(ws, metadata);

        ws.on('message', async function message(msg) {
            const data = JSON.parse(msg);
            const metadata = clients.get(ws);

            if (data.op == 'subscribe') {
                const topic = data.topic;
                metadata['topic'] = topic;
                clients.set(ws, metadata)
            };
        });

        ws.on('close', function close() {
            clients.delete(ws)
        });
    });
};


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source