'How to commit individual messages on success with KafkaJS
I am trying to find a similar solution to what node-rdkafka does for committing individual messages on success.
In node-rdkafka I was able to do call consumer.commit(message); after message processing succeeded. What is the equivalent in KafkaJS?
I have so far tried to call consumer.commitOffsets(...) inside eachMessage handler, but that didn't seem to commit.
I have code like this:
import { Kafka, logLevel } from 'kafkajs';
const kafka = new Kafka({
clientId: 'qa-topic',
brokers: [process.env.KAFKA_BOOTSTRAP_SERVER],
ssl: true,
logLevel: logLevel.INFO,
sasl: {
mechanism: 'plain',
username: process.env.KAFKA_CONSUMER_SASL_USERNAME,
password: process.env.KAFKA_CONSUMER_SASL_PASSWORD
}
});
const consumer = kafka.consumer({
groupId: process.env.KAFKA_CONSUMER_GROUP_ID
});
const run = async () => {
// Consuming
await consumer.connect()
await consumer.subscribe({ topic: 'my-topic', fromBeginning: true });
await consumer.run({
autoCommit: false,
eachMessage: async ({ topic, partition, message }) => {
try {
await processMyMessage(message);
// HOW DO I COMMIT THIS MESSAGE?
// The below doesn't seem to commit
// await consumer.commitOffsets([{ topic: 'my-topic', partition, offset:message.offset }]);
} catch (e) {
// log error, but do not commit message
}
},
})
}
Solution 1:[1]
I figured out how to do it. Can't use eachMessage handler, but instead use eachBatch which allows for more flexibility in control in how messages are committed
const run = async () => {
await consumer.connect();
await consumer.subscribe({ topic: 'my-topic', fromBeginning: true });
await consumer.run({
eachBatchAutoResolve: false,
eachBatch: async ({ batch, resolveOffset, isRunning, isStale }) => {
const promises = [];
logger.log(`Starting to process ${batch.messages?.length || 0} messages`);
for (const message of batch.messages) {
if (!isRunning() || isStale()) break;
promises.push(handleMessage(batch.topic, batch.partition, message, resolveOffset));
}
await Promise.all(promises);
},
});
};
Then inside handleMessage commit only those messages that succeeded
const handleMessage = async (topic, partition, message, resolveOffset) => {
try {
....
//Commit message if successful
resolveOffset(message.offset);
} catch(e) {
...
// Do not commit
}
Solution 2:[2]
As the documentation states: You can call consumer.commitOffsets only after consumer.run. You are trying to call it from within the run method, that is why it's not working for you.
Keep in mind that committing after each message increases the network traffic.
If that is a price you are willing to pay you can configure the auto-commit to take care of that for you by setting the autoCommitThreshold to 1.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | HIT_girl |
| Solution 2 |
