Kafka producer client: Will all callbacks be executed before flush() returns?

I want to check the status of sending records to Kafka. I need to make sure that all records were successfully stored in Kafka. I was thinking about using the callback mechanism, e.g. creating a callback class like

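    import java.util.concurrent.atomic.AtomicReference;

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class MyCallback implements Callback {
        // Written from the producer's I/O thread, read from the calling thread
        private final AtomicReference<Exception> exceptionRef = new AtomicReference<>(null);

        @Override
        public void onCompletion(final RecordMetadata metadata, final Exception exception) {
            if (exception != null) {
                exceptionRef.set(exception);
            }
        }

        // Rethrows the most recently recorded send failure, if any
        public void check() throws Exception {
            Exception exception = exceptionRef.get();
            if (exception != null) throw exception;
        }
    }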
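Because callbacks generally execute on the producer's I/O thread, any state they share with the calling thread has to be thread-safe, which the AtomicReference above provides. The callback as written keeps only the last failure. A variant that keeps the first failure and also counts completions could look like the sketch below. CompletionTracker and its methods are hypothetical names, and the Kafka types are left out so the sketch compiles on its own; a real Callback's onCompletion would simply call onResult(exception).

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper, not part of the Kafka client.
class CompletionTracker {
    private final AtomicReference<Exception> firstError = new AtomicReference<>();
    private final AtomicInteger completed = new AtomicInteger();

    // Called once per record, typically from the producer's I/O thread.
    void onResult(Exception exception) {
        if (exception != null) {
            // Keep the first failure; later failures do not overwrite it.
            firstError.compareAndSet(null, exception);
        }
        completed.incrementAndGet();
    }

    int completedCount() {
        return completed.get();
    }

    // Throws the first recorded failure, or fails if some records never completed.
    void check(int expected) throws Exception {
        Exception e = firstError.get();
        if (e != null) {
            throw e;
        }
        int done = completed.get();
        if (done != expected) {
            throw new IllegalStateException("only " + done + " of " + expected + " records completed");
        }
    }
}
```

With this variant, the main program would call check(myRecords.size()) after flush(), which also catches the case where some callbacks were never invoked.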

and then have a main program like

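    // props, myRecords and handle(...) come from the surrounding program
    try (Producer<Object, Object> producer = new KafkaProducer<>(props)) {
        MyCallback callback = new MyCallback();
        for (ProducerRecord<Object, Object> rec : myRecords) {
            producer.send(rec, callback);
        }
        // Block until every record sent above has completed
        producer.flush();
        // Throws if any callback reported a failure
        callback.check();
    } catch (Exception e) {
        handle(e);
    }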

My question: Can I be sure that the callback has been called for all sent records when flush() returns?

I should add that the producer is configured with acks=all.



Solution 1

I found that the answer is "yes". According to the Javadoc of KafkaProducer.flush():

The post-condition of flush() is that any previously sent record will have completed (e.g. Future.isDone() == true).

The question is then: "Has the callback been called before the future completes?" To answer that, you need to look at the producer internals in the package org.apache.kafka.clients.producer.internals. The future returned by send() is implemented in the class FutureRecordMetadata, and its isDone() method is:

    @Override
    public boolean isDone() {
        if (nextRecordMetadata != null)
            return nextRecordMetadata.isDone();
        return this.result.completed();
    }
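To make the delegation concrete, here is a simplified model. It is not Kafka's code: BatchResult and RecordFuture are hypothetical names. A record future either follows its chained successor or asks a batch-level result object, and that result only reports completion after done() has counted down a latch.

```java
import java.util.concurrent.CountDownLatch;

// Simplified model (not Kafka's classes): one result object per batch,
// shared by every record future in that batch.
class BatchResult {
    private final CountDownLatch latch = new CountDownLatch(1);

    void done() {
        latch.countDown();
    }

    // True only after done() has been called.
    boolean completed() {
        return latch.getCount() == 0L;
    }
}

class RecordFuture {
    private final BatchResult result;
    private volatile RecordFuture next; // set if the record moved to another batch

    RecordFuture(BatchResult result) {
        this.result = result;
    }

    void chain(RecordFuture next) {
        this.next = next;
    }

    // Same shape as FutureRecordMetadata.isDone(): follow the chain if present,
    // otherwise ask the batch-level result.
    boolean isDone() {
        RecordFuture n = next;
        if (n != null) {
            return n.isDone();
        }
        return result.completed();
    }
}
```

All record futures in the same batch flip to done at the same moment, when the batch result's done() is called.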

So the futures can be chained: if nextRecordMetadata is set (which happens when a record is re-enqueued into another batch, for example after a batch split), isDone() follows the chain; otherwise completion is delegated to result, which is a ProduceRequestResult. In the class ProducerBatch that same object is the field produceFuture, shared by every record in the batch; in other words, a record-level future is done when the future of its batch is completed. Looking at what marks the batch-level future as completed, we find this method in ProducerBatch:

    private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
        // Set the future before invoking the callbacks as we rely on its state for the `onCompletion` call
        produceFuture.set(baseOffset, logAppendTime, exception);

        // execute callbacks
        for (Thunk thunk : thunks) {
            try {
                if (exception == null) {
                    RecordMetadata metadata = thunk.future.value();
                    if (thunk.callback != null)
                        thunk.callback.onCompletion(metadata, null);
                } else {
                    if (thunk.callback != null)
                        thunk.callback.onCompletion(null, exception);
                }
            } catch (Exception e) {
                log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
            }
        }

        produceFuture.done();
    }

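Each Thunk pairs a record's callback with its record-level future. The loop invokes every callback before produceFuture.done() is called, and it is done() that makes completed(), and therefore isDone(), return true; the earlier set() call only stores the offset, timestamp and exception. So all callbacks have been called before the futures are done, and hence before flush() returns. Note also that an exception thrown by a callback is logged and swallowed, so it does not prevent the remaining callbacks from running.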
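The ordering argument can be checked with a small self-contained simulation. This is not Kafka code: SimulatedBatch and FlushOrderingDemo are hypothetical names, there is a single batch instead of many, and a plain thread stands in for the producer's sender thread. Because the sender counts down the latch only after the callback loop, and CountDownLatch guarantees that actions before countDown() happen-before a successful return from await() in another thread, the waiting thread sees the effects of every callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Simplified model of a batch: fire every callback, then mark the batch done,
// mirroring the order in ProducerBatch.completeFutureAndFireCallbacks.
class SimulatedBatch {
    private final CountDownLatch done = new CountDownLatch(1);
    private final List<Consumer<Exception>> callbacks = new ArrayList<>();

    void add(Consumer<Exception> callback) {
        callbacks.add(callback);
    }

    void completeAndFireCallbacks(Exception error) {
        for (Consumer<Exception> cb : callbacks) {
            try {
                cb.accept(error);
            } catch (RuntimeException e) {
                // Kafka logs and swallows callback exceptions; this model just ignores them.
            }
        }
        done.countDown(); // completion becomes visible only after all callbacks ran
    }

    // Stand-in for flush(): block until the batch is marked done.
    void awaitDone() {
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }
}

class FlushOrderingDemo {
    // Returns how many callbacks had run at the moment "flush" returned.
    static int run(int records) {
        AtomicInteger callbacksRun = new AtomicInteger();
        SimulatedBatch batch = new SimulatedBatch();
        for (int i = 0; i < records; i++) {
            batch.add(e -> callbacksRun.incrementAndGet());
        }
        Thread sender = new Thread(() -> batch.completeAndFireCallbacks(null));
        sender.start();
        batch.awaitDone();
        return callbacksRun.get();
    }
}
```

FlushOrderingDemo.run(1000) returns 1000 on every run. If countDown() were moved above the callback loop, the waiting thread could return while callbacks are still executing and observe a smaller count.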

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Karsten Spang