Are 'distinct' operations on Structured Streaming Datasets not supported?
The Spark Structured Streaming docs list unsupported operations, which state that
distinct operations on streaming Datasets are not supported.
However, there is a distinct() method in the API, and I can also call distinct() on a streaming Dataset.
import java.sql.Timestamp;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;

public final class JavaStructuredNetworkWordDistinct {
    public static void main(String[] args) throws Exception {
        System.setProperty("hadoop.home.dir", "C://hadoop");
        SparkSession spark = SparkSession
                .builder()
                .appName("JavaStructuredNetworkWordDistinct")
                .config("spark.master", "local[*]")
                .getOrCreate();
        spark.sparkContext().setLogLevel("ERROR");
        spark.conf().set("spark.sql.shuffle.partitions", 4);
        // Read all the CSV files written atomically in a directory
        StructType userSchema = new StructType().add("event_time", "string").add("id", "string");
        Dataset<Tuple2<Timestamp, String>> dropStream = spark
                .readStream()
                .option("sep", ",")
                .schema(userSchema) // Specify the schema of the CSV files
                .csv("D:\\deduplication")
                .selectExpr("to_timestamp(event_time,'yyyy-MM-dd HH:mm:ss') as event_time", "id as id")
                .as(Encoders.tuple(Encoders.TIMESTAMP(), Encoders.STRING()));
        StreamingQuery outerQuery = execDeduplicationDistinct(spark, dropStream);
        outerQuery.awaitTermination();
    }

    private static StreamingQuery execDeduplicationDistinct(SparkSession spark, Dataset<Tuple2<Timestamp, String>> dropStream) {
        Dataset<Tuple2<Timestamp, String>> dropDuplicatesStream = dropStream.distinct();
        // Start running the query that prints the deduplicated rows to the console
        StreamingQuery query = dropDuplicatesStream.writeStream()
                .outputMode("append")
                .format("console")
                .start();
        return query;
    }
}
There is only one file under the folder D:\\deduplication; its contents are
event_time,word
2022-04-10 11:44:00,word1
2022-04-10 11:45:00,word2
2022-04-10 11:45:00,word2
2022-04-10 11:45:00,word2
Finally, the result shows
-------------------------------------------
Batch: 0
-------------------------------------------
+-------------------+-----+
| event_time| id|
+-------------------+-----+
|2022-04-10 11:44:00|word1|
|2022-04-10 11:45:00|word2|
| null| word|
+-------------------+-----+
So, what is wrong with my understanding of distinct?
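A side note on the null | word row above: the reader never sets .option("header", "true"), so the header line event_time,word is parsed as an ordinary record, and to_timestamp applied to the literal string "event_time" yields null. A plain-Scala sketch of that parse, using the file contents and format string from the question (the object and method names here are made up for illustration):

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import scala.util.Try

object HeaderRowSketch {
  // Format string copied from the question's selectExpr
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Mimics to_timestamp: a non-parsable string becomes null (here: None)
  def parseLine(line: String): (Option[LocalDateTime], String) = {
    val parts = line.split(",", 2)
    (Try(LocalDateTime.parse(parts(0), fmt)).toOption, parts(1))
  }

  def main(args: Array[String]): Unit = {
    // File contents from the question, header line included,
    // because no header option was set on the reader
    val lines = Seq(
      "event_time,word",
      "2022-04-10 11:44:00,word1",
      "2022-04-10 11:45:00,word2",
      "2022-04-10 11:45:00,word2",
      "2022-04-10 11:45:00,word2"
    )
    // The header line survives distinct as (None, "word"),
    // matching the null | word row in the console output
    lines.map(parseLine).distinct.foreach(println)
  }
}
```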
I also ran a socket streaming version. The code is
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger

object StructuredNetworkWordCountDistinct {
  def main(args: Array[String]): Unit = {
    val host = args(0)
    val port = args(1).toInt
    val spark: SparkSession = SparkSession
      .builder
      .appName("StructuredNetworkWordCount")
      .master("local[*]")
      .config("spark.sql.shuffle.partitions", 4)
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    import spark.implicits._
    // Create a DataFrame representing the stream of input lines from the connection to host:port
    val lines: DataFrame = spark.readStream
      .format("socket")
      .option("host", host)
      .option("port", port)
      .load()
    // Split the lines into words
    val words = lines.as[String].flatMap(_.split(" "))
    // Deduplicate the words
    val distinctWords = words.distinct()
    // Start running the query that prints the distinct words to the console
    val query = distinctWords.writeStream
      .trigger(Trigger.ProcessingTime("1 second")) // only change in this query
      .outputMode("append")
      .format("console")
      .start()
    query.awaitTermination()
  }
}
and started netcat with nc -L -p 9999.
First, I input v1, and all the output batch results are
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+
|value|
+-----+
+-----+
-------------------------------------------
Batch: 1
-------------------------------------------
+-----+
|value|
+-----+
| v1|
+-----+
Second, I input v1 again, and the new output batch is
-------------------------------------------
Batch: 2
-------------------------------------------
+-----+
|value|
+-----+
+-----+
So Spark seems to have remembered the v1 from the first non-empty batch and used it to deduplicate the second batch's result.
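That observation matches a deduplication operator that keeps state across micro batches: values already emitted in an earlier batch are dropped from later ones. A minimal plain-Scala sketch of the observed semantics (this only illustrates the behavior seen above; the class name is made up, and it is not how Spark implements its state store):

```scala
import scala.collection.mutable

object StatefulDistinctSketch {
  // Emits only values never seen in ANY earlier batch,
  // mimicking the cross-batch behavior observed above
  final class DedupState {
    private val seen = mutable.Set.empty[String]
    def processBatch(batch: Seq[String]): Seq[String] =
      // Set.add returns false when the element was already present,
      // so previously seen values are filtered out
      batch.distinct.filter(seen.add)
  }

  def main(args: Array[String]): Unit = {
    val state = new DedupState
    println(state.processBatch(Seq("v1"))) // first batch: v1 emitted
    println(state.processBatch(Seq("v1"))) // second batch: empty, v1 remembered
  }
}
```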
Solution 1:[1]
"...Some Dataframe operations cannot be supported with Spark Structured Streaming, e.g. distinct, sorting, etc. as for those transformations Spark would need to store the entire data in memory. ..."
Badly explained.
This type of query in complete mode does not work with a distinct-like operation:
val mappedDF = jsonEdits.groupBy($"user").agg(countDistinct($"userURL").as("cnt"))
It returns:
org.apache.spark.sql.AnalysisException: Distinct aggregations are not supported on streaming DataFrames/Datasets. Consider using approx_count_distinct() instead.
If using append mode, it also needs a watermark.
Your query is a simple append; it has a distinct without aggregation. That is a trivial matter for Spark at the micro-batch level: there is no state issue and no aggregation to consider, and it just processes the current micro batch, as you observed. The docs are a little poor in this regard.
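For event-time deduplication, the documented route is dropDuplicates combined with withWatermark (both real Spark APIs), which lets Spark eventually evict old keys from state. A plain-Scala sketch of that eviction idea (illustrative only; the class name is made up, and Spark's actual watermark and state management differ):

```scala
import java.time.LocalDateTime
import scala.collection.mutable

object WatermarkDedupSketch {
  // Dedup state keyed by id, with entries evicted once the watermark
  // (max event time seen minus the allowed delay) passes them
  final class WatermarkedDedup(delayMinutes: Long) {
    private val seen = mutable.Map.empty[String, LocalDateTime]
    private var maxTime: Option[LocalDateTime] = None

    def processBatch(batch: Seq[(LocalDateTime, String)]): Seq[(LocalDateTime, String)] = {
      val out = batch.filter { case (_, id) => !seen.contains(id) }
      out.foreach { case (t, id) => seen.update(id, t) }
      batch.foreach { case (t, _) =>
        if (maxTime.forall(m => t.isAfter(m))) maxTime = Some(t)
      }
      // Without this eviction the state would grow without bound,
      // which is why append mode needs a watermark here
      maxTime.foreach { m =>
        val watermark = m.minusMinutes(delayMinutes)
        seen.filterInPlace { case (_, t) => !t.isBefore(watermark) }
      }
      out
    }

    def stateSize: Int = seen.size
  }

  def main(args: Array[String]): Unit = {
    val dedup = new WatermarkedDedup(10)
    val t1 = LocalDateTime.of(2022, 4, 10, 11, 44)
    println(dedup.processBatch(Seq((t1, "word1")))) // emitted
    println(dedup.processBatch(Seq((t1, "word1")))) // dropped: already in state
  }
}
```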
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
