Control rate of individual topic consumption in Kafka Streams 0.9.1.0-cp1?

I am trying to backprocess data in Kafka topics using a Kafka Streams application that involves a join. One of the streams to be joined has a much larger volume of data per unit of time in its corresponding topic. I would like to control consumption from the individual topics so that I get roughly the same event timestamps from each topic in a single consumer.poll(). However, there doesn't appear to be any way to control the behavior of the KafkaConsumer backing the source stream. Is there any way around this? Any insight would be appreciated.



Solution 1:[1]

On the consumer side, you can use the consume([num_messages=1][, timeout=-1]) method instead of poll().

consume([num_messages=1][, timeout=-1]): Consumes a list of messages (possibly empty on timeout). Callbacks may be executed as a side effect of calling this method. The application must check the returned Message object’s Message.error() method to distinguish between proper messages (error() returns None) and errors for each Message in the list (see error().code() for specifics). If the enable.partition.eof configuration property is set to True, partition EOF events will also be exposed as Messages with error().code() set to _PARTITION_EOF.

  • num_messages (int) – The maximum number of messages to return (default: 1).
  • timeout (float) – The maximum time to block waiting for message, event or callback (default: infinite (-1)). (Seconds)
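A minimal sketch of how a bounded consume() call can keep two topics roughly aligned by event timestamp: pull at most a small batch from each topic, then always emit from whichever stream's head message lags in time. The StubConsumer below is a stand-in for confluent_kafka.Consumer (which needs a live broker), and the topic names, Msg shape, and batch size are illustrative assumptions, not part of the original answer.

```python
from collections import namedtuple

# Illustrative stand-in for a Kafka message: topic, event timestamp, payload.
Msg = namedtuple("Msg", ["topic", "timestamp", "value"])


class StubConsumer:
    """Stand-in for confluent_kafka.Consumer: like consume(), it returns
    at most num_messages per call (possibly fewer when the topic runs dry)."""

    def __init__(self, messages):
        self._messages = list(messages)

    def consume(self, num_messages=1, timeout=-1):
        batch = self._messages[:num_messages]
        self._messages = self._messages[num_messages:]
        return batch


def merge_by_timestamp(low_volume, high_volume, batch=5):
    """Pull bounded batches from each topic's consumer and always emit the
    message with the smaller head timestamp, so the high-volume topic cannot
    race far ahead of the low-volume one in event time."""
    consumers = {"low": low_volume, "high": high_volume}
    buffers = {"low": [], "high": []}
    out = []
    while True:
        # Refill any empty buffer with a bounded batch instead of an
        # unbounded poll() of everything available.
        for key, consumer in consumers.items():
            if not buffers[key]:
                buffers[key] = consumer.consume(num_messages=batch, timeout=1.0)
        nonempty = [k for k in buffers if buffers[k]]
        if not nonempty:
            return out  # both topics exhausted
        # Advance the stream whose next event timestamp is smallest.
        key = min(nonempty, key=lambda k: buffers[k][0].timestamp)
        out.append(buffers[key].pop(0))
```

With a real confluent_kafka.Consumer you would subscribe each consumer to one topic and keep the same loop; the key point is that num_messages caps how far any one topic can get ahead between refills.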

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: Alireza Safi