'Error reading delta file, stateful spark structured streaming job with kafka

I m reading data through kafka with spark windowing operation and union the kafka and hive data , make single dataframe. I am trying to find latest data with timestamp through spark window function and row_number then save dataframe in hive table.

val sparkSession: SparkSession =
SparkSession.builder().appName(SPARK_APP_NAME)

.config("hive.metastore.warehouse.dir", SPARK_HIVE_WAREHOUSE_DIR_SUSPICIOUS_USER)
// .config("spark.default.parallelism",70)
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("hive.support.concurrency", true)
.config("spark.sql.orc.impl","native")
.config("spark.speculation","false")

query = kakfa_SuspciousUser
.selectExpr(
"nullif(TimeStamp, \"\") as TimeStamp",
"nullif(UserName, \"\") as UserName"
)
.groupBy(window(col("TimeStamp"), "30 minute"), col("UserName"))
.agg(functions.count("UserName").alias("user_UserCount"))
.select("UserName","window.end", "user_UserCount")
.filter(col("user_UserCount").gt(4))
.writeStream
.foreachBatch(function = (batchDF: Dataset[Row], batchId: Long) =>
{ 
println("drop table if exists CompromiseUserID_transaction") 

val hive_target_Table = hiveCache.sparkSession.table("User_temp_table") 
println("before hive") /* WRITE FINAL DATASET IN SuspiciousIP HIVE TABLE */ 
 hive_target_Table
.write.mode(SaveMode.Append)  
.format("hive")
.saveAsTable("parichaynew.CompromiseUserIDTable")})
.outputMode("update").trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
.option("checkpointLocation",CHECK_POINT)
.start()
query.awaitTermination()


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source