How to filter with multiple contains in PySpark
I want to write a PySpark query that filters rows whose "content" column contains at least one word from an array. For example, the dataframe is:
"content" "other"
My father is big. ...
My mother is beautiful. ...
I'm going to travel. ...
I have an array:
array=["mother","father"]
And the output must be this:
"content" "other"
My father is big. ...
My mother is beautiful. ...
In short: a simple filter that keeps rows matching any word in the array.
Solution 1:[1]
Had the same thoughts as @ARCrow but using instr.
from pyspark.sql import functions as f

lst = ["mother", "father"]

DataFrame

data = [
    (1, "My father is big."),
    (2, "My mother is beautiful"),
    (3, "I'm going to travel.")
]
df = spark.createDataFrame(data, ("id", "content"))
Solution

df = (df
      .withColumn('phrases', f.array([f.lit(element) for element in lst]))
      .where(f.expr('exists(phrases, element -> instr(content, element) >= 1)'))
      .drop('phrases')
      )
df.show()
Outcome
+---+--------------------+
| id| content|
+---+--------------------+
| 1| My father is big.|
| 2|My mother is beau...|
+---+--------------------+
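The exists(...) predicate above can be mirrored in plain Python to make clear what it checks: a row survives when at least one phrase occurs as a substring of its content. A minimal, Spark-free sketch of that logic (the sample sentences are taken from the data above):

```python
lst = ["mother", "father"]
rows = ["My father is big.", "My mother is beautiful", "I'm going to travel."]

# Mirror of exists(phrases, element -> instr(content, element) >= 1):
# keep a row when any phrase is a substring of it.
kept = [r for r in rows if any(element in r for element in lst)]
# kept == ["My father is big.", "My mother is beautiful"]
```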
Solution 2:[2]
Taking some of the same configuration as @wwnde:
from pyspark.sql import functions as F

data = [
    (1, "My father is big."),
    (2, "My mother is beautiful"),
    (3, "I'm going to travel.")
]
df = spark.createDataFrame(data, ("id", "content"))
Solution
words = ["father", "mother"]
conditions = " or ".join([f"content like '%{word}%'" for word in words])
(
df
.filter(F.expr(conditions))
.show(truncate=False)
)
+---+----------------------+
|id |content |
+---+----------------------+
|1 |My father is big. |
|2 |My mother is beautiful|
+---+----------------------+
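Building a SQL string with like works here, but the words are interpolated verbatim into the expression. A slightly more defensive variant (my own suggestion, not part of the original answer) escapes each word and uses a Column-based rlike filter instead of f-string SQL:

```python
import re

words = ["father", "mother"]

# Escape each word so any regex metacharacters are matched literally,
# then OR the words into a single alternation pattern.
pattern = "|".join(re.escape(w) for w in words)
# pattern == "father|mother"

# Equivalent Column-based filter (needs a SparkSession and df in scope):
# df.filter(F.col("content").rlike(pattern)).show(truncate=False)
```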
Solution 3:[3]
We made the Fugue project to port native Python or Pandas code to Spark or Dask. This lets you keep the logic very readable by expressing it in native Python, and Fugue can then port it to Spark for you with one function call.
First, we set up:
import pandas as pd
array=["mother","father"]
df = pd.DataFrame({"sentence": ["My father is big.", "My mother is beautiful.", "I'm going to travel. "]})
and then we can create a native Python function to express the logic:
from typing import Any, Dict, Iterable, List

def myfilter(df: List[Dict[str, Any]]) -> Iterable[Dict[str, Any]]:
    for row in df:
        for value in array:
            if value in row["sentence"]:
                yield row
                break  # stop after the first match so a row is yielded only once
and then test it on Pandas:
from fugue import transform
transform(df, myfilter, schema="*")
Because it works on Pandas, we can execute it on Spark by specifying the engine:
import fugue_spark
transform(df, myfilter, schema="*", engine="spark").show()
+---+--------------------+
| id| sentence|
+---+--------------------+
| 0| My father is big.|
| 1|My mother is beau...|
+---+--------------------+
Note that we need .show() because Spark evaluates lazily. A schema is also a Spark requirement, so Fugue interprets the "*" as "all columns in, all columns out".
The fugue transform function can take both Pandas DataFrame inputs and Spark DataFrame inputs.
Edit:
You can replace the myfilter function above with a Pandas implementation like this:
def myfilter(df: pd.DataFrame) -> pd.DataFrame:
    res = df.loc[df["sentence"].str.contains("|".join(array))]
    return res
and Fugue will be able to port it to Spark the same way. Fugue knows how to adjust to the type hints and this will be faster than the native Python implementation because it takes advantage of Pandas being vectorized.
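Outside of Fugue, the same vectorized filter can be exercised on plain Pandas; this sketch just reproduces the str.contains logic above on the sample data:

```python
import pandas as pd

array = ["mother", "father"]
df = pd.DataFrame({"sentence": ["My father is big.",
                                "My mother is beautiful.",
                                "I'm going to travel. "]})

# str.contains treats "mother|father" as a regex alternation,
# so a row matches when either word appears in the sentence.
res = df.loc[df["sentence"].str.contains("|".join(array))]
# res keeps the first two rows
```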
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | wwnde |
| Solution 2 | Justin Davis |
| Solution 3 | |
