pyspark-reddit getting java.lang.StackOverflowError

While running df.head() or df.show(), the error below is generated (code attached). Is the problem in the StructType? I wanted every column to be nullable on construction. Also, when I try to initialize the Spark object again, the "Connection refused (111)" error does not go away — why does the previous session's failure prevent SparkSession's getOrCreate() from producing a working context? Sorry if it's a very noob question; I couldn't find a remedy going through the docs. All help much appreciated.

import json
import os
import pprint
from pprint import pprint
from re import M
from black import List
import praw
import pyspark
import yaml
from dataclasses import dataclass, field
from typing import Dict, Tuple, List
import numpy as np
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql import SparkSession
import os


@dataclass(frozen=True)
class RawAttributesToExtract:
    """Immutable configuration of which raw praw attributes to pull.

    Each field lists the attribute names to read (via getattr) from the
    corresponding praw object kind.
    """

    # Attribute names read from each praw Submission (post) object.
    post: List[str] = field(
        default_factory=lambda: [
            "title",
            "upvote_ratio",
            "score",
            "author",
            "mod_note",
            "over_18",
            "distinguished",
            "removal_reason",
            "report_reasons",
            "num_comments",
            "created_utc",
            "stickied",
            "url",
        ]
    )

    # NOTE(review): "list1"/"list2"/"list3" look like placeholders — confirm
    # the intended comment attributes before using this field.
    comments: List[str] = field(default_factory=lambda: ["list1", "list2", "list3"])

    # NOTE(review): placeholders as well — not consumed anywhere in this file.
    users: List[str] = field(default_factory=lambda: ["list1", "list2", "list3"])


class UkrainePosts:
    """Collects Ukraine-related Reddit posts as a list of dicts.

    Posts are fetched through the module-level praw.Reddit client ``r`` and
    accumulated as one dict per post (keyed by attribute name), ready to be
    fed into a Spark DataFrame whose columns are all nullable strings.
    """

    def __init__(self):
        # Subreddit name -> substrings a post title must contain to be kept.
        self._subreddits_search_string_dict = {
            "news": ["ukraine"],
            "worldnews": ["WorldNews Live Thread"],
            "volunteersforukraine": ["ukraine"],
            "politics": ["ukraine"],
            "ukraine": [" "],
        }

        # Accumulates one dict per matching post across all subreddits.
        self._post_data = []

    def get_subreddit_post_data(
        self, subreddit_name: str, search_strings: List[str], limit: int
    ):
        """Extract a specific subreddit's hot posts, append them to _post_data.

        Args:
            subreddit_name (str): subreddit to scan (r/<subreddit_name>).
            search_strings (List[str]): substrings; a post is kept when its
                title contains any of them.
            limit (int): number of hot posts to scan.
        """
        subreddit = r.subreddit(subreddit_name)

        # Hoisted out of the loop: building a fresh dataclass instance for
        # every post just to read a constant list is wasteful.
        attributes = RawAttributesToExtract().post

        for post in subreddit.hot(limit=limit):
            if any(search_str in post.title for search_str in search_strings):
                # Coerce every non-None value to str: the downstream Spark
                # schema declares every column as a nullable StringType, and
                # praw returns lazy wrapper objects for some attributes
                # (e.g. ``author`` is a Redditor instance). Shipping those
                # objects into Spark rows is what triggers the recursive
                # toString() -> java.lang.StackOverflowError on collect.
                datum = {
                    attribute: self._stringify(getattr(post, attribute))
                    for attribute in attributes
                }

                self._post_data.append(datum)

    @staticmethod
    def _stringify(value):
        """Return None unchanged (columns are nullable); otherwise str(value)."""
        return value if value is None else str(value)

    def get_data_for_all_subreddits(self):
        """Run get_subreddit_post_data over every configured subreddit.

        Returns:
            list[dict]: the accumulated post dicts for all subreddits.
        """
        for sub, search_strings in self._subreddits_search_string_dict.items():
            self.get_subreddit_post_data(sub, search_strings, limit=1000)

        return self._post_data


