Controlling Kafka offsets using KafkaItemReader and writing to a database
I'm using the following libraries:
- spring-boot 2.5.8
- spring-kafka 2.7.9
- spring-batch-infrastructure 4.3.4 (including KafkaItemReader and JdbcBatchItemWriter)
I defined a job that reads items from a Kafka-topic (with a defined chunk-size) and writes them to a database. Simplified code:
@Bean
@StepScope
KafkaItemReader<String, Object> myReader(KafkaProperties kafkaProperties) {
    return new KafkaItemReaderBuilder<String, Object>()
            .name("myReader")
            .consumerProperties(kafkaProperties.buildConsumerProperties())
            .partitions(0)
            .partitionOffsets(new HashMap<>())
            .topic("myTopic")
            .build();
}
@Bean
@StepScope
ItemWriter<Object> myWriter(DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder<Object>()
            .dataSource(dataSource)
            .sql(QUERY_INSERT)
            .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
            .build();
}
@Bean
Step myStep(StepBuilderFactory stepBuilderFactory, KafkaItemReader<String, Object> myReader, ItemWriter<Object> myWriter) {
    return stepBuilderFactory.get("myStep")
            .<Object, Object>chunk(1)
            .reader(myReader)
            .writer(myWriter)
            .build();
}
When I put 2 items on the Kafka topic and use chunk-size 1, the following methods are called:
1. KafkaItemReader.update() (initial call)
2. KafkaItemReader.read() (first Kafka item)
3. JdbcBatchItemWriter.write() (first Kafka item)
4. KafkaItemReader.update()
5. KafkaItemReader.read() (second Kafka item)
6. JdbcBatchItemWriter.write() (second Kafka item)
7. KafkaItemReader.update()
8. KafkaItemReader.close() (closing the reader)
The method KafkaItemReader.update() looks like this:
@Override
public void update(ExecutionContext executionContext) {
    if (this.saveState) {
        executionContext.put(TOPIC_PARTITION_OFFSETS, new HashMap<>(this.partitionOffsets));
    }
    this.kafkaConsumer.commitSync();
}
After the first item has been read from the Kafka topic, the call to KafkaItemReader.update() (step 4 in the above list) already commits the offset for both items (+2), because the consumer polled both records and this.kafkaConsumer.commitSync(); commits the consumer's current position, not just the offset of the last written item.
So when JdbcBatchItemWriter.write() fails for the 2nd chunk, the 1st chunk is written to the database and the 2nd is not (which is correct behaviour).
The problem: when the next job execution starts, I can't re-read the 2nd chunk, because its offset has already been committed. So I want to commit the offsets only for successfully processed (read and written) chunks.
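The over-commit can be modelled without Kafka: a poll() that returns records up to offset N advances the consumer's position to N + 1, and a parameterless commitSync() commits that position, regardless of how many of the polled records have actually been written. A minimal plain-Java model of this bookkeeping (the class and its methods are illustrative stand-ins, not the Kafka consumer API):

```java
import java.util.List;

// Illustrative model of a consumer's position bookkeeping (not the Kafka API).
class ConsumerPositionModel {
    private long position = 0;    // offset of the next record to fetch
    private long committed = -1;  // last committed position (-1 = none yet)

    // poll() returns a batch and advances the position past the WHOLE batch,
    // even if the job only consumes the records one chunk at a time.
    List<Long> poll(List<Long> batch) {
        if (!batch.isEmpty()) {
            position = batch.get(batch.size() - 1) + 1;
        }
        return batch;
    }

    // A parameterless commitSync() commits the current position, i.e.
    // everything returned by the last poll() - including unwritten items.
    void commitSync() {
        committed = position;
    }

    long committedOffset() {
        return committed;
    }
}
```

With two records polled (offsets 0 and 1) and chunk-size 1, the commit after the first chunk already records position 2, so a restart would skip the second, unwritten record.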
The only solution I have is to manually control the offsets (i.e. using a database table) for each partition:
- Replace the new HashMap<>() in .partitionOffsets(new HashMap<>()) with the offsets read from the table
- Update the offset information in the table after each successfully processed chunk
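That workaround can be sketched as a small offset store. The class and method names are hypothetical, and persistence is reduced to an in-memory map to keep the sketch self-contained: in a real setup the map would be a database table keyed by org.apache.kafka.common.TopicPartition, updated after each successful chunk (e.g. from a ChunkListener, ideally in the same transaction as the chunk's inserts), and its contents passed to .partitionOffsets(...) at the next job execution:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical offset store backing .partitionOffsets(...): keeps, per
// partition, the offset of the last successfully processed (read AND
// written) item. In-memory here; a real implementation would persist
// this in a database table.
class ManualOffsetStore {
    private final Map<Integer, Long> lastProcessed = new HashMap<>();

    // Called after each successfully written chunk.
    void markProcessed(int partition, long offset) {
        lastProcessed.merge(partition, offset, Math::max);
    }

    // Offsets to hand to the reader on the next job execution:
    // restart at last processed offset + 1 for each partition.
    Map<Integer, Long> restartOffsets() {
        Map<Integer, Long> restart = new HashMap<>();
        lastProcessed.forEach((partition, offset) -> restart.put(partition, offset + 1));
        return restart;
    }
}
```

With this in place, a failed write in the 2nd chunk leaves the stored offset at the 1st chunk's last item, so the next execution re-reads the 2nd chunk instead of skipping it.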
Question: Is there another approach/way to synchronise these two transactions (reading from Kafka, writing to database)?
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow