Is there a way to invoke a customized ErrorHandler for earlyRecordInterceptor in spring-kafka 2.8?

In spring-kafka 2.5.x and 2.6.x (non-transactional case), if I add a customized RecordInterceptor, it is invoked from the method doInvokeRecordListener(record, iterator), because there is no earlyRecordInterceptor. Inside that method, invokeErrorHandler is called, which lets me run my customized ErrorHandler; see the spring-kafka 2.6.x source code here. (The key source code from spring-kafka is listed below.)

private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> record,
    Iterator<ConsumerRecord<K, V>> iterator) {
  Object sample = startMicrometerSample();
  try {
    invokeOnMessage(record);
    successTimer(sample);
    recordInterceptAfter(record, null);
  }
  catch (RuntimeException e) {
    ...
    recordInterceptAfter(record, e);
    if (this.errorHandler == null) {
      throw e;
    }
    try {
      invokeErrorHandler(record, iterator, e); // this is where my customized ErrorHandler gets invoked
      ...
    }
    ...
  }
  return null;
}
But in spring-kafka 2.8, interceptBeforeTx is set to true by default (according to here), so my custom interceptor is now invoked as the earlyRecordInterceptor. Unfortunately, the earlyRecordInterceptor code path does not invoke any ErrorHandler, so my customized ErrorHandler is never called; see the spring-kafka 2.8.x source code here.

private ConsumerRecord<K, V> checkEarlyIntercept(ConsumerRecord<K, V> nextArg) {
  ConsumerRecord<K, V> next = nextArg;
  if (this.earlyRecordInterceptor != null) {
    next = this.earlyRecordInterceptor.intercept(next, this.consumer);
    if (next == null) {
      this.logger.debug(() -> "RecordInterceptor returned null, skipping: "
        + ListenerUtils.recordToString(nextArg));
    }
  }
  return next;
}
My question is: why does spring-kafka not invoke the ErrorHandler for the earlyRecordInterceptor? Could this be enhanced, for example by invoking the ErrorHandler inside the earlyRecordInterceptor code path, as doInvokeRecordListener does?

Is there a way to invoke an ErrorHandler for the earlyRecordInterceptor?

Thank you for your help.



Solution 1:[1]

The earlyRecordInterceptor is only used to call the "before" method (intercept).

It is simply a reference to the same interceptor as commonRecordInterceptor, which is called from recordInterceptAfter.

The early interceptor is never called when transactions are not used, but to revert to the previous behavior for transactions, set interceptBeforeTx to false.
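As a rough illustration of where that flag can be set, here is a minimal sketch that assumes a Spring configuration class and a ConcurrentKafkaListenerContainerFactory. The class name, bean name, and String key/value types are placeholders, not taken from the question. The flag is applied through a container customizer, and per the note above it only changes behavior when a transaction manager is configured.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

// Hypothetical configuration class; names and generic types are placeholders.
@Configuration
public class ListenerContainerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Applied to every container the factory creates: call the interceptor
        // after the transaction starts instead of before it.
        factory.setContainerCustomizer(container -> container.setInterceptBeforeTx(false));
        return factory;
    }
}
```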

private final RecordInterceptor<K, V> earlyRecordInterceptor =
        isInterceptBeforeTx() || this.kafkaTxManager == null
                ? getRecordInterceptor()
                : null;

private final RecordInterceptor<K, V> commonRecordInterceptor = getRecordInterceptor();
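Since checkEarlyIntercept treats a null return from intercept as "skip this record", one possible workaround (not something the answer above proposes) is to handle errors inside the interceptor itself. The sketch below uses plain Java stand-ins rather than spring-kafka types: UnaryOperator plays the role of the interceptor's "before" hook, and the BiConsumer is a hypothetical custom error callback. It shows only the wrapping pattern and is not a drop-in RecordInterceptor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.UnaryOperator;

public class GuardedInterceptorSketch {

    /**
     * Wraps a "before" hook so that any RuntimeException it throws is passed
     * to a custom error callback instead of propagating to the caller.
     * Both parameters are stand-ins, not spring-kafka types.
     */
    public static <R> UnaryOperator<R> guarded(UnaryOperator<R> delegate,
            BiConsumer<R, RuntimeException> onError) {
        return record -> {
            try {
                return delegate.apply(record);
            }
            catch (RuntimeException e) {
                onError.accept(record, e);
                // Returning null mirrors the "skip this record" contract
                // visible in checkEarlyIntercept.
                return null;
            }
        };
    }

    public static void main(String[] args) {
        List<String> handled = new ArrayList<>();
        UnaryOperator<String> interceptor = guarded(
                r -> {
                    if (r.startsWith("bad")) {
                        throw new IllegalStateException("rejected " + r);
                    }
                    return r;
                },
                (r, e) -> handled.add(r + ": " + e.getMessage()));

        System.out.println(interceptor.apply("ok-1"));  // record passes through
        System.out.println(interceptor.apply("bad-2")); // null, record is skipped
        System.out.println(handled);                    // errors seen by the callback
    }
}
```

In a real application the same try/catch would live inside the intercept method of the custom RecordInterceptor, with the callback replaced by whatever error handling the application needs.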

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Gary Russell