StackOverflowError with getCachedDataFrame

I am facing a StackOverflowError involving the method getCachedDataFrame. I am trying to solve the label propagation problem for small graphs (10-15 nodes) using Spark and aggregateMessages. At each step the cached vertices should be updated, but after a large number of iterations (30 to 80) I hit a StackOverflowError.

My Code:

import pyspark.sql.functions as F
from graphframes import GraphFrame
from graphframes.lib import AggregateMessages as AM

for iteration in range(max_iterations):
    print("Iteration {}".format(iteration + 1))

    # Collect the current labels of each vertex's neighbours.
    aggregates = g.aggregateMessages(F.collect_list(AM.msg).alias("agg"),
                                     sendToDst=AM.src["newClass"])

    res = aggregates.select("id", define_possible_class_udf(F.col("agg")).alias("possible_classes"))

    new_col_names = ['id', 'Class', 'oldClass', 'possible_classes']
    new_vertices = g.vertices.join(res, on="id", how="left_outer").toDF(*new_col_names) \
        .select("id", 'Class', compare_classes_udf(F.col('possible_classes'), F.col('oldClass')).alias('newClass'))

    new_df = new_vertices.select('id', F.col('newClass').alias('class_item'))

    join_df = new_df.join(temp_df, on="id", how="inner")
    # Vertices whose label changed since the previous iteration.
    empty_df = join_df.filter(join_df['temp_item'] != join_df['class_item'])

    temp_df = new_df.select('id', F.col('class_item').alias('temp_item'))

    # Truncate lineage via the GraphFrames helper, then rebuild the graph.
    cached_new_vertices = AM.getCachedDataFrame(new_vertices)
    cached_new_vertices.cache()
    g = GraphFrame(cached_new_vertices, g.edges)
    #g.vertices.show()



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
