PySpark: apply a custom function to each row of a data frame

I have a PySpark data frame and I am trying to compute a custom function on each of its rows. The function is fairly complex and I don't believe I can achieve my desired outcome with column operations (though if you have suggestions for how to do that, I'm all ears). The data frame contains many columns, but the custom function operates on only 4 of them and produces a boolean value. The data frame looks like this:

Date        value  value_minus1  value_minus2  value_plus1
2022-01-01  21     177           157           27
2022-01-04  165    24            27            229
2022-01-06  110    229           165           189
2022-01-08  36     189           110           23
2022-01-11  295    34            23            110
2022-01-12  110    295           34            132
2021-12-29  223    60            null          207
2022-01-08  75     235           235           82
2022-01-08  149    473           475           149
2022-01-10  327    149           149           368

The function I am trying to apply is:

@udf(returnType=BooleanType())
def my_search(value, value_minus1, value_minus2, value_plus1):
    THRESHOLD = 50
    FACTOR = 0.2

    if (
        (value > 0)
        & (value_minus1 > 0)
        & (value != value_minus2)
        & (abs(value - value_minus1) >= THRESHOLD)
        & (
            F.isnan(value_minus2)
            | (abs(value_minus1 - value_minus2) < FACTOR * max(value_minus1, value_minus2))
        )
        & (
            F.isnan(value_plus1)
            | (abs(value_plus1 - value_minus1) < FACTOR * max(value_plus1, value_minus1))
        )
    ):
        x1 = re.sub(r'\.0', '', str(value))
        y1 = re.sub(r'\.0', '', str(value_minus1))
        # re.search returns a Match object (truthy) or None
        if (
            re.search("^" + y1, x1) or re.search("^" + x1, y1)
            or re.search(y1 + "$", x1) or re.search(x1 + "$", y1)
        ):
            return True
        else:
            return False
    else:
        return False

The imports and call are:

from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType
import re

df = df.withColumn(
    "booleanCondition",
    my_search(F.col("value"), F.col("value_minus1"), F.col("value_minus2"), F.col("value_plus1"))
)

When I run this, I can't get the function to work. The problem could lie in the function itself (a genuine possibility). In particular, I have doubts about the way I'm using F.isnan, so if there is a better way to perform that null check, let me know. Otherwise, I'm not sure what is going wrong. Any help is appreciated.
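One likely source of trouble: F.isnan builds a Column expression, but inside a UDF each argument has already been materialized as a plain Python value, where nulls arrive as None. As a sketch of how the same check could be rewritten as ordinary Python (the helper name my_search_plain is my own; the threshold, factor, and regex logic are kept as in the question):

```python
import re

def my_search_plain(value, value_minus1, value_minus2, value_plus1):
    """Row-level check as plain Python; nulls arrive as None inside a UDF."""
    THRESHOLD = 50
    FACTOR = 0.2

    # Guard the columns the comparisons below require; F.isnan cannot be
    # used here because it operates on Column expressions, not values.
    if value is None or value_minus1 is None:
        return False

    if (
        value > 0
        and value_minus1 > 0
        and value != value_minus2
        and abs(value - value_minus1) >= THRESHOLD
        and (
            value_minus2 is None
            or abs(value_minus1 - value_minus2) < FACTOR * max(value_minus1, value_minus2)
        )
        and (
            value_plus1 is None
            or abs(value_plus1 - value_minus1) < FACTOR * max(value_plus1, value_minus1)
        )
    ):
        # Strip a ".0" left over from float formatting, as in the question
        x1 = re.sub(r'\.0', '', str(value))
        y1 = re.sub(r'\.0', '', str(value_minus1))
        # re.search returns a Match (truthy) or None, so a plain truth
        # test replaces the type(...) == re.Match comparisons
        return bool(
            re.search("^" + y1, x1) or re.search("^" + x1, y1)
            or re.search(y1 + "$", x1) or re.search(x1 + "$", y1)
        )
    return False

# In Spark this would then be wrapped, e.g.:
# my_search = udf(my_search_plain, BooleanType())
```

Using `and`/`or` instead of `&`/`|` also gives short-circuit evaluation, so the max() calls are never reached when the corresponding column is None.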



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
