Confluent Kafka Python pause-resume functionality example

I was trying to use the Confluent Kafka consumer's pause and resume functionality, but couldn't find any examples on the internet other than the main documentation link:

https://docs.confluent.io/5.0.0/clients/confluent-kafka-python/index.html

I couldn't understand the parameters to be passed to them. Is it a list of partitions, a list of topic names, or something else?



Solution 1:[1]

pause() and resume() take a list of TopicPartition objects, not topic names.

class confluent_kafka.TopicPartition

TopicPartition is a generic type to hold a single partition and various information about it.

It is typically used to provide a list of topics or partitions for various operations, such as Consumer.assign().

TopicPartition(topic[, partition][, offset])

Instantiate a TopicPartition object.
Parameters:

  • topic (string) – Topic name
  • partition (int) – Partition id
  • offset (int) – Initial partition offset
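
One way to get a valid argument without building the list by hand is Consumer.assignment(), which returns the TopicPartition objects currently assigned to the consumer. Below is a minimal sketch, assuming `consumer` is an already subscribed confluent_kafka.Consumer. The helper names pause_all and resume_all are hypothetical and not part of the library.

```python
def pause_all(consumer):
    """Pause fetching for every partition currently assigned to `consumer`.

    `consumer` is assumed to be a confluent_kafka.Consumer (or any object
    exposing the same assignment() and pause() methods). The paused list is
    returned so the caller can hand the same partitions to resume() later.
    """
    partitions = consumer.assignment()  # list of TopicPartition
    if partitions:
        consumer.pause(partitions)
    return partitions


def resume_all(consumer, partitions):
    """Resume fetching for partitions previously returned by pause_all()."""
    if partitions:
        consumer.resume(partitions)
```

Note that the assignment is typically empty until poll() has been called and the group rebalance has completed, so calling pause_all() right after subscribe() will likely pause nothing.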

Solution 2:[2]

As OneCricketeer mentioned, pause() and resume() take a list of TopicPartition objects. To initialize a TopicPartition you need the topic and the partition (and optionally an offset), all of which you can get from the message object.

This is how you can achieve it through Confluent-Kafka-Python:

import time
from confluent_kafka import Consumer, TopicPartition


conf = {
    'bootstrap.servers': "localhost:9092",
    'group.id': "test-consumer-group",
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False
}

topics = ['topic1']
consumer = Consumer(conf)
consumer.subscribe(topics)

while True:
    try:
        msg = consumer.poll(1.0)
        if msg is None:
            print("Waiting for message or event/error in poll()...")
            continue
        if msg.error():
            print("Error: {}".format(msg.error()))
            continue

        # Only the topic and partition are needed to pause or resume,
        # so build the TopicPartition once and reuse it.
        partition = TopicPartition(msg.topic(), msg.partition())

        # Pause the partition, then call your processing function
        consumer.pause([partition])
        time.sleep(60)  # Think of it as processing time

        # Once the processing is done, resume the partition and
        # commit the offset of the message that was just processed
        consumer.resume([partition])
        consumer.commit(message=msg)

    except Exception as e:
        print(e)

This is just an example and you might want to modify it based on your use case.
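
Pausing mostly pays off when poll() keeps being called during the processing, since, as far as I understand the client, poll() calls are what count as activity for max.poll.interval.ms. The following is a rough sketch of that variation, not the approach from the answer above: it swaps time.sleep() for a worker thread and keeps polling while the thread runs. The function name process_while_paused and the handler parameter are hypothetical, and `consumer` is assumed to be a confluent_kafka.Consumer.

```python
import threading


def process_while_paused(consumer, msg, handler, poll_timeout=1.0):
    """Run handler(msg) in a worker thread while all assigned partitions are paused.

    Returns any events poll() handed back during the pause (normally none,
    possibly error events) so the caller can inspect them. The offset is
    committed only if handler(msg) finished without raising.
    """
    partitions = consumer.assignment()  # list of TopicPartition
    consumer.pause(partitions)

    failure = []

    def run():
        try:
            handler(msg)
        except Exception as exc:  # hand the error back to the calling thread
            failure.append(exc)

    worker = threading.Thread(target=run)
    worker.start()

    stray = []
    try:
        while worker.is_alive():
            # Paused partitions deliver no messages, so this is usually None.
            event = consumer.poll(poll_timeout)
            if event is not None:
                stray.append(event)
    finally:
        worker.join()
        consumer.resume(partitions)

    if failure:
        raise failure[0]
    consumer.commit(message=msg)
    return stray
```

Inside the consume loop this would replace the pause, sleep, resume and commit lines with a single call such as process_while_paused(consumer, msg, my_handler), where my_handler is your own processing function.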

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1
Solution 2 PanicLion