Join data frames with substring conditions on columns

I am writing a function in Scala to fetch data for training an ML model. I have a dataframe DF1 which has a single column of names, and another dataframe DF2 which consists of the columns [description, released, ... few more].

I want to create a dataframe DF3 by joining DF1 and DF2 on the condition that a name from DF1 appears in the description of DF2.

Example:

DF1
   name
0  John
1  Mike
2  Kara

DF2
   released total  description
0  2006     5      This involved John and Kara who played role of......
1  2010     120    It is the latest release of Mike and Kara after almost decade...


DF3 [Expected output DF]
      name    released  total description
0      John    2006      5     This involved John and Kara who played role of......
1      Kara    2006      5     This involved John and Kara who played role of......
2      Kara    2010      120   It is the latest release of Mike and Kara after almost decade...
3      Mike    2010      120  It is the latest release of Mike and Kara after almost decade...

I am trying to do a cross join to produce all combinations, and then filter them based on a condition over the name and description columns.

val DF3 = DF1.crossjoin(DF2).filter(col("name") in col("description"))

It seems there is no contains method available in Snowpark to do this.

Does anyone have an idea how to do this?



Solution 1:[1]

There are at least two solutions, but you should ask yourself two questions first:

  1. Do you want to find a substring, or a whole word? E.g., should the name Kara also match Karan? (If you need whole-word matching, see the sketch after this list.)
  2. Do you want to store or use the result dataframe in this shape? It may be more practical to store/use it in a more compact form, e.g., for each name store the indexes/positions of the matching rows of DF2.
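For whole-word matching, one option (just a sketch in Spark, reusing the namesDf/textToSearchDf frames defined in the 1st solution below) is to split the description into words and test membership instead of calling contains. Whether array_contains accepts a column as its second argument depends on your Spark version; if it does not, expr("array_contains(words, name)") does the same thing.

import org.apache.spark.sql.functions.{array_contains, col, split}

// Split the text on non-word characters, then keep only the rows whose word list
// contains the exact name, so "Kara" no longer matches "Karan".
val wholeWordDf = textToSearchDf
  .withColumn("words", split(col("text"), "\\W+"))
  .crossJoin(namesDf)
  .where(array_contains(col("words"), col("name")))
  .drop("words")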

You can test (on a big, real dataset) which one is faster and more suitable for you.

1st solution (via DataFrame)

import org.apache.spark.SparkContext
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{DataFrame, SparkSession}

object Main extends App {
  case class Name(name: String)
  case class TextInfo(year: Int, month: Int, text: String)

  val spark: SparkSession = SparkSession.builder.config("spark.master", "local").getOrCreate()
  val sc: SparkContext = spark.sparkContext

  val namesDf: DataFrame = spark.createDataFrame(sc.parallelize(Seq("John", "Mike", "Kara").map(Name)))
  val textToSearchDf: DataFrame = spark.createDataFrame(sc.parallelize(Seq(
    TextInfo(2006, 5, "This involved John and Kara who played role of"),
    TextInfo(2010, 120, "It is the latest release of Mike and Kara after almost decade")
  )))

  // Cross join every name with every text row, then keep only the rows
  // where the description actually contains the name.
  val resultDf: DataFrame = textToSearchDf.crossJoin(namesDf)
    .where(col("text").contains(col("name")))

  resultDf.show(truncate = false)
}
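Note that the cross join produces |names| x |texts| rows before the filter, so if the name list is small it is worth hinting Spark to broadcast it (a sketch using the same frames as above):

import org.apache.spark.sql.functions.{broadcast, col}

// With the hint, Spark plans a broadcast nested-loop join: the small names frame
// is shipped to every executor instead of building a cartesian product across partitions.
val broadcastJoinDf = textToSearchDf.crossJoin(broadcast(namesDf))
  .where(col("text").contains(col("name")))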

2nd solution, via RDD:

  import org.apache.spark.SparkContext
  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.SparkSession

  val spark: SparkSession = SparkSession.builder.config("spark.master", "local").getOrCreate()
  val sc: SparkContext = spark.sparkContext

  val namesAsRdd: RDD[String] = sc.parallelize(Seq("John", "Mike", "Kara"))
  val rddWithTextToSearch: RDD[(Int, Int, String)] = sc.parallelize(Seq(
    (2006, 5, "This involved John and Kara who played role of"),
    (2010, 120, "It is the latest release of Mike and Kara after almost decade")
  ))

  // Collect the (small) set of names to the driver, then scan each text row once
  // and emit one output tuple per name found in it.
  val names: Set[String] = namesAsRdd.collect().toSet
  val resultRdd: RDD[(String, Int, Int, String)] = rddWithTextToSearch.flatMap {
    case (year, month, str) => names.filter(name => str.contains(name)).map(name => (name, year, month, str))
  }
  resultRdd.foreach(println(_))
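If the collected name set gets large, a possible refinement (again just a sketch) is to broadcast it explicitly instead of capturing it in the flatMap closure, so each executor receives one copy rather than having the set serialized with every task:

  // Ship the name set to every executor once via a broadcast variable.
  val namesBroadcast = sc.broadcast(names)
  val resultViaBroadcast: RDD[(String, Int, Int, String)] = rddWithTextToSearch.flatMap {
    case (year, month, str) =>
      namesBroadcast.value.filter(name => str.contains(name)).map(name => (name, year, month, str))
  }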

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Mikhail Ionkin