'Scala KafkaUtils.createDirectStream for calculating the average of every key in DStream
I should write the Scala code for calculating the average of values of every key in a DStream. The stream of key-value pairs are generated in this format:
This is a university project and I should complete a given code. The code is: package sparkstreaming
import java.util.HashMap
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka._
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.storage.StorageLevel
import java.util.{Date, Properties}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, ProducerConfig}
import scala.util.Random
object KafkaSpark {
def main(args: Array[String]) {
// make a connection to Kafka and read (key, value) pairs from it
<FILL IN>
val kafkaConf = <FILL IN>
val messages = KafkaUtils.createDirectStream.<FILL IN>
<FILL IN>
// measure the average value for each key in a stateful manner
def mappingFunc(key: String, value: Option[Double], state: State[Double]): (String, Double) = {
<FILL IN>
}
val stateDstream = pairs.mapWithState(<FILL IN>)
ssc.start()
ssc.awaitTermination()
}
}
My biggest problem is that I don't know the syntax for KafkaUtils.createDirectStream for Scala. It is available for Java but not Scala which is quite different. The only provided code for it in Scala is:
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092,anotherhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream.map(record => (record.key, record.value))
Which I really can't understand the syntax.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|

