Is there a way to invoke a customized ErrorHandler for earlyRecordInterceptor in spring-kafka 2.8?
In spring-kafka 2.5.x and 2.6.x (non-transactional case), if I add a customized RecordInterceptor, it is invoked from the method doInvokeRecordListener(record, iterator) (because there is no earlyRecordInterceptor). Inside that method, invokeErrorHandler is called, which lets me run my customized error handler; see the spring-kafka 2.6.x source code here. (The key source code from spring-kafka is listed below.)
private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> record,
        Iterator<ConsumerRecord<K, V>> iterator) {
    Object sample = startMicrometerSample();
    try {
        invokeOnMessage(record);
        successTimer(sample);
        recordInterceptAfter(record, null);
    }
    catch (RuntimeException e) {
        ...
        recordInterceptAfter(record, e);
        if (this.errorHandler == null) {
            throw e;
        }
        try {
            invokeErrorHandler(record, iterator, e); // my customized ErrorHandler is invoked here
            ....
    return null;
}
In spring-kafka 2.8.x, interceptBeforeTx is set to true (according to here), so my custom interceptor is invoked through earlyRecordInterceptor. Unfortunately, the earlyRecordInterceptor path does not invoke any ErrorHandler, so I cannot run my customized error handler; see the spring-kafka 2.8.x source code here.
private ConsumerRecord<K, V> checkEarlyIntercept(ConsumerRecord<K, V> nextArg) {
    ConsumerRecord<K, V> next = nextArg;
    if (this.earlyRecordInterceptor != null) {
        next = this.earlyRecordInterceptor.intercept(next, this.consumer);
        if (next == null) {
            this.logger.debug(() -> "RecordInterceptor returned null, skipping: "
                    + ListenerUtils.recordToString(nextArg));
        }
    }
    return next;
}
Is there a way to invoke the error handler for earlyRecordInterceptor? Could this be enhanced, for example by also invoking the error handler around the earlyRecordInterceptor call, as doInvokeRecordListener does?
Thank you for your help.
Solution 1:[1]
The earlyRecordInterceptor is only used to call the "before" method (intercept).
It is simply a reference to the same interceptor as commonRecordInterceptor, which is called from recordInterceptAfter.
The early interceptor is never called when transactions are not used, but to revert to the previous behavior for transactions, set interceptBeforeTx to false.
private final RecordInterceptor<K, V> earlyRecordInterceptor =
        isInterceptBeforeTx() || this.kafkaTxManager == null
                ? getRecordInterceptor()
                : null;
private final RecordInterceptor<K, V> commonRecordInterceptor = getRecordInterceptor();
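To see which settings leave the early reference unset, the conditional in the field initializer above can be modeled in plain Java. This is only a sketch of that one expression, not spring-kafka code: the class EarlyInterceptorSelection, the method selectEarly, and the placeholder objects are hypothetical stand-ins for the container's fields, and the model says nothing about when the container actually calls the interceptor.

```java
public class EarlyInterceptorSelection {

    // Hypothetical helper mirroring the ternary from the snippet above:
    // the "early" reference is the interceptor when interceptBeforeTx is true
    // OR no transaction manager is configured; otherwise it is null.
    static <T> T selectEarly(boolean interceptBeforeTx, Object kafkaTxManager, T interceptor) {
        return interceptBeforeTx || kafkaTxManager == null ? interceptor : null;
    }

    public static void main(String[] args) {
        String interceptor = "myInterceptor"; // placeholder for a RecordInterceptor
        Object txManager = new Object();      // placeholder for a transaction manager

        boolean[] flags = {true, false};
        for (boolean interceptBeforeTx : flags) {
            for (Object tm : new Object[] {txManager, null}) {
                String early = selectEarly(interceptBeforeTx, tm, interceptor);
                System.out.println("interceptBeforeTx=" + interceptBeforeTx
                        + ", txManager=" + (tm != null ? "set" : "null")
                        + " -> early=" + early);
            }
        }
    }
}
```

In this model, the only combination that yields a null early reference is interceptBeforeTx set to false together with a transaction manager, which is the setting the answer suggests for the transactional case.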
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Gary Russell |
