Unable to get epoch in milliseconds in Airflow
I have the following Airflow code:
time_for_run_id = time.time()
run_id = int(time_for_run_id * 1000)
SQL_1 = "INSERT INTO dds_control.dds_control_table (job_run_id,job_start,job_status) VALUES('{}',current_timestamp,'In-progress');".format(run_id)
SQL_2 = "UPDATE dds_control.dds_control_table SET job_end = current_timestamp, job_status = 'Completed' WHERE job_run_id='{}';".format(run_id)

with models.DAG(
    dag_id='DDS_HK_CustAttrRefresh_ID_ibis_bis_acct_mh',
    schedule_interval='@once',
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['ingestion_dag'],
) as dag:
    prev_task = None
    # Insert the start time and config of the job into the DDS metadata control table
    job_start_time = CloudSQLExecuteQueryOperator(
        gcp_cloudsql_conn_id=connection_names,
        task_id='insert_dds_ct_start_ts',
        sql=SQL_1,
    )
    # Record the resulting status of the job (success/failed) in the DDS metadata control table
    job_end_time = CloudSQLExecuteQueryOperator(
        gcp_cloudsql_conn_id=connection_names,
        task_id='update_dds_ct_end_ts',
        sql=SQL_2,
        trigger_rule='all_done',
    )
    job_start_time >> job_end_time
I see that the run_id passed to the SQL query is different for the two operators. I tried using {{ execution_date.int_timestamp }}, but that does not give the value in milliseconds or nanoseconds. Is there another way to do this?
Solution 1:[1]
execution_date passed in the task instance context is a pendulum.DateTime instance.
You can write your SQL statements as:
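Templated fields are rendered at task run time from the task instance context, whereas module-level code such as the `time.time()` call above is re-executed every time the scheduler or a worker parses the DAG file, which is why each operator saw a different run_id. A minimal sketch of that parse-time effect (plain Python, no Airflow required; `parse_dag_file` is a hypothetical stand-in for one parse of the DAG file):

```python
import time

def parse_dag_file():
    """Stand-in for one parse of the DAG file: module-level
    time.time() runs again on every parse."""
    return int(time.time() * 1000)  # a fresh run_id each time

first = parse_dag_file()
time.sleep(0.01)           # the scheduler re-parses the file later...
second = parse_dag_file()
print(first != second)     # True: the two "parses" disagree on run_id
```

Rendering the value from `execution_date` inside the templated `sql` field avoids this, because both tasks of the same run share one context.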
SQL_1 = """
INSERT INTO dds_control.dds_control_table (job_run_id, job_start, job_status)
VALUES(
'{{ execution_date.format('x') }}',
current_timestamp,
'In-progress'
);
"""
SQL_2 = """
UPDATE dds_control.dds_control_table
SET
job_end = current_timestamp,
job_status = 'Completed'
WHERE job_run_id='{{ execution_date.format('x') }}';
"""
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Oluwafemi Sule |
