Controlling Kafka offsets using KafkaItemReader and writing to a database

I'm using the following libraries:

  • spring-boot 2.5.8
  • spring-kafka 2.7.9
  • spring-batch-infrastructure 4.3.4 (including KafkaItemReader and JdbcBatchItemWriter)

I defined a job that reads items from a Kafka topic (with a defined chunk size) and writes them to a database. Simplified code:

@Bean
@StepScope
KafkaItemReader<String, Object> myReader(KafkaProperties kafkaProperties) {
    return new KafkaItemReaderBuilder<String, Object>()
            .name("myReader") // required by the builder when saveState is true (the default)
            .consumerProperties(kafkaProperties.buildConsumerProperties())
            .partitions(0)
            .partitionOffsets(new HashMap<>())
            .topic("myTopic")
            .build();
}

@Bean
@StepScope
ItemWriter<Object> myWriter(DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder<Object>()
            .dataSource(dataSource)
            .sql(QUERY_INSERT)
            .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
            .build();
}

@Bean
Step myStep(StepBuilderFactory stepBuilderFactory, KafkaItemReader<String, Object> myReader, ItemWriter<Object> myWriter) {
    return stepBuilderFactory.get("myStep")
            .<Object, Object>chunk(1)
            .reader(myReader)
            .writer(myWriter)
            .build();
}
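
For completeness, the step is wired into a job along these lines (a sketch only; the bean name and the RunIdIncrementer are assumptions, not part of the original post):

```java
@Bean
Job myJob(JobBuilderFactory jobBuilderFactory, Step myStep) {
    return jobBuilderFactory.get("myJob")
            // RunIdIncrementer gives each launch new job parameters,
            // so a new JobInstance is created on every run
            .incrementer(new RunIdIncrementer())
            .start(myStep)
            .build();
}
```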

When I put 2 items onto the Kafka topic and use chunk size 1, the following methods are called:

  1. KafkaItemReader.update() (initial call)
  2. KafkaItemReader.read() (first Kafka-item)
  3. JdbcBatchItemWriter.write() (first Kafka-item)
  4. KafkaItemReader.update()
  5. KafkaItemReader.read() (second Kafka-item)
  6. JdbcBatchItemWriter.write() (second Kafka-item)
  7. KafkaItemReader.update()
  8. KafkaItemReader.close() (closing the reader)

The method KafkaItemReader.update() looks like this:

@Override
public void update(ExecutionContext executionContext) {
    if (this.saveState) {
        executionContext.put(TOPIC_PARTITION_OFFSETS, new HashMap<>(this.partitionOffsets));
    }
    this.kafkaConsumer.commitSync();
}

After the first item is read from the Kafka topic, the call to KafkaItemReader.update() (step 4 in the list above) commits the whole offset (+2, because the job has read 2 items) due to this.kafkaConsumer.commitSync();. So when JdbcBatchItemWriter.write() fails in the 2nd chunk, the 1st chunk is written to the database and the 2nd is not (that is correct behaviour).

The problem: when the next job execution starts, I can't re-read the 2nd chunk because its offsets have already been committed. So I want to commit the offsets only for successfully processed (read and written) chunks.

The only solution I have is to track the offsets manually (i.e. in a database table) for each partition:

  • Replace the new HashMap<>() in .partitionOffsets(new HashMap<>()) with the offsets stored in the table
  • Update the offset information in the table after each successfully processed chunk
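
That bookkeeping can be sketched with a small helper, independent of Spring Batch. The class and method names (OffsetStore, markProcessed, partitionOffsetsFor) are illustrative only, and the sketch keys offsets by plain partition number, whereas the real KafkaItemReaderBuilder.partitionOffsets(...) expects a Map with org.apache.kafka.common.TopicPartition keys:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the offset-tracking table.
// In the real setup, markProcessed(...) would also persist the offset
// in the same database transaction as the chunk's INSERTs.
public class OffsetStore {

    private final Map<Integer, Long> committed = new HashMap<>();

    // Called after each successfully written chunk: remember the next
    // offset to read for this partition.
    public void markProcessed(int partition, long nextOffset) {
        committed.put(partition, nextOffset);
    }

    // Offsets to hand to the reader at job start; partitions never seen
    // before fall back to offset 0.
    public Map<Integer, Long> partitionOffsetsFor(int... partitions) {
        Map<Integer, Long> offsets = new HashMap<>();
        for (int p : partitions) {
            offsets.put(p, committed.getOrDefault(p, 0L));
        }
        return offsets;
    }
}
```

At job start, the returned map would be translated into TopicPartition keys and passed to .partitionOffsets(...); storing the offset in the same transaction as the chunk's writes is what makes the restart consistent.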

Question: Is there another approach/way to synchronise these two transactions (reading from Kafka, writing to the database)?



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
