Perform sklearn DBSCAN on PySpark dataframe column

I have a Spark dataframe that looks like this:

+-----+----------+--------+-----+
|key1 |date      |variable|value|
+-----+----------+--------+-----+
|  A49|2022-03-20|      V1|   31|
|  A49|2022-03-21|      V1|   39|
|  A49|2022-03-22|      V1|   33|
|  A49|2022-03-23|      V1|   27|
|  A49|2022-03-24|      V1|   32|
|  A49|2022-03-25|      V1|   31|
|  A49|2022-03-26|      V1|   29|
|  A49|2022-03-27|      V1|   31|
|  A49|2022-03-28|      V1|   29|
|  A49|2022-03-29|      V1|   33|
|  A49|2022-03-30|      V1|   35|
|  A49|2022-03-31|      V1|   36|
|  A49|2022-04-01|      V1|   33|
|  A49|2022-04-02|      V1|   33|
|  A49|2022-04-03|      V1|   33|
+-----+----------+--------+-----+

There are several thousand unique values of key1 and about 30 unique values of variable. I am trying to apply DBSCAN from sklearn.cluster to each (key1, variable) pair. To troubleshoot the results, however, I am starting with this single pair, and I am stuck getting DBSCAN to work even for that case. I am using pandas_udf to do the computation. Here is my DBSCAN function:

from scipy.stats import median_abs_deviation
from sklearn.cluster import DBSCAN
import pandas as pd
import numpy as np
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import IntegerType


DBSCAN_HYPERPARAMETERS = None

@pandas_udf(returnType=IntegerType())
def run_dbscan_pandas(values: pd.Series) -> pd.Series:
    values = values.to_numpy()

    if DBSCAN_HYPERPARAMETERS is None:
        # Default hyperparameters: derive eps from the MAD of the values
        epsilon = 2 * max(median_abs_deviation(values), 1)
        minimumSamples = 2
    else:
        epsilon, minimumSamples = DBSCAN_HYPERPARAMETERS

    dbModel = DBSCAN(eps=epsilon, min_samples=minimumSamples).fit(values.reshape(-1, 1))

    return pd.Series(dbModel.labels_)

My call to the pandas_udf is:

values = df.where(F.col("key1") == "A49").select("value")

clusters = run_dbscan_pandas(values["value"])

The issue I'm running into is that the entire column does not appear to be passed into run_dbscan_pandas; instead, it arrives in small pieces, seemingly one value at a time. For example, I've noticed that if I return median_abs_deviation(values) from the function, I get 0, which is clearly incorrect. Any suggestions on how best to implement this?
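
Note: a scalar pandas_udf is applied once per Arrow record batch within each partition, not once over the whole column, which is why a column-wide statistic such as the MAD comes back wrong. A minimal sketch of the usual invocation, assuming the df above (the batch-wise behavior is the same either way):

# Sketch: the usual way to invoke a scalar pandas_udf is inside select().
# Spark still feeds it one Arrow batch at a time per partition, so
# median_abs_deviation inside the udf remains batch-local, not column-wide.
clusters = (
    df.where(F.col("key1") == "A49")
      .select(run_dbscan_pandas(F.col("value")).alias("dbscan"))
)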



Solution 1:[1]

So I figured this out, but without using pandas_udf. Instead, I used the GroupedData.applyInPandas method, which hands each group to the function as a complete pandas DataFrame, so group-wide statistics like the MAD are computed over all of the group's rows. I'll leave my working code here in case someone else has a similar problem:

# Reuses the imports and DBSCAN_HYPERPARAMETERS from the question above.
def run_dbscan_pandas(groupedData: pd.DataFrame) -> pd.DataFrame:
    # applyInPandas passes each (key1, variable) group in as a
    # complete pandas DataFrame, so the MAD is computed group-wide.
    values = groupedData["value"].to_numpy()

    if DBSCAN_HYPERPARAMETERS is None:
        epsilon = 2 * max(median_abs_deviation(values), 1)
        minimumSamples = 2
    else:
        epsilon, minimumSamples = DBSCAN_HYPERPARAMETERS

    dbModel = DBSCAN(eps=epsilon, min_samples=minimumSamples).fit(values.reshape(-1, 1))

    return groupedData.assign(dbscan=dbModel.labels_)


clusters = (
    df
    .groupBy(["key1", "variable"])
    .applyInPandas(
        run_dbscan_pandas,
        # DBSCAN's labels_ is an integer array, hence "long" here
        schema=(
            "key1 string, date date,"
            + " variable string, value int, dbscan long"
        )
    )
)
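
To sanity-check the single pair from the question, the result can be filtered back down. A small usage sketch, assuming the clusters DataFrame produced above:

clusters.where(
    (F.col("key1") == "A49") & (F.col("variable") == "V1")
).orderBy("date").show()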

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

[1] Solution 1: CopyOfA