'AWS GlueStudio RDS -> Redshift invalid timestamp format

I am trying to create an AWS Glue ETL job to move data from Aurora RDS to Redshift, but cannot resolve how to get the timestamp fields properly mapped. All stages of the job show a valid preview of the expected data, but the job always fails with the following error

    py4j.protocol.Py4JJavaError: An error occurred while calling o179.pyWriteDynamicFrame.
    : java.sql.SQLException:
    Error (code 1206) while loading data into Redshift: "Invalid timestamp format or value [YYYY-MM-DD HH24:MI:SS]"
    Table name: public.stage_table_ae89e9dffe974b649bbf4852e49a4b12
    Column name: updated_at
    Column type: timestamp(0)
    Raw line: 1234,5341,1121,0,2022-01-06 16:29:55.000000000,2022-01-06 16:29:55.000000000,1,1,Suzy
    Raw field value: 0

I have tried doing a date format to remove the microseconds, I have tried forcing quotes around the date fields, nothing works.

Here is the generated script


    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from awsglue.dynamicframe import DynamicFrameCollection
    from awsglue.dynamicframe import DynamicFrame
    from awsglue import DynamicFrame
    
    # Script generated for node Custom transform
    def CastIntsTransform(glueContext, dfc) -> DynamicFrameCollection:
        df = dfc.select(list(dfc.keys())[0])
        df_resolved = (
            df.resolveChoice(specs=[("id", "cast:bigint")])
            .resolveChoice(specs=[("user_id", "cast:bigint")])
            .resolveChoice(specs=[("connected_user_id", "cast:bigint")])
            .resolveChoice(specs=[("mg_id", "cast:bigint")])
            .resolveChoice(specs=[("access_level", "cast:tinyint")])
            .resolveChoice(specs=[("status", "cast:tinyint")])
        )
        return DynamicFrameCollection({"CustomTransform0": df_resolved}, glueContext)
    
    
    def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame:
        for alias, frame in mapping.items():
            frame.toDF().createOrReplaceTempView(alias)
        result = spark.sql(query)
        return DynamicFrame.fromDF(result, glueContext, transformation_ctx)
    
    
    args = getResolvedOptions(sys.argv, ["JOB_NAME"])
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args["JOB_NAME"], args)
    
    # Script generated for node JDBC Connection
    JDBCConnection_node1 = glueContext.create_dynamic_frame.from_catalog(
        database="ABC123",
        table_name="user_connections",
        transformation_ctx="JDBCConnection_node1",
    )
    
    # Script generated for node SQL
    SqlQuery0 = """
    select 
        id, 
        user_id, 
        connected_user_id, 
        COALESCE(mg_id, 0) mg_id, 
        created_at,
        updated_at,
        updated_at,
        access_level, 
        status, 
        COALESCE(nickname, '') nickname 
    from 
        apiData
    """
    SQL_node1647619002820 = sparkSqlQuery(
        glueContext,
        query=SqlQuery0,
        mapping={"apiData": JDBCConnection_node1},
        transformation_ctx="SQL_node1647619002820",
    )
    
    # Script generated for node Custom transform
    Customtransform_node1647612655336 = CastIntsTransform(
        glueContext,
        DynamicFrameCollection(
            {"SQL_node1647619002820": SQL_node1647619002820}, glueContext
        ),
    )
    
    # Script generated for node Select From Collection
    SelectFromCollection_node1647613332516 = SelectFromCollection.apply(
        dfc=Customtransform_node1647612655336,
        key=list(Customtransform_node1647612655336.keys())[0],
        transformation_ctx="SelectFromCollection_node1647613332516",
    )
    
    # Script generated for node ApplyMapping
    ApplyMapping_node2 = ApplyMapping.apply(
        frame=SelectFromCollection_node1647613332516,
        mappings=[
            ("id", "bigint", "id", "bigint"),
            ("user_id", "bigint", "user_id", "bigint"),
            ("connected_user_id", "bigint", "connected_user_id", "bigint"),
            ("mg_id", "bigint", "mg_id", "bigint"),
            ("created_at", "timestamp", "created_at", "timestamp"),
            ("updated_at", "timestamp", "updated_at", "timestamp"),
            ("access_level", "tinyint", "access_level", "tinyint"),
            ("status", "tinyint", "status", "tinyint"),
            ("nickname", "varchar", "nickname", "varchar"),
        ],
        transformation_ctx="ApplyMapping_node2",
    )
    
    # Script generated for node Amazon Redshift
    pre_query = "drop table if exists public.stage_table_cd5d65739d334453938f090ea1cb2d6e;create table public.stage_table_cd5d65739d334453938f090ea1cb2d6e as select * from public.test_user_connections where 1=2;"
    post_query = "begin;delete from public.test_user_connections using public.stage_table_cd5d65739d334453938f090ea1cb2d6e where public.stage_table_cd5d65739d334453938f090ea1cb2d6e.id = public.test_user_connections.id; insert into public.test_user_connections select * from public.stage_table_cd5d65739d334453938f090ea1cb2d6e; drop table public.stage_table_cd5d65739d334453938f090ea1cb2d6e; end;"
    AmazonRedshift_node1647612972417 = glueContext.write_dynamic_frame.from_jdbc_conf(
        frame=ApplyMapping_node2,
        catalog_connection="ABC123",
        connection_options={
            "database": "test",
            "dbtable": "public.stage_table_cd5d65739d334453938f090ea1cb2d6e",
            "preactions": pre_query,
            "postactions": post_query,
        },
        redshift_tmp_dir=args["TempDir"],
        transformation_ctx="AmazonRedshift_node1647612972417",
    )
    
    job.commit()



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source