'How to create python kafka producer with multiple bootstrap servers?
I'm trying to use send messages to a Kafka topic which is supported by multiple brokers. Below is my method, where self.bootstrap_servers = "[b-1.dev-stg-kafka.wjiw3s.c1.kafka.us-east-1.amazonaws.com:9092, b-2.dev-stg-kafka.wjiw3s.c1.kafka.us-east-1.amazonaws.com:9092]"
from kafka import KafkaProducer
from time import sleep
def send(self):
try:
producer = KafkaProducer(bootstrap_servers=self.bootstrap_server, api_version=(0, 10, 1))
message = self.prepare_message()
producer.send(self.topic, self.encoded_payload)
sleep(2)
except:
print("Exception sending message to kafka")
when I try to test this I get in the exception block and the line that fails is when I create my producer. How can I publish a Kafka message with this KafkaProducer, which has multiple kafka bootstrap_servers supporting it? Is there another way to do this?
Error:
File "C:\Users\ENV\datareplayer\venv\lib\site-packages\kafka\client_async.py", line 216, in __init__
self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
File "C:\Users\ENV\datareplayer\venv\lib\site-packages\kafka\conn.py", line 1236, in collect_hosts
host, port, afi = get_ip_port_afi(host_port)
File "C:\Users\ENV\datareplayer\venv\lib\site-packages\kafka\conn.py", line 1192, in get_ip_port_afi
host, rest = host_and_port_str[1:].split(']')
ValueError: not enough values to unpack (expected 2, got 1)
Solution 1:[1]
ast.literal_eval(bootstrap_servers) can help.
It takes a stringified list as input, and converts it to a list. For example:
input_str = "['broker1', 'broker2', 'broker3']"
type(input_str)
str
If you do
input_list = ast.literal_eval(input_str)
input_list is now a list
type(input_list)
list
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | pppery |
