Convert Spark RDD[Row] to DataFrame
I am having trouble transforming JSON into a DataFrame. I am using Spark in a project to synchronize tables into a data lake (Hudi) from a CDC tool (Canal) listening to the MySQL binlog. I receive JSON describing row changes and add some fields to it. This JSON stream includes multiple schemas: each schema has different columns and may gain new columns in the future, so I build a GenericRowWithSchema for each JSON object and attach an individual schema to each row.
Now I need to transform the RDD[Row] into a DataFrame so I can write it to Hudi. How can I convert it?
object code {
  /**
   * Streams Canal (MySQL binlog CDC) change events from Kafka, converts each
   * JSON payload to a schema-carrying Row, and turns the rows into DataFrames
   * (one per distinct schema) ready to be written to Hudi.
   */
  def main(args: Array[String]): Unit = {
    val sss = SparkSession.builder().appName("SparkHudi").getOrCreate()
    val sc = sss.sparkContext
    val ssc = new StreamingContext(sc, Seconds(1))

    import org.apache.kafka.common.serialization.StringDeserializer
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "kafka.test.com:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> "group-88",
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> Boolean.box(true)
    )
    val topics = Array("test")
    val kafkaDirectStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )
    // BUG FIX: the original called kafkaDirectStream.start() — a DStream has no
    // start() method. The StreamingContext is started once, below, after the
    // whole processing graph has been declared.

    val saveRdd = kafkaDirectStream.map { record =>
      // Parse the raw Canal CDC message coming off Kafka.
      JSON.parse(record.value()).asInstanceOf[JSONObject]
    }.map { json =>
      // BUG FIX: this stage was written as .filter(...) but the lambda builds
      // and returns a Row, not a Boolean — it must be a map.
      // NOTE(review): dataJson / sqlTypeJson were undefined in the original;
      // assuming the standard Canal message layout ("data" holds column values,
      // "sqlType" holds JDBC type codes) — confirm against your payload.
      val dataJson = json.getJSONObject("data")
      val sqlTypeJson = json.getJSONObject("sqlType")

      val dataArray: ArrayBuffer[AnyRef] = ArrayBuffer[AnyRef]()
      val fieldArray: ArrayBuffer[StructField] = ArrayBuffer[StructField]()
      dataJson.keySet().forEach { dataKey =>
        // Build the per-row schema and the matching value array in lock-step
        // so field order and value order stay aligned.
        fieldArray.append(
          StructField(dataKey, SqlTypeConverter.toSparkType(sqlTypeJson.getIntValue(dataKey))))
        dataArray.append(dataJson.get(dataKey))
      }
      val schema = StructType(fieldArray)
      new GenericRowWithSchema(dataArray.toArray, schema).asInstanceOf[Row]
    }

    saveRdd.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // ANSWER: createDataFrame(RDD[Row], StructType) needs a single schema,
        // but this stream mixes schemas — so collect the distinct schemas
        // (small: one per table) and build one DataFrame per schema. The row
        // data itself stays distributed; only schemas come to the driver.
        val schemas = rdd.map(_.schema).distinct().collect()
        schemas.foreach { schema =>
          val df = sss.createDataFrame(rdd.filter(_.schema == schema), schema)
          df.show()
          // ... write df to Hudi here, e.g. df.write.format("hudi")... ...
        }
      }
    }

    // BUG FIX: a streaming job never executes without starting the context
    // and blocking on it; the original was also missing the closing braces.
    ssc.start()
    ssc.awaitTermination()
  }
}
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
