Kafka producer client: Will all callbacks be executed before flush() returns?
I want to check the status of records sent to Kafka: I need to make sure that all of them were stored successfully. I was thinking about using the callback mechanism, e.g. creating a callback class like this:
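```java
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

public class MyCallback implements Callback {
    // Holds the last send failure reported by onCompletion, if any.
    private final AtomicReference<Exception> exceptionRef;

    public MyCallback() {
        exceptionRef = new AtomicReference<>(null);
    }

    @Override
    public void onCompletion(final RecordMetadata metadata, final Exception exception) {
        if (exception != null) {
            exceptionRef.set(exception);
        }
    }

    // Rethrows a failure recorded by onCompletion, if there was one.
    public void check() throws Exception {
        Exception exception = exceptionRef.get();
        if (exception != null) throw exception;
    }
}
```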
and then have a main program like this:
```java
try (Producer<Object, Object> producer = new KafkaProducer<>(props)) {
    MyCallback callback = new MyCallback();
    for (ProducerRecord<Object, Object> rec : myRecords) {
        producer.send(rec, callback);
    }
    producer.flush();  // blocks until the records sent above have completed
    callback.check();  // rethrows a failure recorded by the callback, if any
} catch (Exception e) {
    handle(e);
}
```
My question: Can I be sure that the callback has been called for all sent records when flush() returns?
I should add that the setting acks=all is used.
Solution 1
I found that the answer is "yes". According to the javadoc of flush():
The post-condition of flush() is that any previously sent record will have completed (e.g. Future.isDone() == true).
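Why does that post-condition not settle the question by itself? For a general-purpose future, isDone() == true says nothing about whether registered callbacks have finished. As a contrast (this uses java.util.concurrent.CompletableFuture, not Kafka's future, and the class and method names are made up for the illustration), the sketch below holds a whenComplete action open and shows that another thread can already observe isDone() == true while that action is still running, because in the OpenJDK implementation CompletableFuture publishes its result before it runs dependent actions on the completing thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class DoneVersusCallback {

    // Returns true if isDone() was observed as true while the callback
    // registered on the same future had not yet finished.
    static boolean doneObservedBeforeCallbackFinished() {
        CompletableFuture<String> future = new CompletableFuture<>();
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean callbackFinished = new AtomicBoolean(false);

        // Registered before completion, so it runs on the completing thread.
        future.whenComplete((value, error) -> {
            try {
                release.await(); // keep the callback open until the main thread has looked
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            callbackFinished.set(true);
        });

        Thread completer = new Thread(() -> future.complete("ok"));
        completer.start();
        try {
            while (!future.isDone()) {
                Thread.onSpinWait(); // wait until the result is visible
            }
            boolean callbackStillRunning = !callbackFinished.get();
            release.countDown(); // now let the callback finish
            completer.join();
            return callbackStillRunning && callbackFinished.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(doneObservedBeforeCallbackFinished()); // prints true
    }
}
```

So the answer depends on how Kafka's own future orders "done" relative to the callbacks, which is what the internals show.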
The question is then "Has the callback been called before the future completes?"
To answer that, you need to look at the internal code in the package org.apache.kafka.clients.producer.internals. The record-level future is implemented in the class FutureRecordMetadata, and its isDone() method is:
```java
@Override
public boolean isDone() {
    if (nextRecordMetadata != null)
        return nextRecordMetadata.isDone();
    return this.result.completed();
}
```
So the futures can be chained (via nextRecordMetadata), and the actual completion state is delegated to result, which is a ProduceRequestResult.
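The delegation can be modelled in a few lines. This is a toy model, not Kafka's code: ToyBatchResult and ToyRecordFuture are hypothetical stand-ins for the batch-level result and FutureRecordMetadata, and using a CountDownLatch as the completion flag is an assumption of the sketch. It shows that a record future holds no completion state of its own, so every record in a batch becomes done at the same moment, and a chained future simply follows the future it was handed to.

```java
import java.util.concurrent.CountDownLatch;

public class RecordFutureModel {

    // Stand-in for the batch-level result: one latch shared by every record in a batch.
    static final class ToyBatchResult {
        private final CountDownLatch latch = new CountDownLatch(1);

        boolean completed() {
            return latch.getCount() == 0L;
        }

        void done() {
            latch.countDown();
        }
    }

    // Stand-in for the record-level future: it only looks at other objects.
    static final class ToyRecordFuture {
        private final ToyBatchResult result;
        // Set when this record's completion has been handed over to another future.
        private volatile ToyRecordFuture nextRecordMetadata;

        ToyRecordFuture(ToyBatchResult result) {
            this.result = result;
        }

        void chain(ToyRecordFuture next) {
            this.nextRecordMetadata = next;
        }

        // Same shape as FutureRecordMetadata.isDone() quoted above.
        boolean isDone() {
            if (nextRecordMetadata != null)
                return nextRecordMetadata.isDone();
            return result.completed();
        }
    }

    public static void main(String[] args) {
        ToyBatchResult batch = new ToyBatchResult();
        ToyRecordFuture a = new ToyRecordFuture(batch);
        ToyRecordFuture b = new ToyRecordFuture(batch);
        System.out.println(a.isDone() + " " + b.isDone()); // prints "false false"

        batch.done(); // completing the batch completes every record future in it
        System.out.println(a.isDone() + " " + b.isDone()); // prints "true true"

        // A chained future is done only when the future it points to is done.
        ToyBatchResult laterBatch = new ToyBatchResult();
        ToyRecordFuture c = new ToyRecordFuture(new ToyBatchResult());
        c.chain(new ToyRecordFuture(laterBatch));
        System.out.println(c.isDone()); // prints "false"
        laterBatch.done();
        System.out.println(c.isDone()); // prints "true"
    }
}
```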
In the class ProducerBatch you can see that this ProduceRequestResult (the field produceFuture) belongs to the batch, not to the individual record; in other words, a record-level future is done when the result of its batch is completed. Looking for what marks the batch-level result as completed, we find this method in ProducerBatch:
```java
private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
    // Set the future before invoking the callbacks as we rely on its state for the `onCompletion` call
    produceFuture.set(baseOffset, logAppendTime, exception);

    // execute callbacks
    for (Thunk thunk : thunks) {
        try {
            if (exception == null) {
                RecordMetadata metadata = thunk.future.value();
                if (thunk.callback != null)
                    thunk.callback.onCompletion(metadata, null);
            } else {
                if (thunk.callback != null)
                    thunk.callback.onCompletion(null, exception);
            }
        } catch (Exception e) {
            log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
        }
    }

    produceFuture.done();
}
```
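Each Thunk pairs a record's user callback with its record-level future. Since produceFuture.done() is only called after the loop has invoked every callback in the batch (an exception thrown by a callback is logged, not propagated), all callbacks have been called before any record future of that batch reports isDone() == true. Combined with the post-condition of flush(), this means every callback has been called by the time flush() returns.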
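The ordering argument can be checked with a small self-contained model. It is a sketch under stated assumptions: ToyBatch and callbacksRunWhenFlushReturns are hypothetical names, a CountDownLatch stands in for the batch-level result, and the waiting thread plays the role of flush() (whether Kafka waits in exactly this way is an assumption here). A background "sender" thread fires all callbacks and only then counts the latch down, mirroring completeFutureAndFireCallbacks. Because CountDownLatch guarantees that actions before countDown() happen-before actions after a successful await(), the waiting thread also sees everything the callbacks wrote, which is what makes callback.check() after flush() reliable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class CallbackOrderingModel {

    // Hypothetical stand-in for ProducerBatch: user callbacks plus a latch
    // that plays the role of the batch-level result.
    static final class ToyBatch {
        final List<Consumer<Exception>> callbacks = new ArrayList<>();
        final CountDownLatch result = new CountDownLatch(1);

        // Same order as completeFutureAndFireCallbacks: callbacks first, done() last.
        void completeAndFireCallbacks(Exception error) {
            for (Consumer<Exception> callback : callbacks) {
                try {
                    callback.accept(error);
                } catch (RuntimeException e) {
                    // The real client logs callback failures and carries on; the toy ignores them.
                }
            }
            result.countDown(); // plays the role of produceFuture.done()
        }
    }

    // Completes one batch on a background "sender" thread while the calling
    // thread waits on the batch result, as a flush() would; returns how many
    // callbacks had run by the time that wait returned.
    static int callbacksRunWhenFlushReturns(int records) {
        ToyBatch batch = new ToyBatch();
        AtomicInteger invoked = new AtomicInteger();
        for (int i = 0; i < records; i++) {
            batch.callbacks.add(error -> invoked.incrementAndGet());
        }
        Thread sender = new Thread(() -> batch.completeAndFireCallbacks(null));
        sender.start();
        try {
            batch.result.await();     // returns only after countDown()
            int seen = invoked.get(); // read before the sender thread is joined
            sender.join();
            return seen;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(callbacksRunWhenFlushReturns(5)); // prints 5
    }
}
```

If done() were moved before the loop, the waiting thread could return while callbacks were still pending, which is the situation the CompletableFuture contrast illustrates.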
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Karsten Spang |
