PySpark's df.writeStream generates no output

I'm trying to store tweets from my Kafka cluster in Elasticsearch. Initially, I set the output format to 'org.elasticsearch.spark.sql', but it created no index.

I tried changing the format to 'console' to check whether the stream was working at all, but it doesn't print anything to the console either.

My guess is that the problem lies in my streaming DataFrames, but I can't figure out what exactly the issue is.

This is my full code for the consumer (Spark Structured Streaming):

import os
# Kafka and Elasticsearch connector packages pulled in at spark-submit time
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,org.elasticsearch:elasticsearch-hadoop:7.6.2 pyspark-shell'

from pyspark import SparkContext, SparkConf
# Spark Streaming (legacy DStream API; unused by the structured query below)
from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession
# Kafka (legacy DStream receiver; unused by the structured query below)
from pyspark.streaming.kafka import KafkaUtils
# JSON parsing
import json
import nltk
import logging
from datetime import datetime
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

def evaluate_sentiment(avg):
    try:
        if avg < 0:
            return 'Negative'
        elif avg > 0:
            return 'Positive'
        else:
            return 'Neutral'
    except TypeError:
        return 'Neutral'
    
# UDF mapping a numeric sentiment score to a label
eval_udf = udf(evaluate_sentiment, StringType())

def start_stream(df):
    # awaitTermination() keeps the driver alive; without it a standalone
    # script exits before the first micro-batch is processed
    df.writeStream.format('console').start().awaitTermination()


conf = SparkConf().setAppName('twitter_analysis')
conf.set("es.index.auto.create", "true")
# the conf must be passed to the builder, otherwise es.index.auto.create never reaches the session
spark = SparkSession.builder.config(conf=conf).getOrCreate()

schema = StructType([StructField("date", TimestampType(), True),
                     StructField("user", StringType(), True),
                     StructField("text", StringType(), True),
                     StructField("reply_count", IntegerType(), True),
                     StructField("retweet_count", IntegerType(), True),
                     StructField("favorite_count", IntegerType(), True),
                     # note: DecimalType() defaults to precision 10, scale 0,
                     # so fractional sentiment scores lose their decimal part
                     StructField("sentiment_score", DecimalType(), True)])

kafkaStream = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "192.168.0.10:9092") \
    .option("subscribe", "twitter_analysis") \
    .option("failOnDataLoss", False) \
    .load()

parsed_df = kafkaStream.select(from_json(col('value').cast('string'), schema).alias('parsed_value')) \
    .withColumn('timestamp', current_timestamp())

mdf = parsed_df.select('parsed_value.*', 'timestamp')


evaluated_df = mdf.withColumn('status', eval_udf('sentiment_score')) \
    .withColumn('date', to_date(col('timestamp')))

start_stream(evaluated_df)
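
For reference, the Elasticsearch variant I originally attempted looked roughly like the sketch below; the checkpoint directory and the index name 'twitter/_doc' are placeholders rather than my exact values:

# Sketch of the Elasticsearch sink variant of start_stream; the
# checkpoint path and the target index are placeholders
def start_es_stream(df):
    df.writeStream \
      .format('org.elasticsearch.spark.sql') \
      .option('checkpointLocation', '/tmp/es-checkpoint') \
      .option('es.nodes', '192.168.0.10') \
      .option('es.port', '9200') \
      .start('twitter/_doc') \
      .awaitTermination()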

What could be causing this problem? Does it have anything to do with the schema I have defined?

An example of the JSON data that is sent from the Kafka cluster to Spark Streaming:

{"date": "2020-11-07 21:02:39", "user": "TalhianeM", "text": "RT @amin_goat: Non, des probl\u00e8mes de vote dans une d\u00e9mocratie occidentale ?\n\nOn m\u2019avait assur\u00e9 que cela n\u2019arrivait qu\u2019en Afrique pourtant.", "reply_count": 0, "retweet_count": 0, "favorite_count": 0, "sentiment_score": 0.0}

Could someone please help me resolve this problem? I tried multiple methods, but nothing seems to get the data streams into Elasticsearch.
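
One useful isolation step (a sketch reusing kafkaStream from the code above): dump the raw Kafka values to the console, bypassing from_json entirely. If this also prints nothing, the problem is upstream of the schema:

# Bypass the JSON parsing: if nothing appears here either, the problem
# is the source (broker host, topic name, offsets), not the schema
raw_query = kafkaStream.selectExpr('CAST(value AS STRING)') \
    .writeStream \
    .format('console') \
    .option('truncate', False) \
    .start()
raw_query.awaitTermination()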

UPDATE: I resolved it. There was a problem with the host.
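
For anyone hitting the same symptom: a one-off batch read from the topic is a quick way to confirm the bootstrap host and topic are reachable, since the batch Kafka source accepts the same options:

# Batch read from the same topic: a non-empty count confirms the
# bootstrap host and the topic name are correct
batch_df = spark.read.format('kafka') \
    .option('kafka.bootstrap.servers', '192.168.0.10:9092') \
    .option('subscribe', 'twitter_analysis') \
    .option('startingOffsets', 'earliest') \
    .load()
print(batch_df.count())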



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
