KafkaJS - running consumers in the background
I'm writing a service that implements two consumers using Node and kafkajs:
https://kafka.js.org/docs/consuming
I have two async functions, each of which initializes a consumer and should, in theory, be waiting to read messages. Something like this:
import { Kafka } from 'kafkajs';

// Create the client with the broker list
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['kafka1:9092', 'kafka2:9092'],
});

const consumer = kafka.consumer({ groupId: 'my-group' });

const consumerModule = async (): Promise<void> => {
  await consumer.connect();
  await consumer.subscribe({ topic: 'topic-A' });
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        // key and value may be null, so guard before converting them
        key: message.key?.toString(),
        value: message.value?.toString(),
        headers: message.headers,
      });
    },
  });
};

export default consumerModule;
If I want the consumers to run in the background of the service, should I just import the module file when I initialize the server?
I have a feeling that this will not work, because the consumerModule function must actually be called, and because it's async I'm not sure how to do that properly.
For example:
import express, { Express } from 'express';
import consumerModuleA from './modules/consumerModuleA';
import consumerModuleB from './modules/consumerModuleB';

async function bootstrap(): Promise<Express> {
  const app = express();
  app.get('/health', (_req, res) => {
    res.send('ok');
  });
  return app;
}
Please advise.
Solution 1:[1]
I would suggest the following. Notes:
- You can reuse the consumer implementation by parametrizing the topic, group, etc.
- Explicitly call the consumer module to run it.
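import express, { Express } from 'express';
import consumerModule from './modules/consumerModule';

// Call the module explicitly; it is not awaited, so it runs alongside the server.
// Log a startup failure instead of leaving the rejection unhandled.
consumerModule().catch((err) => {
  console.error('Consumer failed to start', err);
});

async function bootstrap(): Promise<Express> {
  const app = express();
  app.get('/health', (_req, res) => {
    res.send('ok');
  });
  return app;
}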
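As a rough illustration of the first note, the consumer setup could be wrapped in a helper that takes the group and topic as arguments. This is only a sketch, not the answerer's code: `startConsumer`, `KafkaLike`, `ConsumerLike`, `MessageLike`, and the `onMessage` callback are hypothetical names, and the interfaces model just the `connect`, `subscribe`, and `run` calls already used in the question so that the snippet stands alone.

```typescript
// Minimal local types covering only the calls used in the question.
// Assumption: the real kafkajs client is structurally compatible with them.
interface MessageLike {
  key: { toString(): string } | null;
  value: { toString(): string } | null;
}

interface EachMessagePayloadLike {
  topic: string;
  partition: number;
  message: MessageLike;
}

interface ConsumerLike {
  connect(): Promise<void>;
  subscribe(opts: { topic: string }): Promise<void>;
  run(opts: {
    eachMessage: (payload: EachMessagePayloadLike) => Promise<void>;
  }): Promise<void>;
}

interface KafkaLike {
  consumer(opts: { groupId: string }): ConsumerLike;
}

// Hypothetical helper: one implementation reused for every group/topic pair.
async function startConsumer(
  kafka: KafkaLike,
  groupId: string,
  topic: string,
  onMessage: (topic: string, value: string | null) => Promise<void>,
): Promise<ConsumerLike> {
  const consumer = kafka.consumer({ groupId });
  await consumer.connect();
  await consumer.subscribe({ topic });
  await consumer.run({
    eachMessage: async (payload) => {
      // Convert the value to a string, passing null through unchanged
      const value = payload.message.value;
      await onMessage(payload.topic, value ? value.toString() : null);
    },
  });
  // Return the consumer so the caller can keep a reference to it
  return consumer;
}
```

With this shape, each of the two consumers would be started by one call next to the server setup, for example `startConsumer(kafka, 'group-a', 'topic-A', handler).catch(console.error)`, where the group name, topic name, and `handler` are placeholders. Whether the `Kafka` instance from kafkajs type-checks against `KafkaLike` is an assumption worth verifying against the installed typings.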
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | OhadR |