# Build (or reuse) the driver-side Spark session. getOrCreate() returns the
# existing session if one is alive; once the backing JVM has died (e.g. after
# a StackOverflowError on the executor), the stale py4j gateway keeps refusing
# connections (errno 111) until the Python process itself is restarted —
# calling getOrCreate() again in the same process will not help.
spark = SparkSession.builder.appName("Ukraine").getOrCreate()

# NOTE(review): credentials are blank here; supply real values (ideally read
# from environment variables rather than hard-coded) before running.
r = praw.Reddit(
    client_id="",
    client_secret="",
    user_agent="yourredditusername..",
)

posts_data = UkrainePosts().get_data_for_all_subreddits()

# Guard: posts_data[0] below would raise an opaque IndexError when nothing
# matched (bad credentials, no matching titles, etc.).
if not posts_data:
    raise RuntimeError(
        "No matching Reddit posts were found; cannot build a DataFrame."
    )

# All columns are nullable strings. Column names come from the first record,
# so every record is assumed to share the same attribute set — which holds
# because each datum is built from the same RawAttributesToExtract().post list.
schema = StructType(
    [StructField(col, StringType(), True) for col in posts_data[0].keys()]
)

posts_df = spark.createDataFrame([Row(**x) for x in posts_data], schema=schema)

posts_df.head()
22/03/29 11:56:04 ERROR Executor: Exception in task 0.0 in stage 6.0 (TID 21) 1]
java.lang.StackOverflowError
    at java.util.AbstractMap.toString(AbstractMap.java:547)
    at java.lang.String.valueOf(String.java:2994)
    at java.lang.StringBuilder.append(StringBuilder.java:131)
    
22/03/29 11:56:04 WARN TaskSetManager: Lost task 0.0 in stage 6.0 (TID 21) (15.1.3.85 executor driver): java.lang.StackOverflowError
    
    at java.lang.String.valueOf(String.java:2994)
    at java.lang.StringBuilder.append(StringBuilder.java:131)
    at java.util.AbstractMap.toString(AbstractMap.java:559)
    at java.lang.String.valueOf(String.java:2994)
    at java.lang.StringBuilder.append(StringBuilder.java:131)
    at java.util.AbstractMap.toString(AbstractMap.java:559)
    at java.lang.String.valueOf(String.java:2994)
    at java.lang.StringBuilder.append(StringBuilder.java:131)
    at java.util.AbstractMap.toString(AbstractMap.java:559)
    at java.lang.String.valueOf(String.java:2994)
    at java.lang.StringBuilder.append(StringBuilder.java:131)
    at java.util.AbstractMap.toString(AbstractMap.java:559)
    at java.lang.String.valueOf(String.java:2994)
    at java.lang.StringBuilder.append(StringBuilder.java:131)
    at java.util.AbstractMap.toString(AbstractMap.java:559)
    at java.lang.String.valueOf(String.java:2994)
    at java.lang.StringBuilder.append(StringBuilder.java:131)
    at java.util.AbstractMap.toString(AbstractMap.java:559)
    at java.lang.String.valueOf(String.java:2994)
    at java.lang.StringBuilder.append(StringBuilder.java:131)
    at java.util.AbstractMap.toString(AbstractMap.java:559)

22/03/29 11:56:04 ERROR TaskSetManager: Task 0 in stage 6.0 failed 1 times; aborting job
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/shivamanand/shivamenv/venv/lib/python3.7/site-packages/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/home/shivamanand/shivamenv/venv/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: <unprintable Py4JJavaError object>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/shivamanand/shivamenv/venv/lib/python3.7/site-packages/py4j/clientserver.py", line 480, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/shivamanand/shivamenv/venv/lib/python3.7/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/shivamanand/shivamenv/venv/lib/python3.7/site-packages/py4j/clientserver.py", line 504, in send_command
    "Error while sending or receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while sending or receiving
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
~/shivamenv/venv/lib/python3.7/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
    110         try:
--> 111             return f(*a, **kw)
    112         except py4j.protocol.Py4JJavaError as e:

~/shivamenv/venv/lib/python3.7/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:

<class 'str'>: (<class 'ConnectionRefusedError'>, ConnectionRefusedError(111, 'Connection refused'))

During handling of the above exception, another exception occurred:

Py4JError                                 Traceback (most recent call last)
~/shivamenv/venv/lib/python3.7/site-packages/pyspark/sql/dataframe.py in collect(self)
    692         with SCCallSiteSync(self._sc) as css:
