'Data Bricks read RDD

I need to read a CSV file as an RDD and then extract the desired data using functions, the issue is the first function cannot produce proper RDD for the second function.

this my code:

def extract_vin_key_value(line):
    sr=line.split(',')
    if sr[1]=='I':
        return sr[2],sr[3],sr[4],sr[5]

def popolate_make(make):
    return make
    
raw_rdd = sc.textFile('/FileStore/tables/data.csv')
vin_kv = raw_rdd.map(lambda x: extract_vin_key_value(x))
vin_kv=vin_kv.filter(lambda x: x is not None)
enhance_make = vin_kv.groupByKey().flatMap(lambda kv: populate_make(kv[1]))

The CSV file is:

1   I   VXIO456XLBB630221   Nissan  Altima  2003    2002-05-08  Initial sales from TechMotors
2   I   INU45KIOOPA343980   Mercedes    C300    2015    2014-01-01  Sold from EuroMotors
3   A   VXIO456XLBB630221               2014-07-02  Head on collision
4   R   VXIO456XLBB630221               2014-08-05  Repair transmission
5   I   VOME254OOXW344325   Mercedes    E350    2015    2014-02-01  Sold from Carmax
6   R   VOME254OOXW344325               2015-02-06  Wheel alignment service
7   R   VXIO456XLBB630221               2015-01-01  Replace right head light
8   I   EXOA00341AB123456   Mercedes    SL550   2016    2015-01-01  Sold from AceCars
9   A   VOME254OOXW344325               2015-10-01  Side collision
10  R   VOME254OOXW344325               2015-09-01  Changed tires
11  R   EXOA00341AB123456               2015-05-01  Repair engine
12  A   EXOA00341AB123456               2015-05-03  Vehicle rollover
13  R   VOME254OOXW344325               2015-09-01  Replace passenger side door
14  I   UXIA769ABCC447906   Toyota  Camery  2017    2016-05-08  Initial sales from Carmax
15  R   UXIA769ABCC447906               2020-01-02  Initial sales from Carmax
16  A   INU45KIOOPA343980               2020-05-01  Side collision

The output of the first function is:

[('VXIO456XLBB630221', 'Nissan', 'Altima', '2003'),
 ('INU45KIOOPA343980', 'Mercedes', 'C300', '2015'),
 ('VOME254OOXW344325', 'Mercedes', 'E350', '2015'),
 ('EXOA00341AB123456', 'Mercedes', 'SL550', '2016'),
 ('UXIA769ABCC447906', 'Toyota', 'Camery', '2017')]

And the error message is:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 256.0 failed 1 times, most recent failure: Lost task 0.0 in stage 256.0 (TID 338) (ip-10-172-200-7.us-west-2.compute.internal executor driver): org.apache.spark.api.python.PythonException: 'ValueError: too many values to unpack (expected 2)'.


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source