Perform sklearn DBSCAN on a PySpark dataframe column
I have a Spark dataframe that looks like this:
+-----+----------+--------+-----+
|key1 |date |variable|value|
+-----+----------+--------+-----+
| A49|2022-03-20| V1| 31|
| A49|2022-03-21| V1| 39|
| A49|2022-03-22| V1| 33|
| A49|2022-03-23| V1| 27|
| A49|2022-03-24| V1| 32|
| A49|2022-03-25| V1| 31|
| A49|2022-03-26| V1| 29|
| A49|2022-03-27| V1| 31|
| A49|2022-03-28| V1| 29|
| A49|2022-03-29| V1| 33|
| A49|2022-03-30| V1| 35|
| A49|2022-03-31| V1| 36|
| A49|2022-04-01| V1| 33|
| A49|2022-04-02| V1| 33|
| A49|2022-04-03| V1| 33|
+-----+----------+--------+-----+
There are several thousand unique values of key1 and about 30 unique values of variable. I am trying to apply DBSCAN from sklearn.cluster to each (key1, variable) pair. To troubleshoot the results, I am first trying to get DBSCAN working on this single pair, using pandas_udf to do the computation. Here is my DBSCAN function:
from scipy.stats import median_abs_deviation
from sklearn.cluster import DBSCAN
import pandas as pd
import numpy as np
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import IntegerType

DBSCAN_HYPERPARAMETERS = None

@pandas_udf(returnType=IntegerType())
def run_dbscan_pandas(values: pd.Series) -> pd.Series:
    values = values.to_numpy()
    if DBSCAN_HYPERPARAMETERS is None:
        # Default epsilon: twice the MAD, floored at 1 so eps is never 0.
        epsilon = 2 * max(median_abs_deviation(values), 1)
        minimumSamples = 2
    else:
        epsilon, minimumSamples = DBSCAN_HYPERPARAMETERS
    dbModel = DBSCAN(eps=epsilon, min_samples=minimumSamples).fit(values.reshape(-1, 1))
    # labels_ holds one cluster label per input point; -1 marks noise.
    return pd.Series(dbModel.labels_)
My call to the pandas_udf is:
values = df.where(F.col("key1") == "A49").select("value")
clusters = run_dbscan_pandas(values["value"])
The issue I'm running into is that the entire column does not appear to be passed into run_dbscan_pandas; instead, it arrives one value at a time. For example, I've noticed that if I return median_abs_deviation(values) from the function, I get 0, which is clearly incorrect. Any suggestions on how best to implement this?
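For context: a scalar pandas_udf is invoked once per Arrow record batch within each partition, so there is no guarantee the function ever sees the whole column in a single call. A minimal sketch that makes the batching visible (assuming a running SparkSession named spark; the batch-size config shown is a standard Spark setting):

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.getOrCreate()
# Shrink the Arrow batch size so the chunking is easy to observe.
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5")

@pandas_udf(returnType=IntegerType())
def batch_len(values: pd.Series) -> pd.Series:
    # Each invocation receives one Arrow batch, not the whole column.
    return pd.Series([len(values)] * len(values))

# A single partition of 20 rows arrives as four batches of 5,
# so batch_len reports 5 rather than 20 for every row.
spark.range(0, 20, 1, 1).select(batch_len(F.col("id")).alias("n")).show()

This is why a whole-column statistic such as the MAD comes out wrong inside a scalar pandas_udf: the statistic is computed per batch, not per group.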
Solution 1: [1]
So I figured this out, but without using pandas_udf. Instead, I used the GroupedData.applyInPandas method, which passes each group to the function as a complete pandas DataFrame. I'll leave my working code here in case someone else runs into a similar problem:
def run_dbscan_pandas(groupedData: pd.DataFrame) -> pd.DataFrame:
    values = groupedData["value"].to_numpy()
    if DBSCAN_HYPERPARAMETERS is None:
        # Default epsilon: twice the MAD, floored at 1 so eps is never 0.
        epsilon = 2 * max(median_abs_deviation(values), 1)
        minimumSamples = 2
    else:
        epsilon, minimumSamples = DBSCAN_HYPERPARAMETERS
    dbModel = DBSCAN(eps=epsilon, min_samples=minimumSamples).fit(values.reshape(-1, 1))
    # Attach the cluster labels (-1 = noise) as a new column.
    return groupedData.assign(dbscan=dbModel.labels_)

clusters = (
    df
    .groupBy(["key1", "variable"])
    .applyInPandas(
        run_dbscan_pandas,
        schema=(
            "key1 string, date date,"
            + " variable string, value int, dbscan double"
        ),
    )
)
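For completeness, a minimal end-to-end sketch of invoking the grouped version on toy data (the SparkSession variable spark, the imports from the question, and the second key B07 are illustrative assumptions, not part of the original post):

df = spark.createDataFrame(
    [
        ("A49", "2022-03-20", "V1", 31),
        ("A49", "2022-03-21", "V1", 39),
        ("A49", "2022-03-22", "V1", 33),
        ("B07", "2022-03-20", "V2", 55),
        ("B07", "2022-03-21", "V2", 56),
    ],
    ["key1", "date", "variable", "value"],
).withColumn("date", F.to_date("date"))

clusters = (
    df.groupBy(["key1", "variable"])
      .applyInPandas(
          run_dbscan_pandas,
          schema="key1 string, date date, variable string, value int, dbscan double",
      )
)
clusters.show()  # one set of labels per (key1, variable) group; -1 marks noise

Because applyInPandas hands the function one complete group at a time, the MAD-based epsilon and the DBSCAN fit both see all rows for a given (key1, variable) pair, which is exactly what the scalar pandas_udf could not guarantee.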
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | CopyOfA |
