Compute the frequency of words using Spark Streaming
I have a dummy server that sends text over a socket to my Spark program. I would like the program to process that text and determine the frequency of each word relative to the total number of words in a window. Below is the code I have tried:
    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import scala.Tuple2;

    import java.util.Arrays;

    public class TwitterStream {
        public static void main(String[] args) throws InterruptedException {
            SparkConf sparkConf = new SparkConf()
                    .setAppName("Twitter")
                    .setMaster("local[4]")
                    .set("spark.executor.memory", "1g");
            JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

            // Text arrives from the dummy server over a socket (host and port are placeholders)
            JavaDStream<String> tweets = jssc.socketTextStream("localhost", 9999);

            // Split each line into words, then window the words
            JavaDStream<String> words = tweets.flatMap(x -> Arrays.stream(x.split(" ")).iterator());
            JavaDStream<String> window = words.window(Durations.seconds(6), Durations.seconds(2));

            // Count occurrences of each word within the window
            JavaPairDStream<String, Long> pairs = window.mapToPair(s -> new Tuple2<>(s, 1L));
            JavaPairDStream<String, Long> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);

            // This is the line that throws the exception shown below
            JavaDStream<Tuple2> test = wordCounts.transform(f -> f.map(x -> new Tuple2<>(x._1, x._2 / f.count())));
            test.print();

            jssc.start();
            jssc.awaitTermination();
            jssc.stop();
        }
    }
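For context, the dummy server is nothing more than a loop writing lines to a socket. A minimal sketch of it looks like the following (the port number and the sample line are arbitrary choices on my side):

    import java.io.PrintWriter;
    import java.net.ServerSocket;
    import java.net.Socket;

    public class DummyServer {
        public static void main(String[] args) throws Exception {
            // Listen on the same port the Spark program reads from
            try (ServerSocket server = new ServerSocket(9999);
                 Socket client = server.accept();
                 PrintWriter out = new PrintWriter(client.getOutputStream(), true)) {
                // Send a line of text every second until interrupted
                while (true) {
                    out.println("test car test bike car test");
                    Thread.sleep(1000);
                }
            }
        }
    }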
I initially tried to use the transform method on the wordCounts object, but it resulted in the following error:
    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 6, localhost, executor driver): org.apache.spark.SparkException: This RDD lacks a SparkContext. It could happen in the following cases:
    (1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
    (2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758.
So then I thought I would just call the count method on the stream and divide each word's number of occurrences by it, but this does not compile because count returns a JavaDStream<Long> rather than a long:

    wordCounts.mapToPair(x -> new Tuple2<>(x._1, x._2 / wordCounts.count()));
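From reading SPARK-5063, I believe the fix is to perform the count action on the driver, outside the map closure, and capture the result as a plain long before mapping. Something like the sketch below (untested; the variable names and the empty-window guard are mine):

    JavaDStream<Tuple2<String, Double>> relFreq = wordCounts.transform(rdd -> {
        // The action runs on the driver here, outside any executor-side closure
        long total = rdd.values().fold(0L, (a, b) -> a + b);
        // Guard against an empty window to avoid dividing by zero
        long divisor = Math.max(total, 1L);
        // The cast to double avoids integer division, which would round everything to 0
        return rdd.map(t -> new Tuple2<>(t._1, t._2 / (double) divisor));
    });
    relFreq.print();

Is this the right way to avoid the SPARK-5063 problem, or is there a more idiomatic approach?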
The program should print output like (test,0.2232), (car,0.1342), ...
Thanks in advance.