Writing Spark DataFrame to Kafka as comma-separated JSON objects

I am not able to send a DataFrame as a single comma-separated JSON object for a larger data set.

Working code for a smaller data set:

    df.selectExpr("CAST(collect_list(to_json(struct(*))) AS STRING) AS value") \
        .write.format("kafka")\
        .option("compression", "gzip")\
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("topic", "JsonFormat") \
        .option("kafka.request.timeout.ms", 120000) \
        .option("kafka.linger.ms", 10) \
        .option("compression", "gzip")\
        .option("kafka.retries", 3) \
        .save()
    spark.stop()

Output:

[{
    "firstname": "James",
    "middlename": "",
    "lastname": "Smith",
    "id": "36636",
    "gender": "M",
    "salary": 3000
}, {
    "firstname": "Michael",
    "middlename": "Rose",
    "lastname": "",
    "id": "40288",
    "gender": "M",
    "salary": 4000
}, {
    "firstname": "Robert",
    "middlename": "",
    "lastname": "Williams",
    "id": "42114",
    "gender": "M",
    "salary": 4000
}, {
    "firstname": "Maria",
    "middlename": "Anne",
    "lastname": "Jones",
    "id": "39192",
    "gender": "F",
    "salary": 4000
}, {
    "firstname": "Satish",
    "middlename": "Anjaneyapp",
    "lastname": "Brown",
    "id": "",
    "gender": "F",
    "salary": -1
}]

Actual Problem

For a larger data set, CAST(collect_list(to_json(struct(*))) AS STRING) tries to collect the whole DataFrame into one huge value and send it through Kafka as a single message. We are getting the error below:

Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 51312082 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.

Limitation:

I can send at most 1 MB per message through Kafka.

Is there a way we can break the message into chunks of up to 1 MB and still send the comma-separated JSON objects?

Tried the configurations below, but no luck (see the sketch after this list):

kafka.linger.ms

batch.size
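
For reference, Kafka producer settings such as linger.ms and batch.size have to be prefixed with kafka. when passed to Spark's Kafka sink; a minimal sketch, assuming the same df, broker, and topic as above:

    # Producer configs reach the Kafka producer only with the "kafka." prefix.
    # Note: linger.ms and batch.size control client-side batching only; they do not
    # raise the per-record size limit, so RecordTooLargeException still occurs
    # when a single message is too large.
    df.selectExpr("CAST(collect_list(to_json(struct(*))) AS STRING) AS value") \
        .write.format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("topic", "JsonFormat") \
        .option("kafka.linger.ms", 10) \
        .option("kafka.batch.size", 16384) \
        .save()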



Solution 1:[1]

Don't comma-separate your JSON objects; the records won't be valid JSON on their own. You also shouldn't break the payload into "1 MB chunks", because then incomplete strings will be sent to different partitions, and you have no easy way to determine the ordering needed to put them back together in a consumer.

Remove the collect_list call and instead ensure your DataFrame has a value string column containing one valid JSON object per row. The Kafka writer will then write each row as a separate message.
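
A minimal sketch of that approach, assuming the same df, broker address, and topic as in the question:

    # One JSON object per row; each row becomes its own Kafka record,
    # so no single message carries the whole data set.
    df.selectExpr("to_json(struct(*)) AS value") \
        .write.format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("topic", "JsonFormat") \
        .option("kafka.compression.type", "gzip") \
        .save()

If a downstream consumer really needs a comma-separated JSON array, it can join the individual messages itself after reading them.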

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

[1] Solution 1