Is there a way to prevent excessive RAM consumption with the Spark configuration when using PySpark with a NER model?

I'm writing a program that uses Spark Structured Streaming to obtain the entities of tweets that are passed from a second program through a socket. However, when applying the spaCy NER model, the RAM (32 GB on an EC2 instance) is exhausted, and after several batches a memory error stops the execution.

I've used both htop and the Spark UI to try to identify the reason behind the excessive consumption, but I've only been able to determine that it's caused by the NER model.

The SparkSession is only configured to receive the tweets from the socket that connects to the second program. The DataFrame goes through some processing to obtain other properties of the tweet, such as its sentiment (which causes no error even with less than 8 GB of RAM), and then the NER is applied.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("TwitterStreamApp").getOrCreate()
    rawTweets = spark.readStream.format("socket").option("host", "localhost").option("port", 9008).load()
    tweets = rawTweets.selectExpr("CAST(value AS STRING)")

    #prior processing of the tweets
    sentDF = other_processing(tweets)

    #obtaining the column that contains the list of entities from a tweet
    nerDF = ner_classification(sentDF)

This is the code of the functions related to obtaining the NER: the "main call" and the UDF function.

    import spacy
    from pyspark.sql.functions import udf
    from pyspark.sql.types import ArrayType, StringType

    nerModel = spacy.load("en_core_web_sm")

    # main call: applies the UDF to every tweet in the "tweet" column
    def ner_classification(words):
        ner_list = udf(obtain_ner_udf, ArrayType(StringType()))
        words = words.withColumn("nerlist", ner_list("tweet"))
        return words

    # UDF function
    def obtain_ner_udf(words):
        # if the tweet is empty, return None
        if words == "":
            return None
        # otherwise apply the NER model (spaCy en_core_web_sm)
        entities = nerModel(words)

        # returns a list of the form ['entity1_label1', 'entity2_label2', ...]
        return [word.text + '_' + word.label_ for word in entities.ents]
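Since the UDF is invoked once per row, its contract is easy to check in isolation. Below is a minimal, Spark-free sketch of the same formatting logic, with a tiny hard-coded stub standing in for the loaded spaCy model (the stub and its entities are assumptions for illustration; only the `text_label` output format comes from the code above):

```python
from types import SimpleNamespace

# Stub standing in for spacy.load("en_core_web_sm"): returns an object
# with an .ents list of entities exposing .text and .label_, like a Doc.
def stub_ner_model(text):
    ents = []
    if "Apple" in text:
        ents.append(SimpleNamespace(text="Apple", label_="ORG"))
    if "London" in text:
        ents.append(SimpleNamespace(text="London", label_="GPE"))
    return SimpleNamespace(ents=ents)

def obtain_ner(words, model=stub_ner_model):
    # empty tweets produce no entity list (the UDF returns None -> null)
    if words == "":
        return None
    entities = model(words)
    # list of 'text_label' strings, one per recognized entity
    return [e.text + '_' + e.label_ for e in entities.ents]

print(obtain_ner("Apple opened an office in London"))
# → ['Apple_ORG', 'London_GPE']
print(obtain_ner(""))
# → None
```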

Lastly, I map each entity to the sentiment of its tweet and obtain the average sentiment and the number of appearances per entity.

    from pyspark.sql.functions import explode

    flattenedNER = nerDF.select(nerDF.sentiment, explode(nerDF.nerlist))
    flattenedNER.registerTempTable("df")

    querySelect = "SELECT col AS entity, avg(sentiment) AS sentiment, count(col) AS count FROM df GROUP BY col"
    finalDF = spark.sql(querySelect)

    query = finalDF.writeStream.foreachBatch(processBatch).outputMode("complete").start()
    query.awaitTermination()
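The streaming query is just an average and a count per entity. As a sanity check, the same aggregation can be sketched in plain Python over the exploded (sentiment, entity) pairs (the function name and sample data are illustrative, not part of the original code):

```python
from collections import defaultdict

def aggregate_entities(rows):
    """rows: iterable of (sentiment, entity) pairs, i.e. the exploded
    DataFrame. Returns {entity: (avg_sentiment, count)}, mirroring
    SELECT col, avg(sentiment), count(col) ... GROUP BY col."""
    sums = defaultdict(float)
    counts = defaultdict(int)
    for sentiment, entity in rows:
        sums[entity] += sentiment
        counts[entity] += 1
    return {e: (sums[e] / counts[e], counts[e]) for e in counts}

rows = [(0.5, "Apple_ORG"), (-0.5, "Apple_ORG"), (1.0, "London_GPE")]
print(aggregate_entities(rows))
# → {'Apple_ORG': (0.0, 2), 'London_GPE': (1.0, 1)}
```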

The processBatch function simply generates a treemap with the entities as the labels, the average sentiment as the color, and the count as the size. Changing the output mode to "update" has the same effect.
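The processBatch implementation isn't shown above; conceptually it reduces each micro-batch to the three treemap inputs (label, color, size). A plotting-free sketch of that reduction, with all names being assumptions:

```python
def treemap_inputs(batch_rows):
    """batch_rows: list of (entity, avg_sentiment, count) rows, as produced
    by the aggregation query. Returns parallel lists for a treemap:
    labels (entity names), colors (avg sentiment), sizes (counts)."""
    labels = [entity for entity, _, _ in batch_rows]
    colors = [sentiment for _, sentiment, _ in batch_rows]
    sizes = [count for _, _, count in batch_rows]
    return labels, colors, sizes

labels, colors, sizes = treemap_inputs(
    [("Apple_ORG", 0.0, 2), ("London_GPE", 1.0, 1)])
print(labels, colors, sizes)
# → ['Apple_ORG', 'London_GPE'] [0.0, 1.0] [2, 1]
```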

At first I used the ner-english model from Flair, which is almost twice as heavy as spaCy's en_core_web_sm, but switching models only delayed the RAM exhaustion; the execution still collapses. I've also tried increasing spark.executor.memory and spark.driver.memory, but that had no effect.
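For reference, a minimal sketch of how such settings would be applied in code (the values are illustrative). One caveat worth noting: in client mode, spark.driver.memory is only honored if it is set before the driver JVM starts (e.g. via spark-submit --driver-memory), so setting it from inside an already-running script can silently have no effect:

```python
from pyspark.sql import SparkSession

# Illustrative values only; spark.driver.memory set here is ignored if the
# driver JVM is already running (pass --driver-memory to spark-submit instead).
spark = SparkSession.builder \
    .appName("TwitterStreamApp") \
    .config("spark.driver.memory", "16g") \
    .config("spark.executor.memory", "16g") \
    .getOrCreate()
```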

In addition, the Spark UI shows only one executor running (the driver), yet htop shows the 8 cores of the instance at 100% usage most of the time. Without the call to the NER model (returning words.split() from the UDF, for example) there's no problem and the treemap is generated, so the batch processing and the operations applied to the DataFrame (exploding the array and running the query) are correct.

Is there a way to prevent the excessive RAM consumption through the Spark configuration? Why is there only one executor, and why does the job only collapse when applying the NER model, regardless of which NER library I use?



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
