'pyspark-reddit getting java.lang.StackOverflowError
While running df.head() or df.show() the below error is generated. Attached code below - Is the problem in the structype? i wanted to allow all columns to allow nullable values on construction. When i try to initiate spark object again, connection 111 doesn't go away. Why is initiation of a new sparkcontext get or create object disallowed by previous one's error? Sorry if its a very noob question, I couldn't find a remedy going through docs. All help much appreciated.
import json
import os
import pprint
from pprint import pprint
from re import M
from black import List
import praw
import pyspark
import yaml
from dataclasses import dataclass, field
from typing import Dict, Tuple, List
import numpy as np
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql import SparkSession
import os
@dataclass(frozen=True)
class RawAttributesToExtract:
post: list = field(
default_factory=lambda: [
"title",
"upvote_ratio",
"score",
"author",
"mod_note",
"over_18",
"distinguished",
"removal_reason",
"report_reasons",
"num_comments",
"created_utc",
"stickied",
"url",
]
)
comments: list = field(default_factory=lambda: ["list1", "list2", "list3"])
users: list = field(default_factory=lambda: ["list1", "list2", "list3"])
class UkrainePosts:
"""returns Ukraine posts data
as a list of dicts
"""
def __init__(self):
self._subreddits_search_string_dict = {
"news": ["ukraine"],
"worldnews": ["WorldNews Live Thread"],
"volunteersforukraine": ["ukraine"],
"politics": ["ukraine"],
"ukraine": [" "],
}
self._post_data = []
def get_subreddit_post_data(
self, subreddit_name: str, search_strings: str, limit: int
):
"""Extracts specfic subreddit's posts, appends into _data list
Args:
subreddit_name (str): r/???
search_strings (str): list of sub strings extracted post can have
limit (int): number of posts to search in hot
"""
subreddit = r.subreddit(subreddit_name)
for post in subreddit.hot(limit=limit):
if any(search_str in post.title for search_str in search_strings):
datum = {
attribute: getattr(post, attribute)
for attribute in RawAttributesToExtract().post
}
self._post_data.append(datum)
def get_data_for_all_subreddits(self):
for sub in self._subreddits_search_string_dict:
self.get_subreddit_post_data(
sub, self._subreddits_search_string_dict[sub], limit=1000
)
return self._post_data
spark = SparkSession.builder.appName("Ukraine").getOrCreate()
r = praw.Reddit(
client_id="",
client_secret="",
user_agent="yourredditusername..",
)
posts_data = UkrainePosts().get_data_for_all_subreddits()
schema = StructType(
[StructField(col, StringType(), True) for col in posts_data[0].keys()]
)
posts_df = spark.createDataFrame([Row(**x) for x in posts_data], schema=schema)
posts_df.head()
22/03/29 11:56:04 ERROR Executor: Exception in task 0.0 in stage 6.0 (TID 21) 1]
java.lang.StackOverflowError
at java.util.AbstractMap.toString(AbstractMap.java:547)
at java.lang.String.valueOf(String.java:2994)
at java.lang.StringBuilder.append(StringBuilder.java:131)
22/03/29 11:56:04 WARN TaskSetManager: Lost task 0.0 in stage 6.0 (TID 21) (15.1.3.85 executor driver): java.lang.StackOverflowError
at java.lang.String.valueOf(String.java:2994)
at java.lang.StringBuilder.append(StringBuilder.java:131)
at java.util.AbstractMap.toString(AbstractMap.java:559)
at java.lang.String.valueOf(String.java:2994)
at java.lang.StringBuilder.append(StringBuilder.java:131)
at java.util.AbstractMap.toString(AbstractMap.java:559)
at java.lang.String.valueOf(String.java:2994)
at java.lang.StringBuilder.append(StringBuilder.java:131)
at java.util.AbstractMap.toString(AbstractMap.java:559)
at java.lang.String.valueOf(String.java:2994)
at java.lang.StringBuilder.append(StringBuilder.java:131)
at java.util.AbstractMap.toString(AbstractMap.java:559)
at java.lang.String.valueOf(String.java:2994)
at java.lang.StringBuilder.append(StringBuilder.java:131)
at java.util.AbstractMap.toString(AbstractMap.java:559)
at java.lang.String.valueOf(String.java:2994)
at java.lang.StringBuilder.append(StringBuilder.java:131)
at java.util.AbstractMap.toString(AbstractMap.java:559)
at java.lang.String.valueOf(String.java:2994)
at java.lang.StringBuilder.append(StringBuilder.java:131)
at java.util.AbstractMap.toString(AbstractMap.java:559)
22/03/29 11:56:04 ERROR TaskSetManager: Task 0 in stage 6.0 failed 1 times; aborting job
ERROR:root:Exception while sending command.
Traceback (most recent call last):
File "/home/shivamanand/shivamenv/venv/lib/python3.7/site-packages/pyspark/sql/utils.py", line 111, in deco
return f(*a, **kw)
File "/home/shivamanand/shivamenv/venv/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: <unprintable Py4JJavaError object>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/shivamanand/shivamenv/venv/lib/python3.7/site-packages/py4j/clientserver.py", line 480, in send_command
raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/shivamanand/shivamenv/venv/lib/python3.7/site-packages/py4j/java_gateway.py", line 1038, in send_command
response = connection.send_command(command)
File "/home/shivamanand/shivamenv/venv/lib/python3.7/site-packages/py4j/clientserver.py", line 504, in send_command
"Error while sending or receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while sending or receiving
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
~/shivamenv/venv/lib/python3.7/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
110 try:
--> 111 return f(*a, **kw)
112 except py4j.protocol.Py4JJavaError as e:
~/shivamenv/venv/lib/python3.7/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
<class 'str'>: (<class 'ConnectionRefusedError'>, ConnectionRefusedError(111, 'Connection refused'))
During handling of the above exception, another exception occurred:
Py4JError Traceback (most recent call last)
~/shivamenv/venv/lib/python3.7/site-packages/pyspark/sql/dataframe.py in collect(self)
692 with SCCallSiteSync(self._sc) as css:
--> 693 sock_info = self._jdf.collectToPython()
694 return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))
~/shivamenv/venv/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
1309 return_value = get_return_value(
-> 1310 answer, self.gateway_client, self.target_id, self.name)
1311
~/shivamenv/venv/lib/python3.7/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
112 except py4j.protocol.Py4JJavaError as e:
--> 113 converted = convert_exception(e.java_exception)
114 if not isinstance(converted, UnknownException):
~/shivamenv/venv/lib/python3.7/site-packages/pyspark/sql/utils.py in convert_exception(e)
97 if c is not None and (
---> 98 c.toString().startswith('org.apache.spark.api.python.PythonException: ')
99 # To make sure this only catches Python UDFs.
~/shivamenv/venv/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
1309 return_value = get_return_value(
-> 1310 answer, self.gateway_client, self.target_id, self.name)
1311
~/shivamenv/venv/lib/python3.7/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
110 try:
--> 111 return f(*a, **kw)
112 except py4j.protocol.Py4JJavaError as e:
~/shivamenv/venv/lib/python3.7/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
335 "An error occurred while calling {0}{1}{2}".
--> 336 format(target_id, ".", name))
337 else:
Py4JError: An error occurred while calling o67.toString
During handling of the above exception, another exception occurred:
ConnectionRefusedError Traceback (most recent call last)
/tmp/ipykernel_19736/2778771738.py in <module>
----> 1 posts_df.head(1)
~/shivamenv/venv/lib/python3.7/site-packages/pyspark/sql/dataframe.py in head(self, n)
1603 rs = self.head(1)
1604 return rs[0] if rs else None
-> 1605 return self.take(n)
1606
1607 def first(self):
~/shivamenv/venv/lib/python3.7/site-packages/pyspark/sql/dataframe.py in take(self, num)
742 [Row(age=2, name='Alice'), Row(age=5, name='Bob')]
743 """
--> 744 return self.limit(num).collect()
745
746 def tail(self, num):
~/shivamenv/venv/lib/python3.7/site-packages/pyspark/sql/dataframe.py in collect(self)
691 """
692 with SCCallSiteSync(self._sc) as css:
--> 693 sock_info = self._jdf.collectToPython()
694 return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))
695
~/shivamenv/venv/lib/python3.7/site-packages/pyspark/traceback_utils.py in __exit__(self, type, value, tb)
76 SCCallSiteSync._spark_stack_depth -= 1
77 if SCCallSiteSync._spark_stack_depth == 0:
---> 78 self._context._jsc.setCallSite(None)
~/shivamenv/venv/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
1306 proto.END_COMMAND_PART
1307
-> 1308 answer = self.gateway_client.send_command(command)
1309 return_value = get_return_value(
1310 answer, self.gateway_client, self.target_id, self.name)
~/shivamenv/venv/lib/python3.7/site-packages/py4j/java_gateway.py in send_command(self, command, retry, binary)
1034 if `binary` is `True`.
1035 """
-> 1036 connection = self._get_connection()
1037 try:
1038 response = connection.send_command(command)
~/shivamenv/venv/lib/python3.7/site-packages/py4j/clientserver.py in _get_connection(self)
279
280 if connection is None or connection.socket is None:
--> 281 connection = self._create_new_connection()
282 return connection
283
~/shivamenv/venv/lib/python3.7/site-packages/py4j/clientserver.py in _create_new_connection(self)
286 self.java_parameters, self.python_parameters,
287 self.gateway_property, self)
--> 288 connection.connect_to_java_server()
289 self.set_thread_connection(connection)
290 return connection
~/shivamenv/venv/lib/python3.7/site-packages/py4j/clientserver.py in connect_to_java_server(self)
400 self.socket = self.ssl_context.wrap_socket(
401 self.socket, server_hostname=self.java_address)
--> 402 self.socket.connect((self.java_address, self.java_port))
403 self.stream = self.socket.makefile("rb")
404 self.is_connected = True
ConnectionRefusedError: [Errno 111] Connection refused
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|
