Migrating On-Premise Apache Airflow workflow scripts to Cloud Composer

I have an on-premise environment running Airflow v2.2.0 and wish to migrate all of its workflows to a Cloud Composer instance. During this migration, some of the operators used in the on-premise environment fail when the same workflows run on Cloud Composer 1 (Airflow 2.1.4).

Below is a single task from such a workflow:

from airflow.providers.apache.hive.operators.hive import HiveOperator

hive_task = HiveOperator(
    hql="./scripts/hive-script.sql",
    task_id="survey_data_aggregator",
    hive_cli_conn_id="hive_server_conn",
    dag=data_aggregator,
    hiveconfs={"input_path": "{{ params['input_path'] }}", "output_path": "{{ params['output_path'] }}"},
)

Execution of the workflow fails with the following error: [Errno 2] No such file or directory: 'beeline'

Having faced this error before, I know it is caused by the worker nodes not having the beeline binary on their PATH. However, I do not want to SSH into every worker instance and update its PATH variable.

Upon further research, I found that replacing HiveOperator with DataProcHiveOperator, and updating the arguments accordingly, makes the workflow run as expected. This, too, is unacceptable: manually editing each workflow script is not a practical workaround, and there may be other operators requiring similar manual intervention, which would further increase the effort needed for the migration.
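For reference, a rough sketch of what such a rewrite looks like with the current Dataproc API (DataprocSubmitJobOperator with a hive_job payload). The project ID, cluster name, region, and GCS path below are placeholders I made up, not values from my environment:

```python
# Hypothetical rewrite of the HiveOperator task for Dataproc.
# from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

# Dataproc job payload carrying the Hive query and its variables;
# the script must first be uploaded to a GCS bucket.
hive_job = {
    "reference": {"project_id": "my-project"},          # placeholder
    "placement": {"cluster_name": "my-cluster"},        # placeholder
    "hive_job": {
        "query_file_uri": "gs://my-bucket/scripts/hive-script.sql",
        "script_variables": {
            "input_path": "{{ params['input_path'] }}",
            "output_path": "{{ params['output_path'] }}",
        },
    },
}

# hive_task = DataprocSubmitJobOperator(
#     task_id="survey_data_aggregator",
#     project_id="my-project",
#     region="us-central1",
#     job=hive_job,
#     dag=data_aggregator,
# )
```

This illustrates why the rewrite is not mechanical: the connection, the script location, and the parameter-passing mechanism all change shape.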

What is the optimal way of handling situations like these without amending workflow scripts manually? Furthermore, is there official documentation describing a Google-recommended way to migrate Apache Airflow workflows from on-premise instances to Cloud Composer? I have been unable to find a reference to it.

EDIT 1:

I found out that if a task requires third-party binaries, the KubernetesPodOperator can be used to spin up a pod from a Docker image that contains all the required binaries and dependencies.

from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

kubernetes_min_pod = KubernetesPodOperator(
    task_id='pod-ex-minimum',
    name='pod-ex-minimum',
    cmds=['echo', '"hello world"'],
    namespace='default',
    image='gcr.io/gcp-runtimes/ubuntu_18_0_4'
)

However, this pod is terminated as soon as the command specified in cmds finishes. Is there a way to specify that all succeeding tasks be run inside this pod?
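The only workaround I can think of is chaining all the steps into a single shell command so they at least share one pod, since each KubernetesPodOperator task otherwise gets a fresh pod. A sketch, assuming the steps can be expressed as shell commands (the script names below are hypothetical):

```python
# Hypothetical workaround: run several steps in one pod by chaining them
# into a single shell invocation. step_one.sh and step_two.sh are placeholders.
# from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

cmds = ["bash", "-c", "step_one.sh && step_two.sh"]

# combined_task = KubernetesPodOperator(
#     task_id="combined-steps",
#     name="combined-steps",
#     namespace="default",
#     image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
#     cmds=cmds,
# )
```

This loses per-task retries and visibility in the Airflow UI, so it is not what I am after.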

EDIT 2:

There has been some confusion about my question, so let me clarify: I understand that if my DAG needs a binary to work, I can download that binary onto the Airflow worker pod and update the PATH variable. I am keeping this option as a last resort for now.

I am essentially looking for a way to spin up a Cloud Composer instance whose environment already has all prerequisite binaries. Ideally, I could specify a custom image for the Airflow worker pod with all my required dependencies preinstalled, but I have been unable to find a way to do this.



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow