Google Cloud Dataflow Python, Retrieving Job ID

I am currently working on a Dataflow Template in Python, and I would like to access the Job ID and use it when saving to a specific Firestore document.

Is it possible to access the Job ID?

I cannot find anything regarding this in the documentation.



Solution 1:[1]

You can use the Google Dataflow API. Use the projects.jobs.list method to retrieve Dataflow Job IDs.
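
For example, a minimal sketch using the Google API Python client (assuming application-default credentials; list_dataflow_job_ids is a name chosen here for illustration):

import googleapiclient.discovery

def list_dataflow_job_ids(project_id):
    # Build a client for the Dataflow v1b3 REST API (uses application-default credentials).
    dataflow = googleapiclient.discovery.build('dataflow', 'v1b3')
    # projects.jobs.list returns Job resources; note that per the Dataflow docs it only
    # covers jobs in us-central1 (use projects.locations.jobs.list for other regions).
    response = dataflow.projects().jobs().list(projectId=project_id).execute()
    # Each entry in 'jobs' is a Job resource with 'id', 'name', and 'currentState'.
    return [job['id'] for job in response.get('jobs', [])]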

Solution 2:[2]

From skimming the documentation, the response you get from launching the job should contain a JSON body with a property "job" that is an instance of Job.

You should be able to use this to get the ID you need.

If you are using the Google Cloud SDK for Dataflow, you might get a different object when you call the create method on templates().
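
For illustration, a minimal sketch of reading the ID from the launch response with the Google API Python client; the project, region, bucket path, and job name below are placeholders:

import googleapiclient.discovery

dataflow = googleapiclient.discovery.build('dataflow', 'v1b3')
response = dataflow.projects().locations().templates().launch(
    projectId='my-project',                          # placeholder
    location='us-central1',                          # placeholder
    gcsPath='gs://my-bucket/templates/my-template',  # placeholder template path
    body={'jobName': 'my-job'},                      # minimal launch parameters
).execute()
# The 'job' property of the response body is a Job resource; 'id' is the job ID.
job_id = response['job']['id']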

Solution 3:[3]

The following snippet launches a Dataflow template stored in a GCS bucket, gets the job ID from the response body of the launch template API call, and then polls the Dataflow job's state every 10 seconds until it reaches a final state.

The response body is described in the official Google Cloud documentation.

So far I have only seen six job states for a Dataflow job; please let me know if I have missed any others.

import logging
from time import sleep

import googleapiclient.discovery

logger = logging.getLogger(__name__)

def launch_dataflow_template(project_id, location, credentials, template_path):
    dataflow = googleapiclient.discovery.build('dataflow', 'v1b3', credentials=credentials)
    logger.info(f"Template path: {template_path}")
    result = dataflow.projects().locations().templates().launch(
            projectId=project_id,
            location=location,
            body={
                ...  # launch parameters elided in the original answer
            },
            gcsPath=template_path  # dataflow template path
    ).execute()
    # The launch response contains a 'job' property (a Job resource) whose 'id' is the job ID.
    return result.get('job', {}).get('id')

def _get_dataflow_job_status(dataflow, project_id, location, job_id):
    # Presumed helper (not shown in the original answer): fetch the current
    # Job resource via projects.locations.jobs.get.
    return dataflow.projects().locations().jobs().get(
            projectId=project_id,
            location=location,
            jobId=job_id
    ).execute()

def poll_dataflow_job_status(project_id, location, credentials, job_id):
    dataflow = googleapiclient.discovery.build('dataflow', 'v1b3', credentials=credentials)
    # executing states are not the final states of a Dataflow job; they show that the job is transitioning into another upcoming state
    executing_states = ['JOB_STATE_PENDING', 'JOB_STATE_RUNNING', 'JOB_STATE_CANCELLING']
    # final states do not change further
    final_states = ['JOB_STATE_DONE', 'JOB_STATE_FAILED', 'JOB_STATE_CANCELLED']
    while True:
        job_desc = _get_dataflow_job_status(dataflow, project_id, location, job_id)
        if job_desc['currentState'] in executing_states:
            pass  # still transitioning; poll again after the sleep below
        elif job_desc['currentState'] in final_states:
            break
        sleep(10)
    return job_id, job_desc['currentState']
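
A usage sketch under the same assumptions (placeholder project, region, and template path; credentials come from application-default credentials):

from google.auth import default

credentials, _ = default()  # application-default credentials
job_id = launch_dataflow_template(
    'my-project', 'us-central1', credentials,
    'gs://my-bucket/templates/my-template')
job_id, state = poll_dataflow_job_status('my-project', 'us-central1', credentials, job_id)
print(f"Job {job_id} finished in state {state}")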

Solution 4:[4]

You can get GCP metadata using these Beam functions in 2.35.0. See the documentation: https://beam.apache.org/releases/pydoc/2.35.0/_modules/apache_beam/io/gcp/gce_metadata_util.html#fetch_dataflow_job_id

beam.io.gcp.gce_metadata_util._fetch_custom_gce_metadata("job_name")
beam.io.gcp.gce_metadata_util._fetch_custom_gce_metadata("job_id")

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: Yurci
Solution 2: Carlo Field
Solution 3: gamberooni
Solution 4: ARNOLD BRANDO HUETE GARCIA