Iterating over a Spark RDD and Updating a Secondary One with PySpark
I am refactoring to Spark a function that receives two lists of dictionaries as parameters. It compares the keys that are identical in both dicts and then assigns the value from each match to a third dict.
I wouldn't like to change the logic much, so I converted both lists of dicts to RDDs. What I would like to do now is modify the function to receive this pair of RDDs and then generate the third RDD.
I am doing it as follows and getting an error saying that an RDD can only be created on the driver. I suppose this is happening because of the emptyRDD call inside the .foreach() method.
Any idea how I could write this so it produces the desired output? (Actually, in my test I was passing just a single RDD as the function parameter and just printing out whether a dict key equals "a".)
```python
test_rdd = spark.sparkContext.parallelize(
    [{"a": 1}, {"b": 2}, {"c": 3}, {"d": 4}]
)

def iterate(rdd1, rdd2):
    sf_dict = spark.sparkContext.emptyRDD()
    for key1 in rdd1.items():
        for key2, element2 in rdd2.items():
            if rdd1[key1] == rdd2[key2]:
                sf_dict[rdd1[key]] = rdd2[element2]
    return sf_dict

test_rdd.foreach(iterate)
```
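For what it's worth, here is a minimal sketch of one way to get this kind of output without creating RDDs inside workers (the SparkContext exists only on the driver, which is why emptyRDD fails inside foreach). It assumes each element is a single-entry dict, as in the sample data above; the rdd2 contents below are made up for illustration. The idea is to flatten both RDDs into (key, value) pairs and let join do the key matching:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample data; rdd2 is hypothetical, chosen so some keys overlap with rdd1.
rdd1 = spark.sparkContext.parallelize([{"a": 1}, {"b": 2}, {"c": 3}, {"d": 4}])
rdd2 = spark.sparkContext.parallelize([{"a": 10}, {"c": 30}, {"e": 50}])

# Flatten each single-entry dict into (key, value) tuples so the RDDs
# become pair RDDs that can be joined.
pairs1 = rdd1.flatMap(lambda d: d.items())  # [("a", 1), ("b", 2), ...]
pairs2 = rdd2.flatMap(lambda d: d.items())

# join() keeps only keys present in both RDDs; each element becomes
# (key, (value_from_rdd1, value_from_rdd2)).
joined = pairs1.join(pairs2)

# Build the third RDD, here keeping the value from the second RDD
# (mirroring the sf_dict[...] = rdd2 value in the question).
result = joined.map(lambda kv: {kv[0]: kv[1][1]})

print(result.collect())  # e.g. [{'a': 10}, {'c': 30}] (order may vary)
```

Because join only emits keys that appear in both RDDs, the third RDD contains exactly the matches, and everything stays inside Spark transformations on the driver side.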
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
