Spark OutOfMemory error while running collect_set on each column of a dataframe

My goal is to find distinct elements for each of the columns in a dataframe.

To do that I am using collect_set. I have tried repartitioning and using more executors than should be required, but a couple of columns with high cardinality make the executors run out of memory.

How can I evaluate whether I am doing it in the right way? Should I use distinct instead?
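One check I am considering first is estimating each column's cardinality with approx_count_distinct, which is memory-bounded unlike collect_set, to see which columns are actually the problem. A sketch against the same df (the candidates list is just the non-key columns from my code below):

    import org.apache.spark.sql.functions.approx_count_distinct

    // Cheap, approximate cardinality per column; no sets are materialized.
    val candidates = Seq("columnB", "columnC", "columnD")
    val cardExprs = candidates.map(c => approx_count_distinct(c).alias(c))
    df.agg(cardExprs.head, cardExprs.tail: _*).show()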

I have the following code.

import org.apache.spark.sql.functions._

val columns1 = Seq("columnA")                        // grouping key(s)
val columns2 = Seq("columnB", "columnC", "columnD")  // columns to collect
val columnsSet = columns1 ++ columns2

// One sorted collect_set per non-key column (array_sort/arrays_zip need Spark 2.4+)
val exprs = columns2.map(v => array_sort(collect_set(v)).alias(v))

val newdf = df.select(columnsSet.map(c => col(c)): _*)
      .groupBy(columns1.map(d => col(d)): _*)
      .agg(exprs.head, exprs.tail: _*)  // columns2 is non-empty, so head is safe
      .toDF(columnsSet: _*)
      // zip the collected arrays element-wise and explode back into rows
      .withColumn("zipped", explode(arrays_zip(columns2.map(d => col(d)): _*)))
      .select(columns1.head, (columns1.tail :+ "zipped.*"): _*)
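For the distinct variant, would something like the following make sense? A sketch (untested on my data) that runs one distinct per column, so no single aggregation has to hold every value set in executor memory:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.col

    // One distinct per (key, value) column pair; each result stays a
    // DataFrame, so values are never collected into one in-memory set.
    val perColumnDistinct: Seq[(String, DataFrame)] =
      columns2.map { c =>
        c -> df.select((columns1 :+ c).map(col): _*).distinct()
      }

I could then write out or collect each small result separately, rather than building the zipped arrays in one pass.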

