Can't Zip RDD with another one while they have equal members - PySpark

I have a problem zipping two RDDs after filtering their items. This is my code:

x = sc.parallelize([1,2,3,4])
x = x.zipWithIndex()

# drop the first element (keep indices > 0)
m1 = x.filter(lambda z: z[1] > 0)
# drop the last element (keep indices < 3)
m2 = x.filter(lambda z: z[1] < 3)

# zipping
m1.zip(m2).collect()
# expected output: [(1,2),(2,3),(3,4)]

And this is the error I get:

ValueError: Can not deserialize PairRDD with different number of items in batches: (1, 2)

even though they have the same number of items!

Thanks in advance!
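(For context: per the PySpark docs, RDD.zip assumes the two RDDs have the same number of partitions and the same number of elements in each partition; filtering keeps the totals equal here, but it can change the per-partition counts, hence the error.) The desired output is simply the list of consecutive value pairs, which outside Spark is a plain-Python one-liner:

```python
# The target pairing, sketched with a plain Python list (no Spark needed)
data = [1, 2, 3, 4]
pairs = list(zip(data, data[1:]))  # pair each element with its successor
print(pairs)  # [(1, 2), (2, 3), (3, 4)]
```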



Solution 1:[1]

I got the desired output using (1) zipWithIndex, (2) cartesian, (3) filter, (4) map:

x = sc.parallelize([1,2,3,4])

#1
x = x.zipWithIndex()

#2
x = x.cartesian(x)

#3
x = x.filter(lambda pair: pair[0][1]+1 == pair[1][1])

#4 unpack the ((value, index), (value, index)) pairs into value pairs
x = x.map(lambda pair: (pair[0][0], pair[1][0]))

print(x.collect())
# output: [(1, 2), (2, 3), (3, 4)]
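The cartesian-then-filter logic can be checked without Spark; here is a plain-Python simulation using itertools.product (note that enumerate yields (index, value), whereas Spark's zipWithIndex yields (value, index)):

```python
from itertools import product

# (index, value) pairs; Spark's zipWithIndex would give (value, index)
indexed = list(enumerate([1, 2, 3, 4]))

# all ordered pairs, then keep those whose indices are consecutive
adjacent = [(a[1], b[1]) for a, b in product(indexed, indexed)
            if a[0] + 1 == b[0]]
print(adjacent)  # [(1, 2), (2, 3), (3, 4)]
```

One caveat: cartesian materializes all n² pairs before the filter, so this approach can become expensive on large RDDs.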

Still, if there is another way to handle this situation, please let me know!

Solution 2:[2]

You can get your expected output by applying a map function to your zipped RDD:

m1.zip(m2).map(lambda x: (x[0][1], x[0][0])).collect()

[(1, 2), (2, 3), (3, 4)]
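The map logic above can be verified in plain Python using the (value, index) tuples the two filters produce (hard-coded here for illustration):

```python
m1 = [(2, 1), (3, 2), (4, 3)]  # elements with index > 0
m2 = [(1, 0), (2, 1), (3, 2)]  # elements with index < 3

# lambda x: (x[0][1], x[0][0]) swaps each m1 tuple to (index, value)
result = [(a[1], a[0]) for a, b in zip(m1, m2)]
print(result)  # [(1, 2), (2, 3), (3, 4)]
```

Note that only the m1 side is used: the swapped (index, value) pairs happen to coincide with the desired consecutive pairs because the data is 1..4. For arbitrary data you would presumably combine both sides instead, e.g. (x[1][0], x[0][0]) to pair each m2 value with the following m1 value.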

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 A.Najafi
Solution 2 pltc