StackOverflowError with getCachedDataFrame
I am hitting a StackOverflowError in the method getCachedDataFrame. I am trying to solve the label propagation problem for small graphs (10-15 nodes) using Spark and aggregateMessages. At each step the cached vertices should be updated, but for a large number of iterations (30 to 80) I get a StackOverflowError.
My Code:
for iter in range(max_iterations):
    print("Iteration {}".format(iter + 1))
    aggregates = g.aggregateMessages(F.collect_list(AM.msg).alias("agg"),
                                     sendToDst=AM.src["newClass"])
    res = aggregates.select("id", "agg", define_possible_class_udf(F.col("agg")).alias("possible_classes")) \
        .drop("agg")
    new_col_names = ['id', 'Class', 'oldClass', 'possible_classes']
    new_vertices = g.vertices.join(res, on="id", how="left_outer").toDF(*new_col_names) \
        .select("id", 'Class', compare_classes_udf(F.col('possible_classes'), F.col('oldClass')).alias('newClass'))
    new_df = new_vertices.select('id', F.col('newClass').alias('class_item'))
    join_df = new_df.join(temp_df, [temp_df.id == new_df.id], how='inner')
    empty_df = join_df.filter(join_df['temp_item'] != join_df['class_item'])
    temp_df = new_df.select('id', F.col('class_item').alias('temp_item')).drop("class_item")
    cached_new_vertices = AM.getCachedDataFrame(new_vertices)
    cached_new_vertices.cache()
    g = GraphFrame(cached_new_vertices, g.edges)
    #g.vertices.show()
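A likely cause of this error is that the logical plan grows with every iteration: each new GraphFrame carries the full lineage of all previous joins, and serializing a plan that is 30-80 operators deep can overflow the JVM stack. Caching materializes data but does not truncate lineage. Below is a minimal pure-Python sketch of this effect (no Spark required; the nested-tuple "plan" and the `iterate`/`depth` helpers are illustrative, not Spark APIs):

```python
# Illustrative model of Spark's logical-plan growth: each iteration wraps
# the previous plan in one more operator node, so plan depth grows linearly.
# A periodic "checkpoint" replaces the accumulated plan with a fresh leaf,
# keeping depth bounded.

def iterate(plan, n, checkpoint_every=None):
    """Wrap `plan` n times; optionally flatten it every `checkpoint_every` steps."""
    for i in range(1, n + 1):
        plan = ("join", plan)          # one more operator per iteration
        if checkpoint_every and i % checkpoint_every == 0:
            plan = ("leaf",)           # checkpoint: lineage truncated
    return plan

def depth(plan):
    """Count how many operator nodes wrap the leaf."""
    d = 0
    while len(plan) == 2:
        d += 1
        plan = plan[1]
    return d

print(depth(iterate(("leaf",), 80)))                       # 80: grows with iterations
print(depth(iterate(("leaf",), 80, checkpoint_every=10)))  # 0: just checkpointed
```

In the real loop, the analogous fix would be to periodically materialize the vertices DataFrame so the plan cannot grow without bound, e.g. with `DataFrame.localCheckpoint()` (available since Spark 2.3) or by writing the vertices to storage and reading them back every few iterations.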
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow