Problem integrating Kafka and Spark Streaming: no messages received in Spark Streaming
My Spark streaming context subscribes successfully to the Kafka topic that my Twitter producer streams tweets into, but no messages from that topic are received in Spark Streaming. Here is my code:
/** Streams tweets from the Kafka topic `topic_three` and prints them.
  *
  * Builds a local Spark streaming context with a 1-second batch interval,
  * subscribes to the topic through the Kafka direct stream API, prints the
  * (key, value) pair of each record, and additionally parses every value as
  * JSON to print its `text` field.
  *
  * @param args command-line arguments (unused)
  */
def main(args: Array[String]): Unit = {
  // NOTE(review): Kafka's default listener port is 9092; confirm the broker
  // really listens on 9093, otherwise the consumer never receives data.
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "127.0.0.1:9093",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "use_a_separate_group_id_for_each_stream",
    "auto.offset.reset" -> "earliest", // earliest/latest
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  val sparkConf = new SparkConf().setAppName("StreamTweets").setMaster("local[*]")
  val ssc = new StreamingContext(sparkConf, Seconds(1))
  val topics = List("topic_three")

  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
  )

  // DStream transformations return a new stream; the mapped stream must be the
  // one that is printed. ConsumerRecord is not java.io.Serializable, so the
  // records are projected to plain (key, value) pairs before print() ships
  // them to the driver.
  stream.map(record => (record.key, record.value)).print()

  stream.foreachRDD { rdd =>
    rdd.foreach { record =>
      val value = record.value()
      // parseFull yields None for invalid JSON and a non-Map value for JSON
      // arrays; handle both instead of failing the whole batch.
      scala.util.parsing.json.JSON.parseFull(value) match {
        case Some(fields: Map[_, _]) =>
          val text = fields.collectFirst { case ("text", v) => v }
          println(text)
        case _ =>
          println(s"Skipping record that is not a JSON object: $value")
      }
    }
  }

  ssc.start()
  ssc.awaitTermination()
}
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
