Join data frames with substring conditions on columns
I am writing a function in Scala to fetch data for training an ML model. I have a dataframe DF1 with a single column of names, and another dataframe DF2 with columns [description, released, ... few more].
I want to create a dataframe DF3 that is the join of DF1 and DF2 on the condition that the name from DF1 appears in the description of DF2.
Example:
DF1
name
0 John
1 Mike
2 Kara
DF2
released total description
0 2006 5 This involved John and Kara who played role of......
1 2010 120 It is the latest release of Mike and Kara after almost decade...
DF3 [Expected output DF]
name released total description
0 John 2006 5 This involved John and Kara who played role of......
1 Kara 2006 5 This involved John and Kara who played role of......
2 Kara 2010 120 It is the latest release of Mike and Kara after almost decade...
3 Mike 2010 120 It is the latest release of Mike and Kara after almost decade...
I am trying a cross join to produce all combinations, and then filtering based on a condition over the name and description columns:
val DF3 = DF1.crossJoin(DF2).filter(col("name") in col("description"))
It seems there is no contains method available in Snowpark to do this.
Does anyone have an idea how to do it?
Solution 1:[1]
There are at least 2 solutions, but you should first ask yourself some questions:
- Do you want to find a substring, or a whole word? E.g., should Karan match the name Kara?
- Do you want to store or use the result dataframe in this shape? It may be more efficient to store, for each name, the indexes/positions of the matching rows of DF2.
You can test (on a big, real dataset) which one is faster and more suitable for you.
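The substring-vs-word distinction above can be checked in plain Scala, without Spark. This is a minimal sketch using the names from the question: a plain `contains` also fires inside "Karan", while a word-boundary regex matches "Kara" only as a standalone word.

```scala
// Substring check vs whole-word check, illustrating the Kara/Karan pitfall.
val text = "This involved Karan and Kara who played role of"

// Plain substring test: also fires on "Karan".
val substringHit = text.contains("Kara")

// Word-boundary regex: matches "Kara" only as a standalone word,
// so the "Kara" inside "Karan" is skipped.
val wordPattern = "\\bKara\\b".r
val wordHits = wordPattern.findAllIn(text).toList

println(substringHit) // true
println(wordHits)     // List(Kara)
```

If whole-word matching is what you need, the same regex can be built per name and used in the filter instead of `contains`.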
1st solution (via DataFrame)
```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{DataFrame, SparkSession}

object Main extends App {
  case class Name(name: String)
  case class TextInfo(year: Int, month: Int, text: String)

  val spark: SparkSession = SparkSession.builder.config("spark.master", "local").getOrCreate()
  val sc: SparkContext = spark.sparkContext

  val namesDf: DataFrame = spark.createDataFrame(sc.parallelize(Seq("John", "Mike", "Kara").map(Name)))
  val textToSearchDf: DataFrame = spark.createDataFrame(sc.parallelize(Seq(
    TextInfo(2006, 5, "This involved John and Kara who played role of"),
    TextInfo(2010, 120, "It is the latest release of Mike and Kara after almost decade")
  )))

  // Cross join, then keep only the pairs where the text contains the name.
  val resultDf: DataFrame = textToSearchDf.crossJoin(namesDf)
    .where(col("text").contains(col("name")))

  resultDf.foreach(println(_))
}
```
2nd solution, via RDD:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder.config("spark.master", "local").getOrCreate()
val sc: SparkContext = spark.sparkContext

val namesAsRdd: RDD[String] = sc.parallelize(Seq("John", "Mike", "Kara"))
val rddWithTextToSearch: RDD[(Int, Int, String)] = sc.parallelize(Seq(
  (2006, 5, "This involved John and Kara who played role of"),
  (2010, 120, "It is the latest release of Mike and Kara after almost decade")
))

// Collect the (small) set of names to the driver, then match per row,
// emitting one output tuple per name found in the text.
val names: Set[String] = namesAsRdd.collect().toSet
val resultRdd: RDD[(String, Int, Int, String)] = rddWithTextToSearch.flatMap {
  case (year, month, str) =>
    names.filter(name => str.contains(name)).map(name => (name, year, month, str))
}

resultRdd.foreach(println(_))
```
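The flatMap in the RDD solution is just plain collection logic, so it can be sketched and checked without Spark at all. This is a minimal sketch on ordinary Scala collections, using the question's example data:

```scala
// The same name-in-text matching as the RDD solution, on plain Scala collections.
val names = Set("John", "Mike", "Kara")
val rows = Seq(
  (2006, 5, "This involved John and Kara who played role of"),
  (2010, 120, "It is the latest release of Mike and Kara after almost decade")
)

// For each row, emit one (name, released, total, description) tuple
// per name that occurs in the text.
val result = rows.flatMap { case (year, total, text) =>
  names.filter(name => text.contains(name)).map(name => (name, year, total, text))
}

result.foreach(println)
```

This produces the four rows of the expected DF3 (John and Kara for the 2006 row, Mike and Kara for the 2010 row), which is exactly what the RDD version computes in a distributed way.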
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Mikhail Ionkin |
