'I am new with Pyspark, I am trying onehotencoding in iot dataset for deeplearning implementation

When I am trying to fit the pipeline, I am getting a error like this

Py4JJavaError: An error occurred while calling o1380.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 71.0 failed 1 times, most recent failure: Lost task 6.0 in stage 71.0 (TID 682) (LAPTOP-DMUSUVBM executor driver): java.lang.IllegalArgumentException: requirement failed: Vector should have dimension larger than zero.

Here is the code I tried:

def select_features_to_scale(df1=df1, lower_skew=-2, upper_skew=2, dtypes='int32'):
selected_features = []

feature_list = list(df1.toPandas().select_dtypes(include=[dtypes]).columns)


for feature in feature_list:

    if df1.toPandas()[feature].kurtosis() < -2 or df1.toPandas()[feature].kurtosis() > 2:
        
        selected_features.append(feature)

return selected_features

Here I am creating Spark Pipeline

cat_features = ['duration', 'orig_bytes', 'resp_bytes', 'orig_pkts', 'proto_icmp', 'proto_tcp', 'proto_udp']
label = 'label'
stages = []

Loop for StringIndexer and OHE for Categorical Variables

    for features in cat_features:
    
   
    string_indexer = StringIndexer(inputCol=features, outputCol=features + "_index")
    

One Hot Encode Categorical Features

    encoder = OneHotEncoder(inputCols=[string_indexer.getOutputCol()],
                                     outputCols=[features + "_class_vec"])
    
stages += [string_indexer, encoder]


label_str_index =  StringIndexer(inputCol=label, outputCol="label_index")


unscaled_features = select_features_to_scale(df1=df1, lower_skew=-2, upper_skew=2, dtypes='int32')

unscaled_assembler = VectorAssembler(inputCols=unscaled_features, outputCol="unscaled_features")
scaler = StandardScaler(inputCol="unscaled_features", outputCol="scaled_features")

stages += [unscaled_assembler, scaler]

assembler_inputs = [feature + "_class_vec" for feature in cat_features]
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="assembled_inputs") 

stages += [label_str_index, assembler]

assembler_final = VectorAssembler(inputCols=["scaled_features","assembled_inputs"], outputCol="features")

stages += [assembler_final]


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source