Glue job re-processes data even though job bookmark appears properly set
This is a common issue with Glue Job Bookmarks, but I haven't run into it until now.
Due to the size of my data, I am reading and writing dynamic frames date by date.
Per the AWS recommendations:
- Max concurrency is 1
- I have initialized the job with job.init(args['JOB_NAME'], args)
- I am using transformation_ctx in all my dynamic frame operations
- I am running job.commit() at the end of the job
- The source data is not being modified in between job runs
My job looks like this:
import sys
import logging
from datetime import date

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ['aws_account_id', 'JOB_NAME'])
account_id = args.get("aws_account_id")
lake_bucket = f"data-lake-{account_id}"

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

SqlQuery0 = "select ... from main"  # Removed query for readability

start_date = date(2021, 1, 1)
end_date = date.today()

# Read, transform and write one dt partition per iteration
for date in daterange(start_date, end_date):
    date_str = date.strftime("%Y%m%d")
    main_node1 = glueContext.create_dynamic_frame.from_catalog(
        database="data-catalog",
        table_name="main",
        transformation_ctx="main_node1",
        push_down_predicate=f"(dt == '{date_str}')"
    )
    # An empty schema means the read returned no data for this date
    if len(main_node1.schema().fields) == 0:
        logging.info("No new data, skipping")
        continue
    Sql_node2 = sparkSqlQuery(glueContext, query=SqlQuery0, mapping={"main": main_node1}, transformation_ctx="Sql_node2")
    S3bucket_node3 = glueContext.getSink(
        path=f"s3://{lake_bucket}/some_table/",
        connection_type="s3",
        updateBehavior="UPDATE_IN_DATABASE",
        partitionKeys=["dt"],
        compression="snappy",
        enableUpdateCatalog=True,
        transformation_ctx="S3bucket_node3",
    )
    S3bucket_node3.setCatalogInfo(
        catalogDatabase="data-catalog", catalogTableName="some_table"
    )
    S3bucket_node3.setFormat("glueparquet")
    logging.info("Writing frame to s3")
    S3bucket_node3.writeFrame(Sql_node2)

# Runs once, at the end of the job, after every date has been processed
job.commit()
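The daterange helper called in the loop is not shown in this post. A minimal sketch, assuming it is a plain generator over calendar days with an inclusive start and an exclusive end (this signature and behavior are assumptions, not the original code):

```python
from datetime import date, timedelta
from typing import Iterator


def daterange(start_date: date, end_date: date) -> Iterator[date]:
    """Yield each calendar day from start_date up to, but not including, end_date.

    Hypothetical stand-in for the helper used in the job; the real one may differ.
    """
    for offset in range((end_date - start_date).days):
        yield start_date + timedelta(days=offset)


# Example: build the dt strings that would feed push_down_predicate
example_days = [d.strftime("%Y%m%d") for d in daterange(date(2021, 1, 1), date(2021, 1, 4))]
```

Under that assumption, the loop issues one catalog read per day from 2021-01-01 up to yesterday, each one using the same transformation_ctx value.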
My only thought is that this could be due to the sparkSqlQuery function converting to DataFrames, but the job bookmark is already picking up old data in main_node1 before the query is even run, so I don't think that's the problem.
This is the query function:
def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame:
    # Register each input frame as a temp view under its alias
    for alias, frame in mapping.items():
        frame.toDF().createOrReplaceTempView(alias)
    result = spark.sql(query)
    return DynamicFrame.fromDF(result, glueContext, transformation_ctx)
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
