Spark OutOfMemory error while running collect_set on each column of a dataframe
My goal is to find the distinct elements of each column in a dataframe, and for that I am using collect_set. I have tried repartitioning and using more executors than should be required, but a couple of columns with high cardinality still make the executors run out of memory.
How can I tell whether I am going about this the right way? Should I use distinct instead? (I sketched the variant I have in mind after the code below.)
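To get a sense of how large each collected set would be, I first looked at per-column cardinality. A quick sketch using approx_count_distinct, done in one pass over the data (df is the same dataframe as below):

import org.apache.spark.sql.functions._

// Approximate distinct count for every column, computed in a single pass.
val countExprs = df.columns.map(c => approx_count_distinct(col(c)).alias(c))
df.agg(countExprs.head, countExprs.tail: _*).show(false)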
For the aggregation itself, I have the following code.
import org.apache.spark.sql.functions._

val columns1 = Seq("columnA")                       // grouping key
val columns2 = Seq("columnB", "columnC", "columnD") // columns to collect
val columnsSet = columns1 ++ columns2

// One sorted array of distinct values per collected column
// (array_sort and arrays_zip require Spark 2.4+).
val exprs = columns2.map(v => array_sort(collect_set(v)).alias(v))

val newdf = df.select(columnsSet.map(c => col(c)): _*)
  .groupBy(columns1.map(d => col(d)): _*)
  .agg(exprs.head, exprs.tail: _*)   // head/tail instead of headOption.getOrElse(null)
  .toDF(columnsSet: _*)
  // Zip the per-column arrays positionally and explode back into rows.
  .withColumn("zipped", explode(arrays_zip(columns2.map(d => col(d)): _*)))
  .select(columns1.head, (columns1.tail :+ "zipped.*"): _*)
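For comparison, this is roughly the distinct-based variant I had in mind. It is only a sketch reusing columns1 and columns2 from above, and it produces one dataframe per column instead of the zipped rows, so I am not sure it fits my downstream use:

// One distinct job per column instead of a single collect_set aggregation:
// each job keeps the grouping key but never materializes a whole set in one task.
val distinctPerColumn = columns2.map { c =>
  c -> df.select((columns1.map(col) :+ col(c)): _*).distinct()
}
distinctPerColumn.foreach { case (name, d) =>
  println(s"$name: ${d.count()} distinct rows")
}

My understanding is that distinct shuffles rows across partitions, while collect_set has to build the entire array for a group inside a single task's aggregation buffer, which is where the OOM appears to come from. Is that reasoning correct?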