How can I find large Kafka messages?

The problem

My app, which reads messages from several topics subscribed by a pattern (mask), crashes with this error:

Confluent.Kafka.ConsumeException: Broker: Message size too large

What I've tried

The first problem is that I have already followed the guidelines for handling large messages: fetch.max.bytes, message.max.bytes and receive.message.max.bytes are all raised to (or close to) their upper limits:

ConsumerSettings = new KafkaConsumerSettings(...)
    .With(x => x.ConsumerConfig.FetchMaxBytes   = 2_000_000_000)
    .With(x => x.ConsumerConfig.MessageMaxBytes = 1_000_000_000)
    .With(x => x.ConsumerConfig.ReceiveMessageMaxBytes = 2_000_000_000 + 512);
// `fetch.max.bytes` must be >= `message.max.bytes`
// `message.max.bytes` must be in range 1000..1000000000
// `receive.message.max.bytes` must be >= `fetch.max.bytes` + 512
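The three rules in those comments can be sanity-checked with plain shell arithmetic. The helper below is a minimal sketch in a POSIX shell; its name and argument order are hypothetical, and it encodes only the rules listed in the comments, not every librdkafka limit. Run with the values above, it reports that they are consistent with each other.

```shell
# Hypothetical helper: check_fetch_limits FETCH_MAX MESSAGE_MAX RECEIVE_MAX
# Encodes only the constraints quoted in the config comments.
# Values are plain byte counts (shells do not accept 2_000_000_000).
check_fetch_limits() {
  fetch=$1     # fetch.max.bytes
  message=$2   # message.max.bytes
  receive=$3   # receive.message.max.bytes
  status=0
  if [ "$message" -lt 1000 ] || [ "$message" -gt 1000000000 ]; then
    echo "message.max.bytes=$message is outside 1000..1000000000"
    status=1
  fi
  if [ "$fetch" -lt "$message" ]; then
    echo "fetch.max.bytes=$fetch must be >= message.max.bytes=$message"
    status=1
  fi
  if [ "$receive" -lt $((fetch + 512)) ]; then
    echo "receive.message.max.bytes=$receive must be >= $((fetch + 512))"
    status=1
  fi
  if [ "$status" -eq 0 ]; then
    echo "OK"
  fi
  return "$status"
}
```

Usage: `check_fetch_limits 2000000000 1000000000 2000000512` prints OK, because 2000000000 + 512 = 2000000512 meets the last rule exactly.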

But the application still cannot process the message, so I decided to skip it.

The second problem is that I can't find the message that is causing the error. By comparing two snapshots of the consumer group, I identified the topic partitions that have stopped processing (CURRENT-OFFSET has not changed and LAG is not 0). I checked the messages at CURRENT-OFFSET, and they are all small:

$ kafka-consumer-groups.bat --bootstrap-server ***:9092  --group "Khajiit.GlobalCatalog.KpcCategoryCalculator.V1" --describe

at 2022.02.18:
TOPIC                                                             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Rng.V3    0          34254397        34876289        621892   
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Rng.V3    1          34713480        34713480        0        
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Rng.V3    2          34717961        34717961        0        
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Edi.V3    0          22851397        70600273        47748876 
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Edi.V3    1          5062234         69250288        64188054 
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Edi.V3    2          1538710         70360554        68821844 
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Ubr.V3    0          58093545        60426064        2332519  
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Ubr.V3    2          58127630        60451357        2323727   (no progress)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Market.V3 1          445647          192860258       192414611
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Market.V3 2          236             192582179       192581943 (no progress)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Fg.V3     0          21818841        23028014        1209173   (no progress)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Fg.V3     2          23055736        23055736        0        
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.X5.V3     0          49773684        50826978        1053294   (no progress)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.X5.V3     1          49801454        50854337        1052883   (no progress)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Gepir.V3  1          3089137         10822241        7733104   (no progress)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Gepir.V3  2          6479987         10830654        4350667  

at 2022.02.21:
TOPIC                                                             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Rng.V3    0          34254740        34876289        621549    (34254740 - 34254397 = 343      messages processed)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Rng.V3    1          34713480        34713480        0        
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Rng.V3    2          34717961        34717961        0        
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Edi.V3    0          54218443        70676247        16457804  (54218443 - 22851397 = 31367046 messages processed)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Edi.V3    1          16219754        69328536        53108782  (16219754 - 5062234  = 11157520 messages processed)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Edi.V3    2          1616692         70441024        68824332  (1616692  - 1538710  = 77982    messages processed)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Ubr.V3    0          58096101        60426064        2329963   (58096101 - 58093545 = 2556     messages processed)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Ubr.V3    2          58127630        60451357        2323727   (no progress)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Market.V3 1          2005847         193217223       191211376 (2005847  - 445647   = 1560200  messages processed)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Market.V3 2          374             192938130       192937756 (374      - 236      = 138      messages processed)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Fg.V3     0          21818841        23028014        1209173   (no progress)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Fg.V3     2          23055736        23055736        0        
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.X5.V3     0          49773684        50826978        1053294   (no progress)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.X5.V3     1          49801454        50854337        1052883   (no progress)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Gepir.V3  1          3089137         10878226        7789089   (no progress)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Gepir.V3  2          6821193         10886253        4065060   (6821193  - 6479987  = 341206   messages processed)
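The comparison above was done by hand. As a sketch, the same check can be scripted with awk, assuming both snapshots are saved to text files and a POSIX shell is available (for example WSL or Git Bash, since the commands here use the Windows .bat scripts). `find_stalled` is a hypothetical helper; it locates columns by their header names, so extra columns such as GROUP or CONSUMER-ID do not matter, and it reports partitions whose CURRENT-OFFSET is identical in both snapshots while LAG is still non-zero.

```shell
# Hypothetical helper: find_stalled OLD_SNAPSHOT NEW_SNAPSHOT
# Prints "TOPIC PARTITION CURRENT-OFFSET LAG" for every partition whose
# CURRENT-OFFSET did not move between the snapshots and whose LAG is non-zero.
find_stalled() {
  awk '
    # Reset the column map at the start of each file.
    FNR == 1 { split("", col) }
    # Map column names to field numbers from the header line, so extra
    # columns (GROUP, CONSUMER-ID, HOST, CLIENT-ID) do not shift anything.
    /CURRENT-OFFSET/ {
      for (i = 1; i <= NF; i++) col[$i] = i
      next
    }
    # Skip blank lines and anything printed before the header.
    NF == 0 || !("TOPIC" in col) { next }
    {
      key = $(col["TOPIC"]) " " $(col["PARTITION"])
      cur = $(col["CURRENT-OFFSET"])
      lag = $(col["LAG"])
      if (FNR == NR) {
        old[key] = cur          # first snapshot: remember the offset
      } else if ((key in old) && old[key] == cur && lag != 0) {
        print key, cur, lag     # no progress, but messages are waiting
      }
    }
  ' "$1" "$2"
}
```

Usage: save the `--describe` output twice, some time apart, then run:

$ find_stalled snapshot-1.txt snapshot-2.txt

On the two snapshots above it would list the rows marked "(no progress)" in the 2022.02.21 table.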

Sizes of the message at CURRENT-OFFSET and of the two messages after it, for the partitions with no progress:
TOPIC                                                             PARTITION  CURRENT-OFFSET
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Ubr.V3    2          58127630    (530, 530, 708 bytes)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Fg.V3     0          21818841    (545, 545, 545 bytes)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.X5.V3     0          49773684    (690, 690, 693 bytes)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.X5.V3     1          49801454    (521, 521, 699 bytes)
Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Gepir.V3  1          3089137     (557, 557, 557 bytes)
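One way to take such measurements from the command line, sketched under two assumptions: Kafka 2.7 or later, whose console consumer supports the `print.partition` and `print.offset` formatter properties, and message values that contain no newlines. The idea is to read a few messages starting at the stuck offset and pipe them through a small awk filter in a POSIX shell. `value_sizes` is a hypothetical helper name, and it measures only the value bytes, not the key, headers or record overhead.

```shell
# Hypothetical helper: reads kafka-console-consumer output printed with
# print.partition=true and print.offset=true, i.e. lines like
#   Partition:2<TAB>Offset:58127630<TAB><value>
# and prints "<partition> <offset> <value size in bytes>".
value_sizes() {
  # LC_ALL=C makes awk length() count bytes rather than characters.
  LC_ALL=C awk -F '\t' '{
    p = $1; sub(/^Partition:/, "", p)
    o = $2; sub(/^Offset:/, "", o)
    # The value starts after the first two fields and their tabs; taking the
    # rest of the line keeps any tabs inside the value.
    v = substr($0, length($1) + length($2) + 3)
    print p, o, length(v)
  }'
}
```

Usage, with `$BOOTSTRAP` as a placeholder for the broker address:

$ kafka-console-consumer.sh --bootstrap-server "$BOOTSTRAP" --topic Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Ubr.V3 --partition 2 --offset 58127630 --max-messages 3 --property print.partition=true --property print.offset=true | value_sizes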

The question

How can I identify the message that is causing the error?

Ideally, I'm looking for a command-line solution that outputs the largest message sizes in a topic together with their partition and offset.
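A sketch of such a command-line approach, with one swapped-in technique: instead of consuming through a client, it reads a partition's segment files on the broker with `kafka-dump-log` (so it needs access to the broker's log directory) and ranks record batches by size in a POSIX shell. Batch size is a natural unit here because the broker's `message.max.bytes` limits the size of a whole record batch (after compression). The parser assumes batch header lines carry `baseOffset:`, `lastOffset:` and `size:` tokens, as `kafka-dump-log` prints them in recent versions; it does not rely on field order, but the format should be checked against your Kafka version. `largest_batches` is a hypothetical helper.

```shell
# Hypothetical helper: largest_batches [N]
# Reads kafka-dump-log output on stdin and prints the N largest record
# batches (default 10) as "<baseOffset> <lastOffset> <size>", biggest first.
largest_batches() {
  awk '
    {
      base = ""; last = ""; size = ""
      # Scan tokens instead of fixed positions; per-record lines use
      # different tokens (offset:, keysize:, valuesize:) and are skipped.
      for (i = 1; i < NF; i++) {
        if ($i == "baseOffset:") base = $(i + 1)
        else if ($i == "lastOffset:") last = $(i + 1)
        else if ($i == "size:") size = $(i + 1)
      }
      if (base != "" && size != "") print base, last, size
    }' | sort -k3,3nr | head -n "${1:-10}"
}
```

Usage, where `<log.dir>` and `<segment>` are placeholders for the broker's data directory and a segment file name:

$ kafka-dump-log.sh --files <log.dir>/Khajiit.Content.Aggregation.KpcClassifiedProductChanges.Ubr.V3-2/<segment>.log | largest_batches 10

Segment files are named after the first offset they contain, so the segment holding a stuck offset is the one with the largest file name not exceeding it; the printed baseOffset/lastOffset range shows which batch covers that offset.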



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
