Convert Spark RDD[Row] to DataFrame

I am having trouble transforming JSON into a DataFrame. I am using Spark in a project that synchronizes tables into a data lake (Hudi) from a CDC tool (Canal) listening to the MySQL binlog. I receive JSON describing each row change and add some fields to it. This JSON stream contains multiple schemas: each schema has different columns and may gain new columns in the future. So I build a GenericRowWithSchema for each JSON object and attach an individual schema to each row.

Now I need to transform the RDD[Row] into a DataFrame in order to write it to Hudi. How can I do that?
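For context, `SparkSession.createDataFrame(rdd, schema)` expects a single schema for the whole RDD, so rows that carry different schemas have to be grouped by schema first. A minimal sketch of that idea in local mode (the `userSchema`/`orderSchema` names and their fields are made-up examples, not from the original code):

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

object RddToDataFrameSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]") // local mode, only for the sketch
      .appName("rdd-to-df")
      .getOrCreate()

    // Two hypothetical per-table schemas, standing in for the CDC payloads.
    val userSchema  = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
    val orderSchema = StructType(Seq(StructField("id", IntegerType), StructField("amount", DoubleType)))

    // Rows tagged with the schema they were built against.
    val rows = Seq(
      (userSchema,  Row(1, "alice")),
      (userSchema,  Row(2, "bob")),
      (orderSchema, Row(10, 99.5))
    )

    // Group rows by schema, then build one DataFrame per schema.
    rows.groupBy(_._1).foreach { case (schema, tagged) =>
      val rdd = spark.sparkContext.parallelize(tagged.map(_._2))
      val df  = spark.createDataFrame(rdd, schema)
      df.show()
    }

    spark.stop()
  }
}
```

The grouping step is the key point: with a heterogeneous stream, one `createDataFrame` call per distinct schema is needed, since a DataFrame cannot hold rows with different schemas.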

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import scala.collection.mutable.ArrayBuffer

object Code {

def main(args: Array[String]): Unit = {
val sss = SparkSession.builder().appName("SparkHudi").getOrCreate()
//val sc = SparkContext.getOrCreate
val sc = sss.sparkContext
val ssc = new StreamingContext(sc, Seconds(1))
//ssc.sparkContext.setLogLevel("INFO");

import org.apache.kafka.common.serialization.StringDeserializer
val kafkaParams = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "kafka.test.com:9092",
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.GROUP_ID_CONFIG -> "group-88",
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> Boolean.box(true)
)

val topics = Array("test")

val kafkaDirectStream  = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

// Cache the stream before heavyweight operations if it is reused.
// Note: the streaming job is started via ssc.start() after all
// transformations are defined, not by calling start() on the DStream.


val saveRdd = kafkaDirectStream.map(x => {
  //receive json from kafka
  val jsonObject = JSON.parse(x.value()).asInstanceOf[JSONObject]
  jsonObject
}).map(json => {


  /* some JSON field operations (elided): dataJson holds the column values,
     sqlTypeJson holds the SQL type code for each column */

  val keySets = dataJson.keySet()

  val dataArray:ArrayBuffer[AnyRef] = ArrayBuffer[AnyRef]()
  val fieldArray:ArrayBuffer[StructField] = ArrayBuffer[StructField]()


  keySets.forEach(dataKey => {
    fieldArray.append(
      StructField(dataKey, SqlTypeConverter.toSparkType(sqlTypeJson.getIntValue(dataKey))))
    dataArray.append(dataJson.get(dataKey))
  })

  val schema = StructType(fieldArray)
  val row: Row = new GenericRowWithSchema(dataArray.toArray, schema)

  row
})




saveRdd.foreachRDD ( rdd => {
  // Get the offset ranges in the RDD
  //println(rdd.map(x => x.toJSONString()).toDebugString());

  import sss.implicits._




  rdd.collect().foreach(x=>{
    println(x.json)
    println(x.schema.sql)
  })

})

ssc.start()
ssc.awaitTermination()
}
}
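Once a DataFrame exists for one schema, writing it to Hudi is mostly a matter of DataFrame writer options. A hedged sketch, where the table name, record-key field, precombine field, and base path are all placeholders to be replaced with values from the actual CDC metadata:

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

// Sketch: write one micro-batch DataFrame (built for a single schema) to Hudi.
// All option values below are placeholders, not taken from the original code.
def writeToHudi(df: DataFrame): Unit = {
  df.write
    .format("hudi")
    .option("hoodie.table.name", "cdc_table")                  // placeholder table name
    .option("hoodie.datasource.write.recordkey.field", "id")   // placeholder key column
    .option("hoodie.datasource.write.precombine.field", "ts")  // placeholder precombine column
    .mode(SaveMode.Append)
    .save("/tmp/hudi/cdc_table")                               // placeholder base path
}
```

Inside `foreachRDD`, this would be called once per schema group rather than once per batch, since each Hudi table expects a fixed schema (Hudi does support schema evolution for added columns, which fits the "may add new column in the future" requirement).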


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
