'AWS Glue job not loading data from s3 to amazon Aurora db-giving null error
My aim is to populate data from a file in s3 -s3://dev-itr-uls-weekly-files/2022-01-11/a_aircr/AD.dat into a table in AWS Aurora.
The script written in AWS Glue job is given below.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "fulldatas-01-11-db", table_name =
"ad_dat_d34ee0e6b01d07f287826be0cf826b01", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
#datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "fulldatas-01-11-db", table_name = "ad_dat_d34ee0e6b01d07f287826be0cf826b01", transformation_ctx = "datasource0")
connection_type = 's3'
datasource0 = glueContext.create_dynamic_frame_from_options(connection_type,
connection_options={"paths": ["s3://dev-itr-uls-weekly-files/2022-01-11/a_aircr/AD.dat"]}, format="csv",
format_options={"quoteChar": -1, "separator": "|"}, transformation_ctx = "datasource0")
#datasource0.printSchema()
#datasource0.show(1)
## @type: ApplyMapping
## @args: [mapping = [("col0", "string", "col0", "string"), ("col1", "long", "col1", "long"), ("col2", "string", "col2", "string"), ("col3", "string", "col3", "string"), ("col4", "string", "col4", "string"), ("col5", "string", "col5", "string"), ("col6", "string", "col6", "string"), ("col7", "string", "col7", "string"), ("col8", "string", "col8", "string"), ("col9", "long", "col9", "long"), ("col10", "string", "col10", "string"), ("col11", "string", "col11", "string"), ("col12", "string", "col12", "string"), ("col13", "string", "col13", "string"), ("col14", "string", "col14", "string"), ("col15", "string", "col15", "string"), ("col16", "string", "col16", "string"), ("col17", "string", "col17", "string"), ("col18", "string", "col18", "string"), ("col19", "string", "col19", "string"), ("col20", "string", "col20", "string"), ("col21", "string", "col21", "string"), ("col22", "string", "col22", "string"), ("col23", "string", "col23", "string"), ("col24", "string", "col24", "string"), ("col25", "string", "col25", "string"), ("col26", "string", "col26", "string"), ("col27", "string", "col27", "string"), ("col28", "string", "col28", "string"), ("col29", "string", "col29", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("col0", "string", "record_type", "string"), ("col1", "long", "unique_system_identifier", "long"), ("col2", "string", "uls_file_number", "string"), ("col3", "string", "ebf_number", "string"), ("col4", "string", "application_purpose", "string"), ("col5", "string", "application_status", "string"), ("col6", "string", "application_fee_exempt", "string"), ("col7", "string", "regulatory_fee_exempt", "string"), ("col8", "string", "source", "string"), ("col9", "long", "requested_expiration_date_mmdd", "long"), ("col10", "string", "receipt_date", "string"), ("col11", "string", "notification_code", "string"), ("col12", "string", "notification_date", "string"), ("col13", "string", "expanding_area_or_contour", "string"), ("col14", "string", "change_type", "string"), ("col15", "string", "original_application_purpose", "string"), ("col16", "string", "requesting_a_waiver", "string"), ("col17", "string", "how_many_waivers_requested", "string"), ("col18", "string", "any_attachments", "string"), ("col19", "string", "number_of_requested_SIDs", "string"), ("col20", "string", "fee_control_num", "string"), ("col21", "string", "date_entered", "string"), ("col22", "string", "reason", "string"), ("col23", "string", "frequency_coordination_indicator", "string"), ("col24", "string", "emergency_sta", "string"), ("col25", "string", "overall_change_type", "string"), ("col26", "string", "slow_growth_ind", "string"), ("col27", "string", "previous_waiver", "string"), ("col28", "string", "waiver_deferral_fee", "string"), ("col29", "string", "has_term_pending_ind", "string"), ("col30", "string", "use_of_service", "string")], transformation_ctx = "applymapping1")
#applymapping1.printSchema()
#applymapping1.show(1)
## @type: ResolveChoice
## @args: [choice = "make_cols", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2)
dropnullfields3.printSchema()
dropnullfields3.show(1)
## @type: DataSink
## @args: [catalog_connection = "uls_stage_conn", connection_options = {"dbtable": "ad_dat_d34ee0e6b01d07f287826be0cf826b01", "database": "uls"}, transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "uls_stage_conn", connection_options = {"dbtable": "PUBACC_AD", "database": "uls"}, transformation_ctx = "datasink4")
job.commit()
The DDL for the table is given below.
CREATE TABLE `PUBACC_AD` (
`record_type` char(2) DEFAULT NULL,
`unique_system_identifier` decimal(9,0) NOT NULL,
`uls_file_number` char(14) DEFAULT NULL,
`ebf_number` varchar(30) DEFAULT NULL,
`application_purpose` char(2) DEFAULT NULL,
`application_status` char(1) DEFAULT NULL,
`application_fee_exempt` char(1) DEFAULT NULL,
`regulatory_fee_exempt` char(1) DEFAULT NULL,
`source` char(1) DEFAULT NULL,
`requested_expiration_date_mmdd` char(4) DEFAULT NULL,
`receipt_date` char(10) DEFAULT NULL,
`notification_code` char(1) DEFAULT NULL,
`notification_date` char(10) DEFAULT NULL,
`expanding_area_or_contour` char(1) DEFAULT NULL,
`change_type` char(1) DEFAULT NULL,
`original_application_purpose` char(2) DEFAULT NULL,
`requesting_a_waiver` char(1) DEFAULT NULL,
`how_many_waivers_requested` int DEFAULT NULL,
`any_attachments` char(1) DEFAULT NULL,
`number_of_requested_SIDs` int DEFAULT NULL,
`fee_control_num` char(16) DEFAULT NULL,
`date_entered` char(10) DEFAULT NULL,
`reason` varchar(255) DEFAULT NULL,
`frequency_coordination_indicator` char(1) DEFAULT NULL,
`emergency_sta` char(1) DEFAULT NULL,
`overall_change_type` char(1) DEFAULT NULL,
`slow_growth_ind` char(1) DEFAULT NULL,
`previous_waiver` char(1) DEFAULT NULL,
`waiver_deferral_fee` char(1) DEFAULT NULL,
`has_term_pending_ind` char(1) DEFAULT NULL,
`use_of_service` char(1) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1
If I run this job it throws error given below
An error occurred while calling o157.pyWriteDynamicFrame. Column 'unique_system_identifier' cannot be null.
I coudn't find null values though. I am also attaching the data file I am trying. Please suggest a solution.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|
