Generate Kafka message with Headers using Apache Spark
I have an ETL job (Spark/Scala). After writing to a table, a message with a "header" must be sent to Kafka, but I haven't been able to add the header to the message. I have a Spark DataFrame with the "key" and the "value". I tried to include the "header", but when I read the message back, the header field shows "NO_HEADERS". How can I include the header in the message?
This is an example of what I have already tried:
val df = spark.createDataFrame(spark.sparkContext.parallelize(mySeq), schemaDf)
.withColumn("headers", split(col("Marca"), "@#@|%").cast("array<string>"))
.selectExpr("CAST(Marca AS STRING) AS key", "to_json(struct(*)) AS value", "headers AS headers")
.as[(String, String, Array[String])]
df
.write.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "test-kafka-notification")
.option("includeHeaders", "true")
.save()
I have also tried making the "headers" column a String, and it didn't work either.
These are the messages I get, with "NO_HEADERS":
NO_HEADERS peugeot {"Marca":"peugeot","Modelo":"308","Unidades":3,"headers":["peugeot"]}
NO_HEADERS Seat {"Marca":"Seat","Modelo":"Arona","Unidades":4,"headers":["Seat"]}
NO_HEADERS Seat {"Marca":"Seat","Modelo":"Leon","Unidades":10,"headers":["Seat"]}
NO_HEADERS Seat {"Marca":"Seat","Modelo":"Ibiza","Unidades":6,"headers":["Seat"]}
NO_HEADERS Opel {"Marca":"Opel","Modelo":"Corsa","Unidades":6,"headers":["Opel"]}
NO_HEADERS Fiat {"Marca":"Fiat","Modelo":"Punto","Unidades":16,"headers":["Fiat"]}
NO_HEADERS Fiat {"Marca":"Fiat","Modelo":"Panda","Unidades":2,"headers":["Fiat"]}
NO_HEADERS Mercedes {"Marca":"Mercedes","Modelo":"Benz","Unidades":1,"headers":["Mercedes"]}
Thank you, regards,
Solution 1:[1]
The easiest way to add headers is with withColumn, as you tried in your example.
Headers must be an array<struct<key: string, value: binary>>, so make sure your column matches this format. You don't need to add
.option("includeHeaders", "true")
to your df.write; that option is used when you read from Kafka into a DataFrame, for example with spark.read or readStream.
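For reference, here is a minimal read-side sketch (reusing the broker and topic names from the question, so adjust them to your setup) where includeHeaders does apply; it exposes the headers as a "headers" column of array<struct<key: string, value: binary>>:
// Batch read from Kafka; includeHeaders adds the "headers" column to each row.
val consumed = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test-kafka-notification")
  .option("includeHeaders", "true")
  .load()
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value", "headers")

consumed.show(truncate = false)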
Here is a simple example you can replicate:
selectDf
.withColumn("headers", array(struct(lit("headerKey") as "key", lit("headerValue").cast("binary") as "value")))
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "output")
.save()
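Adapted to the question's DataFrame, the same pattern could look like the sketch below. It assumes your original mySeq/schemaDf and the Marca column, and it puts the brand into a header named "Marca" instead of a plain string column:
import org.apache.spark.sql.functions.{array, col, lit, struct}

// Build key/value as before, then add a properly typed headers column:
// array<struct<key: string, value: binary>>.
val dfWithHeaders = spark.createDataFrame(spark.sparkContext.parallelize(mySeq), schemaDf)
  .selectExpr("CAST(Marca AS STRING) AS key", "to_json(struct(*)) AS value")
  .withColumn("headers",
    array(struct(lit("Marca") as "key", col("key").cast("binary") as "value")))

// No includeHeaders option is needed on the write side; the Kafka sink picks up
// the "headers" column automatically when it is present.
dfWithHeaders.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "test-kafka-notification")
  .save()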
Don't forget to check that your Spark version and your Kafka client are compatible with headers: the Spark Kafka integration supports them from Spark 3.0, and Kafka itself from version 0.11.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | kuro |