AWS Glue job not loading data from S3 into Amazon Aurora: "cannot be null" error

My aim is to load data from a file in S3 (s3://dev-itr-uls-weekly-files/2022-01-11/a_aircr/AD.dat) into a table in AWS Aurora.

The AWS Glue job script is given below.

    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    
    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    ## @type: DataSource
    ## @args: [database = "fulldatas-01-11-db", table_name = "ad_dat_d34ee0e6b01d07f287826be0cf826b01", transformation_ctx = "datasource0"]
    ## @return: datasource0
    ## @inputs: []
    #datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "fulldatas-01-11-db", table_name = "ad_dat_d34ee0e6b01d07f287826be0cf826b01", transformation_ctx = "datasource0")
    
    connection_type = 's3'
    
    datasource0 = glueContext.create_dynamic_frame_from_options(
        connection_type,
        connection_options = {"paths": ["s3://dev-itr-uls-weekly-files/2022-01-11/a_aircr/AD.dat"]},
        format = "csv",
        format_options = {"quoteChar": -1, "separator": "|"},
        transformation_ctx = "datasource0")
    #datasource0.printSchema()
    #datasource0.show(1)
    
    ## @type: ApplyMapping
    ## @args: [mapping = [("col0", "string", "col0", "string"), ("col1", "long", "col1", "long"), ("col2", "string", "col2", "string"), ("col3", "string", "col3", "string"), ("col4", "string", "col4", "string"), ("col5", "string", "col5", "string"), ("col6", "string", "col6", "string"), ("col7", "string", "col7", "string"), ("col8", "string", "col8", "string"), ("col9", "long", "col9", "long"), ("col10", "string", "col10", "string"), ("col11", "string", "col11", "string"), ("col12", "string", "col12", "string"), ("col13", "string", "col13", "string"), ("col14", "string", "col14", "string"), ("col15", "string", "col15", "string"), ("col16", "string", "col16", "string"), ("col17", "string", "col17", "string"), ("col18", "string", "col18", "string"), ("col19", "string", "col19", "string"), ("col20", "string", "col20", "string"), ("col21", "string", "col21", "string"), ("col22", "string", "col22", "string"), ("col23", "string", "col23", "string"), ("col24", "string", "col24", "string"), ("col25", "string", "col25", "string"), ("col26", "string", "col26", "string"), ("col27", "string", "col27", "string"), ("col28", "string", "col28", "string"), ("col29", "string", "col29", "string")], transformation_ctx = "applymapping1"]
    ## @return: applymapping1
    ## @inputs: [frame = datasource0]
    applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("col0", "string", "record_type", "string"), ("col1", "long", "unique_system_identifier", "long"), ("col2", "string", "uls_file_number", "string"), ("col3", "string", "ebf_number", "string"), ("col4", "string", "application_purpose", "string"), ("col5", "string", "application_status", "string"), ("col6", "string", "application_fee_exempt", "string"), ("col7", "string", "regulatory_fee_exempt", "string"), ("col8", "string", "source", "string"), ("col9", "long", "requested_expiration_date_mmdd", "long"), ("col10", "string", "receipt_date", "string"), ("col11", "string", "notification_code", "string"), ("col12", "string", "notification_date", "string"), ("col13", "string", "expanding_area_or_contour", "string"), ("col14", "string", "change_type", "string"), ("col15", "string", "original_application_purpose", "string"), ("col16", "string", "requesting_a_waiver", "string"), ("col17", "string", "how_many_waivers_requested", "string"), ("col18", "string", "any_attachments", "string"), ("col19", "string", "number_of_requested_SIDs", "string"), ("col20", "string", "fee_control_num", "string"), ("col21", "string", "date_entered", "string"), ("col22", "string", "reason", "string"), ("col23", "string", "frequency_coordination_indicator", "string"), ("col24", "string", "emergency_sta", "string"), ("col25", "string", "overall_change_type", "string"), ("col26", "string", "slow_growth_ind", "string"), ("col27", "string", "previous_waiver", "string"), ("col28", "string", "waiver_deferral_fee", "string"), ("col29", "string", "has_term_pending_ind", "string"), ("col30", "string", "use_of_service", "string")], transformation_ctx = "applymapping1")
    #applymapping1.printSchema()
    #applymapping1.show(1)
    
    ## @type: ResolveChoice
    ## @args: [choice = "make_cols", transformation_ctx = "resolvechoice2"]
    ## @return: resolvechoice2
    ## @inputs: [frame = applymapping1]
    resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice2")
    ## @type: DropNullFields
    ## @args: [transformation_ctx = "dropnullfields3"]
    ## @return: dropnullfields3
    ## @inputs: [frame = resolvechoice2]
    dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
    dropnullfields3.printSchema()
    dropnullfields3.show(1)
    ## @type: DataSink
    ## @args: [catalog_connection = "uls_stage_conn", connection_options = {"dbtable": "ad_dat_d34ee0e6b01d07f287826be0cf826b01", "database": "uls"}, transformation_ctx = "datasink4"]
    ## @return: datasink4
    ## @inputs: [frame = dropnullfields3]
    datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "uls_stage_conn", connection_options = {"dbtable": "PUBACC_AD", "database": "uls"}, transformation_ctx = "datasink4")
    job.commit()
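
Note that ApplyMapping casts col1 from string to long, so any input row whose second field is empty or non-numeric would come out of the mapping with a null unique_system_identifier. A minimal check (a sketch using the applymapping1 frame above, not part of the job itself) would be:

    # Sketch: convert the mapped DynamicFrame to a Spark DataFrame and
    # surface rows where the string-to-long cast left the identifier null.
    df = applymapping1.toDF()
    nulls = df.filter(df["unique_system_identifier"].isNull())
    print(nulls.count())
    nulls.show(5, truncate=False)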

The DDL for the table is given below.

    CREATE TABLE `PUBACC_AD` (
      `record_type` char(2) DEFAULT NULL,
      `unique_system_identifier` decimal(9,0) NOT NULL,
      `uls_file_number` char(14) DEFAULT NULL,
      `ebf_number` varchar(30) DEFAULT NULL,
      `application_purpose` char(2) DEFAULT NULL,
      `application_status` char(1) DEFAULT NULL,
      `application_fee_exempt` char(1) DEFAULT NULL,
      `regulatory_fee_exempt` char(1) DEFAULT NULL,
      `source` char(1) DEFAULT NULL,
      `requested_expiration_date_mmdd` char(4) DEFAULT NULL,
      `receipt_date` char(10) DEFAULT NULL,
      `notification_code` char(1) DEFAULT NULL,
      `notification_date` char(10) DEFAULT NULL,
      `expanding_area_or_contour` char(1) DEFAULT NULL,
      `change_type` char(1) DEFAULT NULL,
      `original_application_purpose` char(2) DEFAULT NULL,
      `requesting_a_waiver` char(1) DEFAULT NULL,
      `how_many_waivers_requested` int DEFAULT NULL,
      `any_attachments` char(1) DEFAULT NULL,
      `number_of_requested_SIDs` int DEFAULT NULL,
      `fee_control_num` char(16) DEFAULT NULL,
      `date_entered` char(10) DEFAULT NULL,
      `reason` varchar(255) DEFAULT NULL,
      `frequency_coordination_indicator` char(1) DEFAULT NULL,
      `emergency_sta` char(1) DEFAULT NULL,
      `overall_change_type` char(1) DEFAULT NULL,
      `slow_growth_ind` char(1) DEFAULT NULL,
      `previous_waiver` char(1) DEFAULT NULL,
      `waiver_deferral_fee` char(1) DEFAULT NULL,
      `has_term_pending_ind` char(1) DEFAULT NULL,
      `use_of_service` char(1) DEFAULT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
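
unique_system_identifier is the only NOT NULL column in the table, so a single row with a null identifier aborts the whole JDBC write. If such rows turn out to be expected (for example, a trailing blank line in AD.dat), a Filter step before the sink could drop them (a sketch; whether dropping those rows is acceptable depends on the data):

    # Sketch: keep only rows with a non-null identifier before the JDBC
    # sink, then write `filtered` instead of `dropnullfields3` in datasink4.
    from awsglue.transforms import Filter
    filtered = Filter.apply(frame = dropnullfields3,
                            f = lambda row: row["unique_system_identifier"] is not None,
                            transformation_ctx = "filtered")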

When I run this job, it throws the error given below:

    An error occurred while calling o157.pyWriteDynamicFrame. Column 'unique_system_identifier' cannot be null.

I couldn't find any null values in the file, though. I am also attaching the data file I am trying to load. Please suggest a solution.
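
To rule out the file itself, the raw object can be read as plain text and scanned for blank lines, which can parse to all-null records and hence a null identifier (a sketch using the job's spark session):

    # Sketch: count blank lines in the raw file; each one can become an
    # all-null record (including unique_system_identifier) after parsing.
    from pyspark.sql import functions as F
    raw = spark.read.text("s3://dev-itr-uls-weekly-files/2022-01-11/a_aircr/AD.dat")
    print(raw.filter(F.length(F.trim(F.col("value"))) == 0).count())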


