Node.js WebSocket Server: How to handle individual subscriptions from clients
I want to build a WebSocket server (node.js, express, ws) that can handle individual subscriptions for each client.
The idea is that the client (socket.io) sends a message with a subscription request to the server, and the server then sends back the corresponding data stream from a Kafka consumer (kafkajs) to that client.
The basic data pipeline is already working: kafka broker -> consumer -> WebSocket client
Unfortunately the server is currently sending all data from all consumers to all ws-clients.
How can I manage multiple clients with individual data subscriptions?
Below I have included a simplified representation of my current ws-server.
/**
 * Create a Kafka consumer, connect it, and subscribe it to one topic.
 *
 * Accepts both call shapes used in this file:
 *   - `create_consumer("topic_1")`          -> group id and topic are both "topic_1"
 *   - `create_consumer("gid_1", "topic_1")` -> separate group id and topic
 *
 * @param {string} groupId - Consumer group id; doubles as the topic name when
 *   `topic` is omitted.
 * @param {string} [topic=groupId] - Topic to subscribe to.
 * @returns {Promise<object>} The connected and subscribed consumer. It is not
 *   running yet; the caller still has to call `consumer.run(...)`.
 */
async function create_consumer(groupId, topic = groupId) {
  const consumer = kafka.consumer({ groupId });
  await consumer.connect();
  await consumer.subscribe({ topic, fromBeginning: false });
  return consumer;
}
/**
 * Start `consumer` and forward each message it receives to a single client.
 *
 * Messages are delivered only to `ws`, the socket that asked for them, instead
 * of being broadcast to every socket connected to the server.
 *
 * @param {object} ws - The requesting WebSocket; must expose `readyState` and `send`.
 * @param {object} consumer - A connected, subscribed consumer exposing `run`.
 * @returns {Promise<void>} Settles when `consumer.run` settles.
 */
async function send_consumer_to_client(ws, consumer) {
  await consumer.run({
    eachMessage: async ({ message }) => {
      // The socket may have closed since it subscribed; drop the message then.
      if (ws.readyState !== WebSocket.OPEN) {
        return;
      }
      ws.send(JSON.stringify({ value: message }));
    },
  });
}
/**
 * Boot the server: create the Kafka consumers, start listening, and register
 * the WebSocket connection handlers.
 *
 * @returns {Promise<void>} Resolves once the consumers are created and the
 *   handlers are registered; rejects if creating a consumer fails.
 */
const main = async () => {
  // Block-scoped on purpose: bare assignments would create implicit globals
  // and throw a ReferenceError in strict mode / ES modules.
  const consumer_1 = await create_consumer("topic_1");
  // The second consumer is created (connected and subscribed) but is not yet
  // wired to any client.
  await create_consumer("topic_2");

  server.listen(PORT, () => console.log(`Lisening on port ${server.address().port}`));

  wss.on("connection", function connection(ws) {
    console.log("A client has connected!");

    // NOTE(review): every incoming message calls send_consumer_to_client again
    // for the same consumer - confirm the Kafka client tolerates repeated
    // run() calls.
    ws.on("message", async function message() {
      try {
        await send_consumer_to_client(ws, consumer_1);
      } catch (err) {
        // An event emitter does not observe the listener's promise, so a
        // failure has to be reported here or it becomes an unhandled rejection.
        console.error(err);
      }
    });

    ws.on("close", function close() {
      console.log("A client has disconnected");
    });
  });
};
// Kick off startup; a rejected startup promise is reported on stderr.
main().catch((err) => console.error(err));
UPDATE 1
I have figured out a solution to handle individual subscriptions for multiple clients by using:
- a Map() of clients and topics
- a Map() of topics and consumers
Each time a client subscribes to a particular topic, its entry in the clients map is updated, so that the wss.clients.forEach send loop delivers each message only to the clients subscribed to that message's topic.
I have no idea if this is a scalable solution for many clients and topics, but at least it's working. Does anyone know if this is a legit and scalable approach?
/**
 * Start `consumer` and route every message it receives to `send_to_clients`,
 * together with the module-level `clients` map and the message's topic.
 *
 * @param {object} consumer - A consumer exposing `run({ eachMessage })`.
 * @returns {Promise<void>} Settles when `consumer.run` settles.
 */
async function run_consumer(consumer) {
  const eachMessage = async (payload) => {
    await send_to_clients(clients, payload.topic, payload.message);
  };

  await consumer.run({ eachMessage });
}
/**
 * Forward one message to every open client that is subscribed to `topic`.
 *
 * @param {Map<object, {id: string, topic?: string}>} clients - Per-socket
 *   metadata keyed by the socket object.
 * @param {string} topic - Topic the message was received on.
 * @param {*} message - Payload handed unchanged to `client.send`.
 * @returns {Promise<void>}
 */
async function send_to_clients(clients, topic, message) {
  wss.clients.forEach((client) => {
    const metadata = clients.get(client);
    // A socket without a metadata entry has no subscription to match.
    if (!metadata) {
      return;
    }
    // Both conditions must hold. With a comma between them only the topic
    // comparison would decide, and closed sockets would be written to.
    if (client.readyState === WebSocket.OPEN && metadata.topic === topic) {
      client.send(message);
    }
  });
}
// Connected sockets -> metadata object ({ id, topic? }) for that socket.
// Declared const: the maps are mutated in place, never reassigned.
const clients = new Map();
// Topic name -> the Kafka consumer created for that topic.
const consumers = new Map();
/**
 * Boot the server: start listening, register the WebSocket handlers that track
 * each client's subscription, then create and start one consumer per topic.
 *
 * A client subscribes by sending `{"op": "subscribe", "topic": "<name>"}`; the
 * topic is stored in that socket's entry of the `clients` map.
 *
 * @returns {Promise<void>} Resolves once every consumer has been created,
 *   stored in `consumers`, and started; rejects if any of them fails, so the
 *   caller's `.catch` sees startup errors.
 */
const main = async () => {
  server.listen(PORT, HOST, () => console.log(`Lisening on port : ${server.address().port} of address: ${server.address().address}`));

  // Registered before the consumers are awaited so that sockets connecting
  // during Kafka startup still get a metadata entry.
  wss.on('connection', function connection(ws) {
    clients.set(ws, { id: uuidv4() });

    ws.on('message', function message(msg) {
      let data;
      try {
        data = JSON.parse(msg);
      } catch (err) {
        // Client input is untrusted; a malformed frame must not take the
        // process down.
        console.error('Ignoring malformed client message:', err.message);
        return;
      }

      const metadata = clients.get(ws);
      // `data` can be null or a primitive (e.g. the JSON text "null").
      if (!metadata || !data || data.op !== 'subscribe') {
        return;
      }
      // The map holds this same object, so mutating it is enough.
      metadata.topic = data.topic;
    });

    ws.on('close', function close() {
      clients.delete(ws);
    });
  });

  const consumer_groups = {
    'gid_1': 'topic_1',
    'gid_2': 'topic_2',
  };

  // map + Promise.all instead of forEach(async ...): forEach drops the
  // returned promises, so failures would surface as unhandled rejections.
  await Promise.all(
    Object.entries(consumer_groups).map(async ([groupId, topic]) => {
      const consumer = await create_consumer(groupId, topic);
      consumers.set(topic, consumer);
      await run_consumer(consumer);
    })
  );
};
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
