Calculating variance of each column after MinMaxScaler not working in PySpark
I have a DataFrame that looks like this:
+---+------+-------------+------------+------------+--------------+
|age|fnlwgt|education-num|capital-gain|capital-loss|hours-per-week|
+---+------+-------------+------------+------------+--------------+
| 39| 77516| 13| 2174| 0| 40|
| 50| 83311| 13| 0| 0| 13|
| 38|215646| 9| 0| 0| 40|
| 53|234721| 7| 0| 0| 40|
| 28|338409| 13| 0| 0| 40|
| 37|284582| 14| 0| 0| 40|
| 49|160187| 5| 0| 0| 16|
| 52|209642| 9| 0| 0| 45|
| 31| 45781| 14| 14084| 0| 50|
| 42|159449| 13| 5178| 0| 40|
+---+------+-------------+------------+------------+--------------+
I am trying to calculate the variance of each column. Since the columns are all on different scales, I apply a min-max scaler first and then calculate the variance.
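As a sanity check on what the scaled variances should look like: min-max scaling is the linear map x' = (x - min) / (max - min), so the sample variance of a scaled column should equal the original variance divided by (max - min)^2. The sketch below illustrates this in plain Python (no Spark) using the ten age values from the table above. The helper name min_max_scale is made up for illustration, and statistics.variance is the sample variance, which is also what F.variance computes.

```python
from statistics import variance

def min_max_scale(values):
    """Rescale values linearly to [0, 1]; assumes max(values) > min(values)."""
    lo, hi = min(values), max(values)
    return [(v - lo) / (hi - lo) for v in values]

# The ten `age` values shown in the table above.
age = [39, 50, 38, 53, 28, 37, 49, 52, 31, 42]

var_raw = variance(age)                     # sample variance of the raw column
var_scaled = variance(min_max_scale(age))   # sample variance after scaling
value_range = max(age) - min(age)

# Scaling divides the variance by the squared range of the column,
# so the last two printed numbers should agree up to rounding.
print(var_raw, var_scaled, var_raw / value_range ** 2)
```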
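Calculating the variance on the data above works fine. Here is the function:
from functools import reduce
from pyspark.sql import DataFrame
import pyspark.sql.functions as F

def get_num_summ_df(df, col_lst):
    # Build one single-row DataFrame per column, then union them together.
    summ_df_lst = []
    for col in col_lst:
        summ_df = df.agg(F.round(F.variance(col), 4).alias('var'))
        summ_df = summ_df.withColumn('tag_id', F.lit(col))
        summ_df_lst.append(summ_df)
    return reduce(DataFrame.unionAll, summ_df_lst)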
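And this is the function that extracts the element from each scaled vector:
from pyspark.sql.types import FloatType

def extract_one_element(df, ext_cols):
    cols = df.columns
    # UDF that takes the first (and only) entry of a vector as a float.
    fe = F.udf(lambda v: float(v[0]), FloatType())
    df = df.select([fe(c).alias(c) if (c in ext_cols) else c for c in cols])
    return df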
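Applying the transformation, then extracting the single element from each vector:
from pyspark.ml import Pipeline
from pyspark.ml.feature import MinMaxScaler, VectorAssembler

# CONT_FIELDS and sc_cols are defined elsewhere (not shown): the continuous
# column names and, presumably, the matching "<col>_mm_scaled" output names.
asmbler = [VectorAssembler(inputCols=[col], outputCol=col+"_vec", handleInvalid='skip')
           for col in CONT_FIELDS]
mm_scaler = [MinMaxScaler(inputCol=col+"_vec", outputCol=col+"_mm_scaled")
             for col in CONT_FIELDS]
stages = asmbler + mm_scaler
pp_pl = Pipeline(stages=stages).fit(df)
num_cols = df.select(CONT_FIELDS)
num_cols_sc = pp_pl.transform(num_cols)
num_cols_sc_e = extract_one_element(num_cols_sc.select(sc_cols), sc_cols)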
Now, when I try to calculate the variance of the scaled columns:
summ_df = get_num_summ_df(num_cols_sc_e, sc_cols)
summ_df.show()
I get the following error:
Py4JJavaError: An error occurred while calling o2676.showString.
Caused by: java.io.EOFException
    at java.io.DataInputStream.readInt(Unknown Source)
My hypothesis was that this is caused by the data type, so I checked the dtypes before and after the transformation and element extraction; they are long and float respectively. I also tried replacing FloatType() with DecimalType() in the extract_one_element function, but then the element cannot be extracted from the vector and the result is null values.
My plan (if there is no answer) is to define a min-max scaler on my own so that I can scale without using VectorAssembler.
Any help would be appreciated!
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
