Glue job re-processes data even though job bookmark appears properly set

This is a common issue with Glue Job Bookmarks, but I haven't run into it until now.

Due to the size of my data, I am reading and writing dynamic frames date by date.

Per the AWS recommendations:

  • Max concurrency is 1
  • I have initialized the job with job.init(args['JOB_NAME'], args)
  • I am using transformation_ctx in all my dynamic frame operations
  • I am running job.commit() at the end of the job
  • The source data is not being modified in between job runs

My job looks like this:

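import logging
import sys
from datetime import date

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ['aws_account_id','JOB_NAME'])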
account_id = args.get("aws_account_id")
lake_bucket = f"data-lake-{account_id}"

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

SqlQuery0 = "select ... from main"  # Query removed for readability

start_date = date(2021, 1, 1)
end_date = date.today()
for current_date in daterange(start_date, end_date):  # avoid shadowing datetime.date
    date_str = current_date.strftime("%Y%m%d")

    main_node1 = glueContext.create_dynamic_frame.from_catalog(
        database="data-catalog",
        table_name="main",
        transformation_ctx="main_node1",
        push_down_predicate=f"(dt == '{date_str}')"
    )

    if len(main_node1.schema().fields) == 0:
        logging.info("No new data, skipping")
        continue

    Sql_node2 = sparkSqlQuery(glueContext, query = SqlQuery0, mapping = {"main":main_node1}, transformation_ctx = "Sql_node2")

    S3bucket_node3 = glueContext.getSink(
        path=f"s3://{lake_bucket}/some_table/",
        connection_type="s3",
        updateBehavior="UPDATE_IN_DATABASE",
        partitionKeys=["dt"],
        compression="snappy",
        enableUpdateCatalog=True,
        transformation_ctx="S3bucket_node3",
    )
    S3bucket_node3.setCatalogInfo(
        catalogDatabase="data-catalog", catalogTableName="some_table"
    )
    S3bucket_node3.setFormat("glueparquet")

    logging.info("Writing frame to s3")
    S3bucket_node3.writeFrame(Sql_node2)

job.commit()

My only thought is that this could be caused by the sparkSqlQuery function converting to DataFrames, but the old data already shows up in main_node1 before the query is even run, so I don't think that's the problem.

This is the query function:

def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame:
    for alias, frame in mapping.items():
        frame.toDF().createOrReplaceTempView(alias)
    result = spark.sql(query)
    return DynamicFrame.fromDF(result, glueContext, transformation_ctx)
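
The daterange helper called in the loop is not shown above. A minimal sketch of what it could look like, assuming it yields one date per day from start_date up to but not including end_date (the exclusive end is an assumption, as is the exact signature):

```python
from datetime import date, timedelta
from typing import Iterator


def daterange(start_date: date, end_date: date) -> Iterator[date]:
    """Yield each date from start_date (inclusive) up to end_date (exclusive)."""
    # Assumption: end_date is excluded, mirroring Python's built-in range().
    for offset in range((end_date - start_date).days):
        yield start_date + timedelta(days=offset)


# Usage: the dt partition values the loop would filter on for a short range.
partitions = [d.strftime("%Y%m%d") for d in daterange(date(2021, 1, 1), date(2021, 1, 4))]
print(partitions)  # ['20210101', '20210102', '20210103']
```

With an exclusive end, a job started today does not try to read today's possibly incomplete partition. If the real helper includes end_date, the range needs one extra day.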


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