--> 693             sock_info = self._jdf.collectToPython()
    694         return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))

~/shivamenv/venv/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1309         return_value = get_return_value(
-> 1310             answer, self.gateway_client, self.target_id, self.name)
   1311 

~/shivamenv/venv/lib/python3.7/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
    112         except py4j.protocol.Py4JJavaError as e:
--> 113             converted = convert_exception(e.java_exception)
    114             if not isinstance(converted, UnknownException):

~/shivamenv/venv/lib/python3.7/site-packages/pyspark/sql/utils.py in convert_exception(e)
     97     if c is not None and (
---> 98             c.toString().startswith('org.apache.spark.api.python.PythonException: ')
     99             # To make sure this only catches Python UDFs.

~/shivamenv/venv/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1309         return_value = get_return_value(
-> 1310             answer, self.gateway_client, self.target_id, self.name)
   1311 

~/shivamenv/venv/lib/python3.7/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
    110         try:
--> 111             return f(*a, **kw)
    112         except py4j.protocol.Py4JJavaError as e:

~/shivamenv/venv/lib/python3.7/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    335                 "An error occurred while calling {0}{1}{2}".
--> 336                 format(target_id, ".", name))
    337     else:

Py4JError: An error occurred while calling o67.toString

During handling of the above exception, another exception occurred:

ConnectionRefusedError                    Traceback (most recent call last)
/tmp/ipykernel_19736/2778771738.py in <module>
----> 1 posts_df.head(1)

~/shivamenv/venv/lib/python3.7/site-packages/pyspark/sql/dataframe.py in head(self, n)
   1603             rs = self.head(1)
   1604             return rs[0] if rs else None
-> 1605         return self.take(n)
   1606 
   1607     def first(self):

~/shivamenv/venv/lib/python3.7/site-packages/pyspark/sql/dataframe.py in take(self, num)
    742         [Row(age=2, name='Alice'), Row(age=5, name='Bob')]
    743         """
--> 744         return self.limit(num).collect()
    745 
    746     def tail(self, num):

~/shivamenv/venv/lib/python3.7/site-packages/pyspark/sql/dataframe.py in collect(self)
    691         """
    692         with SCCallSiteSync(self._sc) as css:
--> 693             sock_info = self._jdf.collectToPython()
    694         return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))
    695 

~/shivamenv/venv/lib/python3.7/site-packages/pyspark/traceback_utils.py in __exit__(self, type, value, tb)
     76         SCCallSiteSync._spark_stack_depth -= 1
     77         if SCCallSiteSync._spark_stack_depth == 0:
---> 78             self._context._jsc.setCallSite(None)

~/shivamenv/venv/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1306             proto.END_COMMAND_PART
   1307 
-> 1308         answer = self.gateway_client.send_command(command)
   1309         return_value = get_return_value(
   1310             answer, self.gateway_client, self.target_id, self.name)

~/shivamenv/venv/lib/python3.7/site-packages/py4j/java_gateway.py in send_command(self, command, retry, binary)
   1034          if `binary` is `True`.
   1035         """
-> 1036         connection = self._get_connection()
   1037         try:
   1038             response = connection.send_command(command)

~/shivamenv/venv/lib/python3.7/site-packages/py4j/clientserver.py in _get_connection(self)
    279 
    280         if connection is None or connection.socket is None:
--> 281             connection = self._create_new_connection()
    282         return connection
    283 

~/shivamenv/venv/lib/python3.7/site-packages/py4j/clientserver.py in _create_new_connection(self)
    286             self.java_parameters, self.python_parameters,
    287             self.gateway_property, self)
--> 288         connection.connect_to_java_server()
    289         self.set_thread_connection(connection)
    290         return connection

~/shivamenv/venv/lib/python3.7/site-packages/py4j/clientserver.py in connect_to_java_server(self)
    400                 self.socket = self.ssl_context.wrap_socket(
    401                     self.socket, server_hostname=self.java_address)
--> 402             self.socket.connect((self.java_address, self.java_port))
    403             self.stream = self.socket.makefile("rb")
    404             self.is_connected = True

ConnectionRefusedError: [Errno 111] Connection refused


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source