Pivot stream data in Spark

I am reading data from a Kafka topic and I want to pivot the data. I am using the code below in the Spark shell:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

val data = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "*******:9092")  
  .option("subscribe", "PARAMTABLE")
  .option("startingOffsets", "latest")
  .load()

val schema = new StructType().add("ENTITY_ID", StringType).add("PARAM_NAME", StringType).add("VALUE", StringType)
val df1 = data.selectExpr("CAST(value AS STRING)")
val dataDF = df1.select(from_json(col("value"), schema).as("data")).select("data.*")
def forEachFunc(batchDF: DataFrame, batchId: Long): Unit = {
  batchDF.groupBy("ENTITY_ID").pivot("PARAM_NAME").agg(first("VALUE"))
    .withColumn("ProcessedTime", current_timestamp())
    .write.mode("append").parquet("pivotedDataFrame.parquet")
}

dataDF.writeStream.foreachBatch(forEachFunc _).outputMode("append").start().awaitTermination()

But I am getting an error. Can someone please suggest the correct way of achieving this?

A sample message from my Kafka topic is below:

{"PARAM_INSTANCE_ID":128748494,"ENTITY_ID":107437678,"PARAM_NAME":"Survey Required","VALUE":"Unchecked"}
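Independent of Spark, the intended pivot can be sanity-checked in plain Python on messages shaped like the sample above. This is only an illustration of the expected output shape, one row per ENTITY_ID with one column per PARAM_NAME, keeping the first VALUE seen; the second message is hypothetical:

```python
import json

# Messages shaped like the sample from the Kafka topic;
# the "Priority" record is made up for illustration.
messages = [
    '{"PARAM_INSTANCE_ID":128748494,"ENTITY_ID":107437678,"PARAM_NAME":"Survey Required","VALUE":"Unchecked"}',
    '{"PARAM_INSTANCE_ID":128748495,"ENTITY_ID":107437678,"PARAM_NAME":"Priority","VALUE":"High"}',
]

# Pivot: group by ENTITY_ID, turn each PARAM_NAME into a column,
# and keep the first VALUE per column (mirrors agg(first("VALUE"))).
pivoted = {}
for raw in messages:
    rec = json.loads(raw)
    row = pivoted.setdefault(rec["ENTITY_ID"], {})
    row.setdefault(rec["PARAM_NAME"], rec["VALUE"])

print(pivoted)
# {107437678: {'Survey Required': 'Unchecked', 'Priority': 'High'}}
```

This is the table shape the Spark pivot should produce for each micro-batch.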

[spark-stream-error-image: screenshot of the error from the Spark shell]



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
