How to commit individual messages on success with KafkaJS

I am trying to find the KafkaJS equivalent of what node-rdkafka does for committing individual messages on success. In node-rdkafka I was able to call consumer.commit(message) after message processing succeeded. What is the equivalent in KafkaJS?

So far I have tried calling consumer.commitOffsets(...) inside the eachMessage handler, but that didn't seem to commit.

I have code like this:

import { Kafka, logLevel } from 'kafkajs';

const kafka = new Kafka({
    clientId: 'qa-topic',
    brokers: [process.env.KAFKA_BOOTSTRAP_SERVER],
    ssl: true,
    logLevel: logLevel.INFO,
    sasl: {
        mechanism: 'plain', 
        username: process.env.KAFKA_CONSUMER_SASL_USERNAME,
        password: process.env.KAFKA_CONSUMER_SASL_PASSWORD
    }
});

const consumer = kafka.consumer({
    groupId: process.env.KAFKA_CONSUMER_GROUP_ID
});

const run = async () => {

    // Consuming
    await consumer.connect()
    await consumer.subscribe({ topic: 'my-topic', fromBeginning: true });

    await consumer.run({
        autoCommit: false,
        eachMessage: async ({ topic, partition, message }) => {
            try {
                await processMyMessage(message);
                
                // HOW DO I COMMIT THIS MESSAGE?
                // The below doesn't seem to commit 
                // await consumer.commitOffsets([{ topic: 'my-topic', partition, offset:message.offset }]);

            } catch (e) {
                // log error, but do not commit message
            }
        },
    })
}



Solution 1:[1]

I figured out how to do it. I could not get this to work with the eachMessage handler; the eachBatch handler gives more control over how and when messages are committed.

const run = async () => {
    await consumer.connect();
    await consumer.subscribe({ topic: 'my-topic', fromBeginning: true });

    await consumer.run({
        // Only offsets passed to resolveOffset are marked as processed
        eachBatchAutoResolve: false,
        eachBatch: async ({ batch, resolveOffset, isRunning, isStale }) => {
            const promises = [];
            console.log(`Starting to process ${batch.messages?.length || 0} messages`);
            for (const message of batch.messages) {
                // Stop early if the consumer is shutting down or the batch is stale
                if (!isRunning() || isStale()) break;
                promises.push(handleMessage(batch.topic, batch.partition, message, resolveOffset));
            }
            // Wait for every handler in the batch to finish
            await Promise.all(promises);
        },
    });
};

Then, inside handleMessage, resolve only the offsets of messages that succeeded:

const handleMessage = async (topic, partition, message, resolveOffset) => {
    try {
      ....
      
      // Mark the offset as resolved if successful; KafkaJS commits resolved offsets on its next auto-commit
      resolveOffset(message.offset);
    } catch(e) {
       ...
       // Do not resolve, so this offset is not committed
    }
};
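
One caveat with the batch handler above: Kafka stores a single committed position per partition for a consumer group, so resolving a later offset while an earlier message failed can still move the position past the failed one. The sketch below is one way to avoid that, assuming in-order processing is acceptable. The helper name processBatchInOrder and the handler argument are hypothetical; the first parameter mirrors the object KafkaJS passes to eachBatch.

```javascript
// Hypothetical helper: processes a batch strictly in order and stops at the
// first failure, so the resolved offset never moves past an unprocessed message.
async function processBatchInOrder({ batch, resolveOffset, heartbeat, isRunning, isStale }, handler) {
  let processed = 0;
  for (const message of batch.messages) {
    // Stop early if the consumer is shutting down or the batch is stale
    if (!isRunning() || isStale()) break;
    try {
      await handler(batch.topic, batch.partition, message);
    } catch (err) {
      // Leave this offset unresolved and skip the rest of the batch
      console.error(`Processing failed at offset ${message.offset}: ${err.message}`);
      break;
    }
    // Mark the message as processed only after the handler succeeded
    resolveOffset(message.offset);
    // Tell the broker this consumer is still alive during long batches
    await heartbeat();
    processed += 1;
  }
  return processed;
}
```

It could be wired in as eachBatch: (payload) => processBatchInOrder(payload, handleMessage), with eachBatchAutoResolve still set to false. Whether to retry, skip, or rethrow after a failure is left to the caller.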

Solution 2:[2]

As the documentation states, you can call consumer.commitOffsets only after consumer.run. You are trying to call it from within the run method, which is why it's not working for you.
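
A separate detail worth checking in the question's snippet: Kafka records a committed offset as the position of the next message to read, so committing message.offset itself would leave that message to be delivered again. A minimal sketch of building the payload, where the helper name toCommitEntry is hypothetical and not part of KafkaJS:

```javascript
// Hypothetical helper (not part of KafkaJS): builds one entry for
// consumer.commitOffsets from a message that was processed successfully.
function toCommitEntry(topic, partition, message) {
  // KafkaJS exposes offsets as strings; BigInt avoids precision loss on large values
  const next = BigInt(message.offset) + 1n;
  return { topic, partition, offset: next.toString() };
}

// Usage, assuming `consumer` is a running KafkaJS consumer:
// await consumer.commitOffsets([toCommitEntry(topic, partition, message)]);
```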

Keep in mind that committing after each message increases network traffic. If that is a price you are willing to pay, you can let auto-commit take care of it by setting autoCommitThreshold to 1.
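
As a rough sketch of that configuration, the options passed to consumer.run might look like the following. The factory name commitEveryMessage is hypothetical; autoCommit and autoCommitThreshold are the consumer.run options referred to above.

```javascript
// Hypothetical factory: returns options for consumer.run that ask KafkaJS
// to auto-commit after every resolved message.
function commitEveryMessage(eachMessage) {
  return {
    autoCommit: true,        // the default, stated here for clarity
    autoCommitThreshold: 1,  // commit once one message has been resolved
    eachMessage,
  };
}

// Usage, assuming `consumer` is connected and subscribed:
// await consumer.run(commitEveryMessage(async ({ message }) => {
//   await processMyMessage(message); // let errors propagate so the offset stays unresolved
// }));
```

Note that with this approach the handler should not swallow errors as the question's try/catch does; as far as I know, a message whose eachMessage call returns normally is treated as processed.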

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution    Source
Solution 1  HIT_girl
Solution 2