Error reading delta file in a stateful Spark Structured Streaming job with Kafka
I'm reading data from Kafka with a Spark windowing operation, then unioning the Kafka and Hive data into a single DataFrame. I'm trying to find the latest records by timestamp using a Spark window function with row_number, and then save the DataFrame to a Hive table.
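The row_number step is usually written as `row_number()` over a window partitioned by a key and ordered by timestamp descending, keeping only rank 1. The plain-Scala sketch below mimics that on in-memory data so the logic can be checked without Spark. It assumes the partition key is the user name. `Event`, `LatestPerUser`, and their fields are hypothetical stand-ins for the unioned Kafka and Hive rows.

```scala
// Hypothetical record type for one row of the unioned Kafka + Hive data.
final case class Event(userName: String, timeStamp: Long, source: String)

object LatestPerUser {
  // Mirrors: union the two sources, then keep
  //   row_number() OVER (PARTITION BY userName ORDER BY timeStamp DESC) = 1
  def unionAndLatest(kafka: Seq[Event], hive: Seq[Event]): Map[String, Event] =
    latest(kafka ++ hive)

  // One row per user: the row with the greatest timestamp.
  // On equal timestamps maxBy keeps the first row it meets (here the Kafka row),
  // whereas Spark's row_number() makes no such guarantee.
  def latest(events: Seq[Event]): Map[String, Event] =
    events.groupBy(_.userName).map { case (user, rows) => user -> rows.maxBy(_.timeStamp) }
}
```

In Spark itself the equivalent would run on the unioned DataFrame. When two rows share a timestamp, `row_number()` picks one of them arbitrarily unless the `ORDER BY` includes a tie-breaking column.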
```scala
import java.util.concurrent.TimeUnit

import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession, functions}
import org.apache.spark.sql.functions.{col, window}
import org.apache.spark.sql.streaming.Trigger

val sparkSession: SparkSession =
  SparkSession.builder().appName(SPARK_APP_NAME)
    .config("hive.metastore.warehouse.dir", SPARK_HIVE_WAREHOUSE_DIR_SUSPICIOUS_USER)
    // .config("spark.default.parallelism", 70)
    .config("hive.exec.dynamic.partition.mode", "nonstrict")
    .config("hive.support.concurrency", true)
    .config("spark.sql.orc.impl", "native")
    .config("spark.speculation", "false")
    .enableHiveSupport() // required for format("hive") / saveAsTable into Hive
    .getOrCreate()

// Count events per user in 30-minute tumbling windows and keep users with more than 4 events.
val query = kakfa_SuspciousUser
  .selectExpr(
    "nullif(TimeStamp, \"\") as TimeStamp",
    "nullif(UserName, \"\") as UserName"
  )
  .groupBy(window(col("TimeStamp"), "30 minute"), col("UserName"))
  .agg(functions.count("UserName").alias("user_UserCount"))
  .select("UserName", "window.end", "user_UserCount")
  .filter(col("user_UserCount").gt(4))
  .writeStream
  .foreachBatch(function = (batchDF: Dataset[Row], batchId: Long) => {
    println("drop table if exists CompromiseUserID_transaction")
    // Note: the micro-batch (batchDF) is not used; the data written comes from User_temp_table
    val hive_target_Table = hiveCache.sparkSession.table("User_temp_table")
    println("before hive")
    /* WRITE FINAL DATASET IN SuspiciousIP HIVE TABLE */
    hive_target_Table
      .write.mode(SaveMode.Append)
      .format("hive")
      .saveAsTable("parichaynew.CompromiseUserIDTable")
  })
  .outputMode("update")
  .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
  .option("checkpointLocation", CHECK_POINT)
  .start()

query.awaitTermination()
```
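The streaming aggregation groups rows into 30-minute tumbling windows (no slide duration, so windows do not overlap), counts events per user in each window, and keeps groups whose count is above 4. Because the stream has no `withWatermark`, Spark keeps every (window, user) count in the state store for the life of the query. The default HDFS-backed state store persists that state as per-batch `.delta` files under the checkpoint location. The plain-Scala sketch below reproduces only the grouping arithmetic. It assumes timestamps are epoch milliseconds, which is an assumption because the question's `TimeStamp` column is a string. `TumblingWindowCount` and its methods are hypothetical names.

```scala
object TumblingWindowCount {
  // 30 minutes in milliseconds (timestamps are assumed to be epoch millis).
  val WindowMillis: Long = 30L * 60L * 1000L

  // Tumbling windows are aligned to the epoch: [start, start + 30 min), end exclusive.
  def windowEnd(tsMillis: Long): Long =
    Math.floorDiv(tsMillis, WindowMillis) * WindowMillis + WindowMillis

  // (userName, windowEnd) -> count, keeping only groups whose count exceeds the threshold.
  def suspicious(events: Seq[(String, Long)], threshold: Long = 4L): Map[(String, Long), Long] =
    events
      .groupBy { case (user, ts) => (user, windowEnd(ts)) }
      .map { case (key, rows) => key -> rows.size.toLong }
      .filter { case (_, count) => count > threshold }
}
```

Each key pairs a user with the exclusive end of its window, which corresponds to the `window.end` column selected in the query.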
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
