PySpark: process array column using udf and return another array
Below is my input:
| docID | Shingles |
|---|---|
| D1 | [23, 25, 39, 59] |
| D2 | [34, 45, 65] |
I want to generate a new column called hashes by processing the shingles array column. For example, I want to extract the min and max (this is just an example to show that I want a fixed-length array column; I don't actually want to find the min or max):
| docID | Shingles | Hashes |
|---|---|---|
| D1 | [23, 25, 39, 59] | [23, 59] |
| D2 | [34, 45, 65] | [34, 65] |
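For reference, the sample input above can be reproduced with a small DataFrame. This is only a sketch, assuming an active SparkSession named `spark`:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample data from the question: one array column of shingle IDs per document
df = spark.createDataFrame(
    [("D1", [23, 25, 39, 59]), ("D2", [34, 45, 65])],
    ["docID", "shingles"],
)
df.show(truncate=False)
```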
I created a udf as below:
```python
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType

def generate_minhash_signatures(shingles, coeffA, coeffB):
    signature = []
    minHashCode = nextPrime + 1  # nextPrime: a prime larger than any shingle ID, defined elsewhere
    maxHashCode = 0
    for shingleID in shingles:
        if shingleID < minHashCode:
            minHashCode = shingleID
        if shingleID > maxHashCode:
            maxHashCode = shingleID
    return [minHashCode, maxHashCode]

minhash_udf = udf(generate_minhash_signatures, ArrayType(IntegerType()))

df_with_minhash = df.withColumn('min_max_hash', minhash_udf("shingles", coeffA, coeffB))
df_with_minhash.show()
```
But it gives the following error:

```
TypeError: Invalid argument, not a string or column: [2856022824, 2966132496, 947839218, 1658426276, 1862779421, 3729685802, 1710806966, 2696513050, 3630333076, 2555745391] of type <class 'list'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.
```
Actual udf:
```python
def generate_minhash_signatures(shingles, coeffA, coeffB, numHashes):
    # For each of the numHashes hash functions, keep the minimum hash code over all shingles
    signature = []
    for i in range(0, numHashes):
        minHashCode = nextPrime + 1
        for shingleID in shingles:
            hashCode = (coeffA[i] * shingleID + coeffB[i]) % nextPrime
            if hashCode < minHashCode:
                minHashCode = hashCode
        signature.append(minHashCode)
    return signature
```
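The TypeError above is raised because coeffA and coeffB are plain Python lists, and a udf call only accepts column names or Column objects. As the error message itself suggests, one way around this is to turn the lists into column literals with array and lit. The following is only a sketch of that idea, assuming coeffA, coeffB, and numHashes are the Python values from the question, nextPrime is a prime larger than any shingle ID (4294967311 is used here purely as an illustration), and the return type is widened to LongType because the hash codes can exceed the 32-bit integer range:

```python
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, LongType

nextPrime = 4294967311  # assumption: a prime larger than the maximum shingle ID

minhash_udf = F.udf(generate_minhash_signatures, ArrayType(LongType()))

df_with_minhash = df.withColumn(
    "min_max_hash",
    minhash_udf(
        "shingles",                                 # column name, resolved by Spark
        F.array(*[F.lit(int(a)) for a in coeffA]),  # Python list -> literal array column
        F.array(*[F.lit(int(b)) for b in coeffB]),
        F.lit(numHashes),                           # Python int -> literal column
    ),
)
df_with_minhash.show(truncate=False)
```

Since the coefficient lists are driver-side constants, an alternative is to close over them in the Python function (or bind them with functools.partial) and pass only the shingles column to the udf.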
Solution 1
My problem is not exactly the same, but a similar one: I had to send three array-type columns as input and get an array of strings as output.
I was returning a list and tried many other approaches, but they did not succeed.
```python
def func_req(oldlist, newlist, pvector):
    # stores present in the old list but missing from the new one
    deleted_stores = list(set(oldlist) - set(newlist))
    # stores that appear only in the new list
    new_stores = list(set(newlist) - set(oldlist))
    # map each old store to its value in pvector
    old_map = dict(zip(list(oldlist), list(pvector)))
    for key in deleted_stores:
        old_map.pop(key)
    for key in newlist:
        if key not in old_map.keys():
            old_map[key] = 'PTest'
    pvec = list(old_map.values())
    return pvec
```
I called it in this statement:

```python
df_diff = df3.withColumn(
    'updatedp',
    func_req(f.col('oldlist'), f.col('presentlist'), f.col('pvec'))
)
```
It gave me an error:

```
AssertionError: col should be Column
```
Solution
Then I came across this post and introduced a wrapper function:
```python
func_req_wrapper = f.udf(func_req, ArrayType(StringType()))
```
and called it in:
```python
df_diff = df3.withColumn(
    'updatedp',
    func_req_wrapper('oldlist', 'presentlist', 'pvec')
)
```
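The wrapper works because f.udf returns a callable that builds a Column expression (plain strings are treated as column names), whereas calling the raw Python function directly does not produce a Column, which is what triggers the AssertionError above. Put together, a minimal end-to-end sketch of this pattern might look like the following, assuming an active SparkSession named `spark` and purely illustrative data:

```python
from pyspark.sql import SparkSession, functions as f
from pyspark.sql.types import ArrayType, StringType

spark = SparkSession.builder.getOrCreate()

# Illustrative input: old store list, present store list, and a vector aligned with oldlist
df3 = spark.createDataFrame(
    [(["s1", "s2", "s3"], ["s2", "s3", "s4"], ["P1", "P2", "P3"])],
    ["oldlist", "presentlist", "pvec"],
)

func_req_wrapper = f.udf(func_req, ArrayType(StringType()))

df_diff = df3.withColumn(
    "updatedp",
    func_req_wrapper("oldlist", "presentlist", "pvec"),
)
df_diff.show(truncate=False)
# updatedp -> [P2, P3, PTest]: s1 was dropped, s4 is new and gets the default 'PTest'
```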
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Vincent Doba |
