'value foreachBatch is not a member of org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
I am trying to write data into mysql using spark. For this I am using foreachBatch but this is not working. I am doing this on spark shell. Below is the complete code.
spark-shell --driver-class-path mysql-connector-java-5.1.36-bin.jar --jars mysql-connector-java-5.1.36-bin.jar
import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions.{window, column, col, desc}
val staticDataFrame=spark.read.format("csv").option("inferSchema","true").option("header","true").load("/user/ajeet20028137/by-day/*.csv")
staticDataFrame.createOrReplaceTempView("retail_data")
val staticSchema = staticDataFrame.schema
val streamingDataFrame=spark.readStream.schema(staticSchema).option("maxFilesPerTrigger",1).format("csv").option("header","true").load("/user/ajeet20028137/by-day/*.csv")
val purchaseByCustomerPerHour=streamingDataFrame.selectExpr("customerid","unitprice * quantity as total_cost","invoiceDate").groupBy(col("customerid"), window(col("invoiceDate"),"1 day")).sum("total_cost")
val query = purchaseByCustomerPerHour.writeStream.outputMode(OutputMode.Complete()).foreachBatch((batchDF:DataFrame,batchId:Long)=>{batchDF.coalesce(1).write.mode(SaveMode.Overwrite).format("jdbc").option("url","jdbc:mysql://cxln2.c.thelab-240901.internal/retail_db").option("dbtable","purchasebycustomerperday").option("user","sqoopuser").option("password","password").save()}).start()
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|
