'Not able to run airflow task using airflow plugin
I've got gcloud composer airflow instance and custom airflow web plugin which runs separate job on specific endpoint. Airflow version is 1.14 (1.14+composer in the cloud)
@rest_api_decorator('/run_task', "POST")
def trigger_tasks(self):
dag_id = Util.get_argument(request, 'dag_id')
task_id = Util.get_argument(request, 'task_id')
subdir = Util.get_argument(request, 'subdir')
execution_date = parsedate(Util.get_argument(request, 'execution_date'))
dag = self.get_dag(subdir, dag_id)
task = dag.get_task(task_id=task_id)
ti = TaskInstance(task, execution_date)
ti.refresh_from_db()
ti_list.append(ti)
executor = get_default_executor()
executor.start()
executor.queue_task_instance(
ti,
ignore_all_deps=True)
executor.heartbeat()
executor.end()
result = {
'dag_id': dag_id,
'task_id': task_id_list
}
return Util.get_response(result)
Listing above is an example of the endpoint of web plugin. This code works flowlessly on my local instance with celery executor but for some reason it doesn't work in the cloud. From log files I clearly can see that the task is being added to the queue:
2021-04-28T16:17:04.962251133Zairflow-webserver [2021-04-28 16:17:01,054] {base_executor.py:58} INFO - Adding to queue: ['airflow', 'run', 'tutorial', 'print_date', '2021-04-23T09:18:11+00:00', '-A', '--local', '--pool', 'default_pool', '-sd', 'DAGS_FOLDER/
But then there are multiple errors indicating that operation above didn't complete successfully:
[2021-04-28 16:23:27,934] {redis.py:363} ERROR - Connection to Redis lost: Retry (0/20) now.@-@{"workflow": "tutorial", "task-id": "print_date", "execution-date": "2021-04-23T09:18:13+00:00"}
I was able to run separate task using gcloud cli but for some reason similar code doesn't execute in the plugin. Am I missing something?
Update
Here is screenshot of worker workload [1]: https://i.stack.imgur.com/GHUv6.png
Solution 1:[1]
Figured out with Google Cloud support that it's not possible to queue task in composer environment in such a way. As a workaround, simple "wrapper" dag can be created, that will contain bash operator to submit separate tasks using cli.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | regexpguy |
