Iterating over a Spark RDD and Updating a Secondary One with PySpark

I am refactoring to Spark a function that receives two lists of dictionaries as parameters. It compares the keys of both dicts and, when a key exists in both, assigns the matching value to a third dict.

I would rather not change much of the logic, so I converted both lists of dicts to RDDs. What I would like to do now is modify the function to receive this pair of RDDs and then generate the third RDD.
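The conversion itself looks roughly like this, a minimal sketch assuming each list element is a single-entry dict as in test_rdd below (the sample data is illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Illustrative stand-ins for the two real lists of dictionaries.
dicts1 = [{"a": 1}, {"b": 2}, {"c": 3}]
dicts2 = [{"a": 10}, {"c": 30}, {"d": 40}]

rdd1 = spark.sparkContext.parallelize(dicts1)
rdd2 = spark.sparkContext.parallelize(dicts2)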

I am doing it as follows and getting an error saying that an RDD can only be created on the driver. I suppose this is happening because of the emptyRDD call inside the function passed to .foreach(), since the SparkContext only exists on the driver and cannot be used from code running on the executors.

Any idea how I could write code that produces the desired output? (Actually, in my test I was passing just a single RDD as the function parameter and just printing out whether a dict key equals "a".)

test_rdd = spark.sparkContext.parallelize(
    [{"a": 1}, {"b": 2}, {"c": 3}, {"d": 4}]
)
def iterate(rdd1, rdd2):
    # emptyRDD needs the SparkContext, which is only available on the driver
    sf_dict = spark.sparkContext.emptyRDD()
    for key1 in rdd1.items():
        for key2, element2 in rdd2.items():
            if rdd1[key1] == rdd2[key2]:
                sf_dict[rdd1[key1]] = rdd2[element2]

    return sf_dict

test_rdd.foreach(iterate)
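
For what it's worth, here is a minimal sketch of one way to get the desired output without creating an RDD inside a task: flatten each single-entry dict into (key, value) pairs and let Spark's join do the key matching. The variable names, the sample data, and the choice to keep the value from the second RDD mirror the loop above and are assumptions:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

rdd1 = spark.sparkContext.parallelize([{"a": 1}, {"b": 2}, {"c": 3}])
rdd2 = spark.sparkContext.parallelize([{"a": 10}, {"c": 30}, {"d": 40}])

# Flatten each single-entry dict into (key, value) tuples so the RDDs
# become pair RDDs that Spark can join on the key.
pairs1 = rdd1.flatMap(lambda d: d.items())
pairs2 = rdd2.flatMap(lambda d: d.items())

# join() keeps only keys present in both RDDs; for each match, keep the
# value coming from the second RDD, as the original loop appears to do.
matched = pairs1.join(pairs2).mapValues(lambda values: values[1])

# Collect to the driver and rebuild a plain dict, the "third dict".
sf_dict = dict(matched.collect())
print(sf_dict)  # {'a': 10, 'c': 30}

Because join and mapValues are transformations declared on the driver and executed on the executors, no SparkContext call ever happens inside a task, which avoids the driver-only error above.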

