KafkaJS - running consumers in background

I'm writing a service that implements 2 consumers using Node and kafkajs: https://kafka.js.org/docs/consuming

I have 2 async functions, each of which initializes a consumer and, in theory, waits to read messages. Something like this:

import { Kafka } from 'kafkajs';

// Create the client with the broker list
const kafka = new Kafka({
    clientId: 'my-app',
    brokers: ['kafka1:9092', 'kafka2:9092'],
});
const consumer = kafka.consumer({ groupId: 'my-group' });

const consumerModule = async (): Promise<void> => {
    await consumer.connect();
    await consumer.subscribe({ topic: 'topic-A' });

    await consumer.run({
        eachMessage: async ({ topic, partition, message }) => {
            console.log({
                // key and value can be null, so guard before calling toString()
                key: message.key?.toString(),
                value: message.value?.toString(),
                headers: message.headers,
            });
        },
    });
};

export default consumerModule;

If I want the consumer to run in the background in the service, should I just import the module file when I initialize the server?

I have a feeling that this will not work, because the consumerModule function has to be called explicitly, and since it's async I'm not sure how to do that properly.

For example:

import express, { Express } from 'express';
import consumerModuleA from './modules/consumerModuleA';
import consumerModuleB from './modules/consumerModuleB';

async function bootstrap(): Promise<Express> {

    const app = express();

    app.get('/health', (_req, res) => {
        res.send('ok');
    });

    return app;
}

Please advise.



Solution 1:[1]

I would suggest the approach below. Notes:

  1. You can reuse the consumer implementation by parametrizing the topic, group, etc.
  2. Explicitly call the consumer module to run it.

import express, { Express } from 'express';
import consumerModule from './modules/consumerModule';

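// Start the consumer without blocking server startup; log a startup failure
// instead of leaving the promise rejection unhandled.
consumerModule().catch((err) => {
    console.error('Kafka consumer failed to start', err);
});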

async function bootstrap(): Promise<Express> {

    const app = express();

    app.get('/health', (_req, res) => {
        res.send('ok');
    });

    return app;
}
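
Note 1 above (reusing the consumer by parametrizing topic and group) could look roughly like the sketch below. It is only an illustration, not the answer author's code: `startConsumer`, `startAllConsumers`, `ConsumerOptions` and the `*Like` interfaces are hypothetical names. The interfaces are structural stand-ins for the few kafkajs calls used in the question (`consumer`, `connect`, `subscribe`, `run`), and the sketch assumes a real `new Kafka({...})` client fits those shapes.

```typescript
// Structural stand-ins for the small part of the kafkajs API used in the question.
// Assumption: a real kafkajs client object fits these shapes.
interface MessageLike {
    key: { toString(): string } | null;
    value: { toString(): string } | null;
}

interface MessagePayload {
    topic: string;
    partition: number;
    message: MessageLike;
}

interface ConsumerLike {
    connect(): Promise<void>;
    subscribe(subscription: { topic: string }): Promise<void>;
    run(config: { eachMessage: (payload: MessagePayload) => Promise<void> }): Promise<void>;
}

interface KafkaLike {
    consumer(config: { groupId: string }): ConsumerLike;
}

// Hypothetical options type: one entry per consumer the service should run.
interface ConsumerOptions {
    groupId: string;
    topic: string;
    handle: (payload: MessagePayload) => Promise<void>;
}

// Creates, connects, subscribes and starts one consumer, then returns it
// so the caller can keep a reference to it.
async function startConsumer(kafka: KafkaLike, options: ConsumerOptions): Promise<ConsumerLike> {
    const consumer = kafka.consumer({ groupId: options.groupId });
    await consumer.connect();
    await consumer.subscribe({ topic: options.topic });
    await consumer.run({ eachMessage: options.handle });
    return consumer;
}

// Starts all configured consumers concurrently, each with its own consumer instance.
async function startAllConsumers(kafka: KafkaLike, all: ConsumerOptions[]): Promise<ConsumerLike[]> {
    return Promise.all(all.map((options) => startConsumer(kafka, options)));
}
```

With something like this, the two consumers from the question would become two `ConsumerOptions` entries (for example one for topic-A and one for a second topic) passed to `startAllConsumers` next to the Express setup, with a `.catch` on the returned promise so a startup failure gets logged.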

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 OhadR