'Pyspark process array column using udf and return another array

process array column using udf and return another array

Below is my input:

docID Shingles D1 [23, 25, 39,59] D2 [34, 45, 65]

I want to generate a new column called hashes by processing shingles array column: For example, I want to extract min and max (this is just example toshow that I want a fixed length array column, I don’t actually want to find min or max)

docID Shingles Hashes D1 [23, 25, 39,59] [23,59] D2 [34, 45, 65] [34,65]

I created a udf as below:

def generate_minhash_signatures(shingles, coeffA, coeffB):
    signature = []
    minHashCode = nextPrime + 1
    maxHashCode = 0
    for shingleID in shingles:
        if shingleID < minHashCode:
            minHashCode = shingleID
        if shingleID > maxHashCode:
            maxHashCode = shingleID
    return [minHashCode, maxHashCode]

minhash_udf = udf(generate_minhash_signatures, ArrayType(IntegerType()))
df_with_minhash = df.withColumn('min_max_hash', minhash_udf("shingles", coeffA, coeffB))
df_with_minhash.show()

But it gives following error:

TypeError: Invalid argument, not a string or column: [2856022824, 2966132496, 947839218, 1658426276, 1862779421, 3729685802, 1710806966, 2696513050, 3630333076, 2555745391] of type <class 'list'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

Actual udf:

def generate_minhash_signatures(shingles, coeffA, coeffB, numHashes):
    signature = []
    for i in range(0, numHashes):
        minHashCode = nextPrime + 1
        for shingleID in shingles:
            hashCode = (coeffA[i] * shingleID + coeffB[i]) % nextPrime

            if hashCode < minHashCode:
                minHashCode = hashCode

        signature.append(minHashCode)
    return signature


Solution 1:[1]

My problem is not exactly the same. but a similar one - I had to sent three array type columns as input and get an array type (of string types ) as output

I was returning a list and tried many other approaches but it did not succeed.

def func_req(oldlist , newlist , pve):
    deleted_stores = list(set(oldlist) - set(newlist))
    new_stores = list(set(newlist) - set(oldlist))
    old_map = dict(zip(list(oldlist), list(pvector)))
    for key in deleted_stores:
        old_map.pop(key)
    for key in newlist:
        if key not in old_map.keys():
            old_map[key] = 'PTest'
    pvec=list(old_map.values())
    return pvec

I called it as in this statement:

df_diff = df3.withColumn(
    'updatedp',
    func_req(f.col('oldlist'), f.col('presentlist'), f.col('pvec'))
)

It gave me an error:

AssertionError: col should be Column

Solution

Then, I came across this post and introduced a wrapper function -

func_req_wrapper = f.udf(func_req, ArrayType(StringType()))

and called it in:

df_diff = df3.withColumn(
    'updatedp', 
    func_req_wrapper('oldlist',  'presentlist', 'pvec')
)

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Vincent Doba