Comparing two array columns in Scala Spark
I have a DataFrame in the format given below.
movieId1 | genreList1 | genreList2
--------------------------------------------------
1 |[Adventure,Comedy] |[Adventure]
2 |[Animation,Drama,War] |[War,Drama]
3 |[Adventure,Drama] |[Drama,War]
I am trying to add a flag column that shows whether genreList2 is a subset of genreList1:
movieId1 | genreList1 | genreList2 | Flag
---------------------------------------------------------------
1 |[Adventure,Comedy] | [Adventure] |1
2 |[Animation,Drama,War] | [War,Drama] |1
3 |[Adventure,Drama] | [Drama,War] |0
I have tried this
def intersect_check(a: Array[String], b: Array[String]): Int = {
if (b.sameElements(a.intersect(b))) { return 1 }
else { return 2 }
}
def intersect_check_udf =
udf((colvalue1: Array[String], colvalue2: Array[String]) => intersect_check(colvalue1, colvalue2))
data = data.withColumn("Flag", intersect_check_udf(col("genreList1"), col("genreList2")))
But this throws org.apache.spark.SparkException: Failed to execute user defined function. Any ideas on how to resolve this?
P.S.: The above function (intersect_check) works for Arrays.
Solution 1:[1]
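Spark passes an array column to a Scala UDF as a Seq, not as an Array, so a UDF declared with Array[String] parameters fails at runtime. Here is a solution that declares the parameters as Seq[String] and checks the subset relation with subsetOf: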
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
val spark =
  SparkSession.builder().master("local").appName("test").getOrCreate()
import spark.implicits._
val data = spark.sparkContext.parallelize(
  Seq(
    (1, Array("Adventure", "Comedy"), Array("Adventure")),
    (2, Array("Animation", "Drama", "War"), Array("War", "Drama")),
    (3, Array("Adventure", "Drama"), Array("Drama", "War"))
  )).toDF("movieId1", "genreList1", "genreList2")
// Array columns arrive in a Scala UDF as Seq, so the parameters are Seq[String].
val subsetOf = udf((col1: Seq[String], col2: Seq[String]) => {
  // 1 if every element of col2 is also in col1, otherwise 0
  if (col2.toSet.subsetOf(col1.toSet)) 1 else 0
})
data.withColumn("flag", subsetOf(data("genreList1"), data("genreList2"))).show()
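The subset test inside the UDF is ordinary Scala collection logic, so it can be checked without starting Spark. The following is a minimal sketch: the helper name isSubset is hypothetical, and the null handling is an assumption that is not part of the original answer.

```scala
// Hypothetical helper mirroring the body of the subsetOf UDF.
// It takes Seq[String] because that is how Spark hands array columns to Scala UDFs.
def isSubset(superset: Seq[String], candidate: Seq[String]): Int =
  // Assumption: a null array counts as "not a subset" instead of throwing.
  if (superset == null || candidate == null) 0
  else if (candidate.toSet.subsetOf(superset.toSet)) 1
  else 0

// The three rows from the question.
val flags = Seq(
  isSubset(Seq("Adventure", "Comedy"), Seq("Adventure")),
  isSubset(Seq("Animation", "Drama", "War"), Seq("War", "Drama")),
  isSubset(Seq("Adventure", "Drama"), Seq("Drama", "War"))
)
println(flags) // List(1, 1, 0)
```

Converting both sides to sets makes the check independent of element order and of duplicates, which is why no sorting is needed in this approach.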
Hope this helps!
Solution 2:[2]
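One solution is to use Spark's built-in array functions (array_intersect is available from Spark 2.4): genreList2 is a subset of genreList1 if the intersection of the two is equal to genreList2. In the code below, sort_array is applied to both sides so that two arrays with the same elements in a different order still compare as equal. Note that array_intersect returns its result without duplicates, so a genreList2 containing repeated values would not match.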
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
val spark = {
  SparkSession
    .builder()
    .master("local")
    .appName("test")
    .getOrCreate()
}
import spark.implicits._
val df = Seq(
  (1, Array("Adventure", "Comedy"), Array("Adventure")),
  (2, Array("Animation", "Drama", "War"), Array("War", "Drama")),
  (3, Array("Adventure", "Drama"), Array("Drama", "War"))
).toDF("movieId1", "genreList1", "genreList2")
df
  .withColumn("flag",
    sort_array(array_intersect($"genreList1", $"genreList2"))
      .equalTo(
        sort_array($"genreList2")
      )
      .cast("integer")
  )
  .show()
The output is
+--------+--------------------+------------+----+
|movieId1| genreList1| genreList2|flag|
+--------+--------------------+------------+----+
| 1| [Adventure, Comedy]| [Adventure]| 1|
| 2|[Animation, Drama...|[War, Drama]| 1|
| 3| [Adventure, Drama]|[Drama, War]| 0|
+--------+--------------------+------------+----+
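The reason for the sort_array step can be seen with plain Scala collections. This is only an analogue: it assumes Scala's Seq.intersect as a stand-in for Spark's array_intersect, and uses row 2 of the example data.

```scala
// Plain Scala analogue (assumption: Seq.intersect stands in for array_intersect).
val genreList1 = Seq("Animation", "Drama", "War")
val genreList2 = Seq("War", "Drama")

// intersect keeps the element order of the left-hand sequence.
val common = genreList1.intersect(genreList2)

// An order-sensitive comparison misses the match ...
val naiveMatch = common == genreList2
// ... while comparing sorted copies finds it.
val sortedMatch = common.sorted == genreList2.sorted

println(s"naive=$naiveMatch sorted=$sortedMatch") // naive=false sorted=true
```

The same ordering issue affects the sameElements comparison in the question, which is also order-sensitive.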
Solution 3:[3]
This also works, and it does not use a UDF: genreList2 is a subset of genreList1 when array_except(genreList2, genreList1) is empty.
import org.apache.spark.sql.functions._
import spark.implicits._
val data = Seq(
  (1, Array("Adventure", "Comedy"), Array("Adventure")),
  (2, Array("Animation", "Drama", "War"), Array("War", "Drama")),
  (3, Array("Adventure", "Drama"), Array("Drama", "War"))
).toDF("movieId1", "genreList1", "genreList2")
data
  // number of elements of genreList2 that are missing from genreList1
  .withColumn("size", size(array_except($"genreList2", $"genreList1")))
  .withColumn("flag", when($"size" === lit(0), 1).otherwise(0))
  .show(false)
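The snippet above leaves the intermediate size column in the result. If only the flag is wanted, the two steps can be folded into one expression. This is a sketch under assumptions: it requires a Spark version that provides array_except (2.4 or later), and the app name and the local[1] master are placeholders.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array_except, size}

// Placeholder session settings for a local run.
val spark = SparkSession.builder().master("local[1]").appName("subset-flag").getOrCreate()
import spark.implicits._

val data = Seq(
  (1, Array("Adventure", "Comedy"), Array("Adventure")),
  (2, Array("Animation", "Drama", "War"), Array("War", "Drama")),
  (3, Array("Adventure", "Drama"), Array("Drama", "War"))
).toDF("movieId1", "genreList1", "genreList2")

// An empty difference means every genre in genreList2 also appears in genreList1.
val flagged = data.withColumn(
  "flag",
  (size(array_except($"genreList2", $"genreList1")) === 0).cast("int")
)

// Collect the flags in movieId1 order so they can be inspected.
val flags = flagged.orderBy("movieId1").select("flag").as[Int].collect().toList
println(flags)
```

Casting the boolean comparison to an integer gives the same 1/0 encoding as the when/otherwise form, without the extra column.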
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | koiralo |
| Solution 2 | |
| Solution 3 | whoisthis |
