KafkaProducer does not send messages in multiprocessing.Process
I have the following code, which is a simplified version of a real app. The problem is that the producer does not send any messages when it is used inside a multiprocessing.Process.
```python
import logging
import multiprocessing
import os

from kafka import KafkaProducer

KAFKA_HOST = os.getenv('KAFKA_HOST')
KAFKA_PORT = os.getenv('KAFKA_PORT')
KAFKA_NAME = os.getenv('KAFKA_NAME')

producer = KafkaProducer(bootstrap_servers=f'{KAFKA_HOST}:{KAFKA_PORT}')


def test_produce():
    # producer1 = KafkaProducer(bootstrap_servers=f'{KAFKA_HOST}:{KAFKA_PORT}')
    for i in range(10):
        print('inserting')  # prints once
        producer.send(KAFKA_NAME, b'test').get()  # will not be committed to the topic


producer.send(KAFKA_NAME, b'test_in_non_thread1').get()  # will be committed to the topic
producer.send(KAFKA_NAME, b'test_in_non_thread2').get()  # will be committed to the topic
multiprocessing.Process(target=test_produce).start()
```
Solution 1:[1]
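The problem is that the producer created in the parent process is no longer in a working state inside the child process. KafkaProducer hands records to a background I/O thread that actually transmits them. With the fork start method (the default on Linux), the child gets a copy of the producer object but not that thread. As a result, send() only queues the message and .get() waits for an acknowledgement that never arrives, which is why 'inserting' is printed only once. The solution is to create a local producer instance inside the function that the child process runs: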
```python
import logging
import multiprocessing
import os

from kafka import KafkaProducer

KAFKA_HOST = os.getenv('KAFKA_HOST')
KAFKA_PORT = os.getenv('KAFKA_PORT')
KAFKA_NAME = os.getenv('KAFKA_NAME')

# Created in the parent and used only by the parent.
producer = KafkaProducer(bootstrap_servers=f'{KAFKA_HOST}:{KAFKA_PORT}')


def test_produce():
    # Created inside the child, so its background sender thread runs in this process.
    producer1 = KafkaProducer(bootstrap_servers=f'{KAFKA_HOST}:{KAFKA_PORT}')
    for i in range(10):
        print('inserting')  # prints 10 times
        producer1.send(KAFKA_NAME, b'test').get()  # all messages will be committed
    producer1.close()  # flush and release connections before the process exits


# The guard is required when the 'spawn' start method is used (Windows, macOS).
if __name__ == '__main__':
    producer.send(KAFKA_NAME, b'test_in_non_thread1').get()  # will be committed to the topic
    producer.send(KAFKA_NAME, b'test_in_non_thread2').get()  # will be committed to the topic
    multiprocessing.Process(target=test_produce).start()
```
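With this change everything works as expected. Alternatively, you can use threading.Thread, which does not have this issue. A thread runs in the same process, so it shares the module-level producer together with its running sender thread, and kafka-python documents KafkaProducer as thread safe.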
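The difference between a thread and a forked process can be reproduced without a Kafka broker. In the sketch below, a hypothetical BackgroundWorker class stands in for a client like KafkaProducer that does its I/O on a background thread. It simply owns one daemon thread. The sketch assumes a platform where the fork start method is available (Linux or macOS, not Windows). It checks whether the worker's thread is still running when the worker is used from a new thread and when it is used from a forked child process.

```python
import multiprocessing
import threading


class BackgroundWorker:
    """Hypothetical stand-in for a client that does its I/O on a background
    thread (in kafka-python, KafkaProducer's sender thread plays this role)."""

    def __init__(self):
        self._stop = threading.Event()
        # The daemon thread just blocks forever, like an idle I/O loop.
        self._thread = threading.Thread(target=self._stop.wait, daemon=True)
        self._thread.start()

    def is_running(self):
        return self._thread.is_alive()


# Created once at import time, like the module-level producer in the question.
worker = BackgroundWorker()


def _report(conn):
    # Runs in the forked child: the Thread object was copied, the OS thread was not.
    conn.send(worker.is_running())
    conn.close()


def alive_in_forked_child():
    ctx = multiprocessing.get_context("fork")  # not available on Windows
    parent_conn, child_conn = ctx.Pipe()
    proc = ctx.Process(target=_report, args=(child_conn,))
    proc.start()
    result = parent_conn.recv()
    proc.join()
    return result


def alive_in_new_thread():
    result = []
    # A new thread shares the process, including the worker's running thread.
    t = threading.Thread(target=lambda: result.append(worker.is_running()))
    t.start()
    t.join()
    return result[0]


if __name__ == "__main__":
    print("from a thread:", alive_in_new_thread())          # True
    print("from a forked child:", alive_in_forked_child())  # False
```

A new thread sees the worker's thread alive. The forked child sees a copy of the object whose thread is gone. That is the same situation as the module-level producer: it can queue messages in the child but never send them. On Python 3.12+, forking while the worker thread is running may also emit a DeprecationWarning about using fork() in a multi-threaded process.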
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Ivan |
