Tweet Streaming Stalled in Apache Airflow
I got an error while running a Twitter streaming task in Apache Airflow 2. This was the error:
[2022-02-22, 14:33:38 UTC] {TwitterError.py:22} WARNING - <class 'str'> Twitter stream stalled
[2022-02-22, 14:33:38 UTC] {logging_mixin.py:109} INFO - TwitterConnectionError Twitter stream stalled
[2022-02-22, 14:33:38 UTC] {python.py:152} INFO - Done. Returned value was: None
[2022-02-22, 14:33:38 UTC] {taskinstance.py:1270} INFO - Marking task as SUCCESS. dag_id=tweet_crawl_dag, task_id=start_crawl_task, execution_date=20220220T031806, start_date=20220220T031808, end_date=20220222T143338
[2022-02-22, 14:33:39 UTC] {local_task_job.py:154} INFO - Task exited with return code 0
[2022-02-22, 14:33:39 UTC] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py:1941: DeprecationWarning: Accessing 'execution_date' from the template is deprecated and will be removed in a future version. Please use 'logical_date' or 'data_interval_start' instead.
warnings.warn(message, DeprecationWarning)
[2022-02-22, 14:33:39 UTC] {subprocess.py:62} INFO - Tmp dir root location:
/tmp
[2022-02-22, 14:33:39 UTC] {subprocess.py:74} INFO - Running command: ['bash', '-c', 'sudo docker restart tweetpyrabbit_worker_1']
[2022-02-22, 14:33:39 UTC] {subprocess.py:85} INFO - Output:
[2022-02-22, 14:33:41 UTC] {subprocess.py:89} INFO -
[2022-02-22, 14:33:41 UTC] {subprocess.py:89} INFO - We trust you have received the usual lecture from the local System
[2022-02-22, 14:33:41 UTC] {subprocess.py:89} INFO - Administrator. It usually boils down to these three things:
[2022-02-22, 14:33:41 UTC] {subprocess.py:89} INFO -
[2022-02-22, 14:33:41 UTC] {subprocess.py:89} INFO - #1) Respect the privacy of others.
[2022-02-22, 14:33:41 UTC] {subprocess.py:89} INFO - #2) Think before you type.
[2022-02-22, 14:33:41 UTC] {subprocess.py:89} INFO - #3) With great power comes great responsibility.
[2022-02-22, 14:33:41 UTC] {subprocess.py:89} INFO -
[2022-02-22, 14:33:41 UTC] {subprocess.py:89} INFO - sudo: no tty present and no askpass program specified
[2022-02-22, 14:33:41 UTC] {subprocess.py:93} INFO - Command exited with return code 1
[2022-02-22, 14:33:41 UTC] {taskinstance.py:1588} ERROR - Error when executing on_success_callback
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1586, in _run_finished_callback
task.on_success_callback(context)
File "/opt/airflow/dags/tweet_konflik_dag.py", line 58, in clear_upstream_crawl_task
return clear_tasks.execute(context=context)
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/operators/bash.py", line 187, in execute
raise AirflowException(
airflow.exceptions.AirflowException: Bash command failed. The command returned a non-zero exit code 1.
[2022-02-22, 14:33:41 UTC] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check
This is my DAG. I use `on_success_callback` because the task is marked SUCCESS even after the tweet streaming task stalls. Inside the callback I run `airflow tasks clear` to restart the task, and I also try to restart the Airflow worker automatically, because the DAG runs again if I restart the worker manually. However, I can't restart the Airflow worker automatically from the `on_success_callback` function.
def clear_upstream_crawl_task(context):
    execution_date = context.get("execution_date")
    task_instance = get_task_instance.get_task_instance(dag_name_crawl, 'start_crawl_task', execution_date)
    task_start_date = task_instance.start_date
    clear_tasks = BashOperator(
        task_id='clear_tasks',
        bash_command=f'airflow tasks clear -s {task_start_date} -t {task_to_be_restart} -d -y {dag_name_crawl} && sudo docker restart tweetpyrabbit_worker_1'
    )
    return clear_tasks.execute(context=context)

with DAG(dag_name_crawl, default_args=default_args) as dag_crawl:
    t1 = DummyOperator(task_id="start")
    t2 = PythonOperator(
        task_id='start_crawl_task',
        python_callable=start_crawl,
        on_success_callback=clear_upstream_crawl_task,
        provide_context=True,
        retries=5,
        retry_delay=timedelta(seconds=90),
        op_kwargs={'env_file': env_file})
    t3 = DummyOperator(task_id="end")
    t1 >> t2 >> t3
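A simpler direction than restarting the worker from a callback may be to make the stall fail the task itself: if the callable raises instead of returning `None`, Airflow marks that try as FAILED, and the `retries=5` / `retry_delay` already configured on the task restart the stream automatically. A minimal sketch; the `run_stream` argument stands in for the real streaming call and is not part of the original DAG:

```python
class StreamStalledError(Exception):
    """Raised when the Twitter stream stops producing data."""

def start_crawl(run_stream):
    """Run the stream and raise instead of returning when it stalls.

    `run_stream` is a placeholder for the real streaming call; here it
    is assumed to return the reason the stream ended.
    """
    reason = run_stream()
    if reason == "stalled":
        # Raising makes Airflow mark this try as FAILED and honour the
        # task's retries/retry_delay, instead of SUCCESS + callback.
        raise StreamStalledError("Twitter stream stalled")
    return reason
```

With this shape, no `on_success_callback` workaround is needed: a stalled run fails, Airflow waits `retry_delay`, and the next try starts a fresh stream.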
The Twitter stream stalls like this every day. How can I overcome this problem? Thank you.
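If the worker container genuinely needs an external restart, another option is the Docker SDK for Python (docker-py) instead of shelling out to `sudo docker`, which sidesteps the TTY problem entirely. A sketch, assuming the worker can reach the Docker socket (e.g. `/var/run/docker.sock` mounted into the container); the `client` parameter is there so the call shape is visible, and in production you would pass `docker.from_env()`:

```python
def restart_worker(client, container_name="tweetpyrabbit_worker_1"):
    """Restart a container through a Docker SDK client.

    With docker-py installed, call restart_worker(docker.from_env());
    the SDK exposes client.containers.get(name).restart() as used here.
    """
    client.containers.get(container_name).restart()
```

Because permission comes from access to the Docker socket rather than from `sudo`, this works from inside a callback or a PythonOperator without any sudoers changes.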
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow