Calculating variance of each column after MinMaxScaler not working in PySpark

I have a data frame that looks like this:

+---+------+-------------+------------+------------+--------------+
|age|fnlwgt|education-num|capital-gain|capital-loss|hours-per-week|
+---+------+-------------+------------+------------+--------------+
| 39| 77516|           13|        2174|           0|            40|
| 50| 83311|           13|           0|           0|            13|
| 38|215646|            9|           0|           0|            40|
| 53|234721|            7|           0|           0|            40|
| 28|338409|           13|           0|           0|            40|
| 37|284582|           14|           0|           0|            40|
| 49|160187|            5|           0|           0|            16|
| 52|209642|            9|           0|           0|            45|
| 31| 45781|           14|       14084|           0|            50|
| 42|159449|           13|        5178|           0|            40|
+---+------+-------------+------------+------------+--------------+

I am trying to calculate the variance of each column. Since the columns are all on different scales, I apply a min-max scaler first and then calculate the variance.

Calculating the variance from the data above works fine. Here is the function:

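from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql import functions as F

def get_num_summ_df(df, col_lst):
    # Build one single-row DataFrame (variance + column name) per column
    summ_df_lst = []
    for col in col_lst:
        summ_df = df.agg(F.round(F.variance(col), 4).alias('var'))
        summ_df = summ_df.withColumn('tag_id', F.lit(col))
        summ_df_lst.append(summ_df)
    # Stack the per-column rows into a single summary DataFrame
    return reduce(DataFrame.unionAll, summ_df_lst)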

And this is the function to extract the element from each scaled vector:

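from pyspark.sql.types import FloatType

def extract_one_element(df, ext_cols):
    cols = df.columns
    # UDF that takes the first (only) element of a vector as a float
    fe = F.udf(lambda v: float(v[0]), FloatType())
    # Apply the UDF to the vector columns, pass the others through unchanged
    df = df.select([fe(c).alias(c) if (c in ext_cols) else c for c in cols])
    return df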

Applying the transformation, then extracting one element from each vector:

from pyspark.ml import Pipeline
from pyspark.ml.feature import MinMaxScaler, VectorAssembler

asmbler = [VectorAssembler(inputCols=[col], outputCol=col+"_vec", handleInvalid='skip')
           for col in CONT_FIELDS]
mm_scaler = [MinMaxScaler(inputCol=col+"_vec", outputCol=col+"_mm_scaled")
             for col in CONT_FIELDS]

stages = asmbler + mm_scaler

pp_pl = Pipeline(stages=stages).fit(df)
num_cols = df.select(CONT_FIELDS)
num_cols_sc = pp_pl.transform(num_cols)
num_cols_sc_e = extract_one_element(num_cols_sc.select(sc_cols), sc_cols)

Now, when I try to calculate the variance of the scaled columns:

summ_df = get_num_summ_df(num_cols_sc_e, sc_cols)
summ_df.show()

I get the following error:

Py4JJavaError: An error occurred while calling o2676.showString.
Caused by: java.io.EOFException
    at java.io.DataInputStream.readInt(Unknown Source)

My hypothesis was that this is caused by the data type, so I checked the dtypes before and after the transformation and element extraction; they are long and float, respectively. I also tried replacing FloatType() with DecimalType() in the extract_one_element function, but then it cannot extract the element from the vector and returns null values.

My plan (if there is no answer) is to define a min-max scaler on my own, so that I can scale without using VectorAssembler.
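To make that fallback concrete, here is a minimal plain-Python sketch of hand-written min-max scaling followed by a sample variance, using the `age` values from the table above. It assumes the usual formula (x - min) / (max - min); the helper name `min_max_scale` and the handling of a constant column are my own assumptions, not taken from any library. The same arithmetic could presumably be expressed in Spark with plain column aggregates such as F.min and F.max, which would avoid vectors and the UDF entirely.

```python
from statistics import variance  # sample variance (n - 1 denominator)

# The `age` column from the table above.
age = [39, 50, 38, 53, 28, 37, 49, 52, 31, 42]


def min_max_scale(values):
    """Rescale values to [0, 1] with (x - min) / (max - min).

    Hypothetical helper for illustration only.
    """
    lo, hi = min(values), max(values)
    if hi == lo:
        # Assumption: map a constant column to 0.0 to avoid dividing by zero.
        return [0.0 for _ in values]
    return [(x - lo) / (hi - lo) for x in values]


scaled = min_max_scale(age)
raw_var = variance(age)
scaled_var = variance(scaled)

# Subtracting the minimum does not change the variance, and dividing by
# (max - min) divides the variance by (max - min) squared.
print(round(raw_var, 4), round(scaled_var, 4))
```

Because the scaled variance is just the raw variance divided by (max - min) squared, the scaled result can also be cross-checked against the unscaled variance that already computes without errors.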

Any help would be appreciated!



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
