Airflow sla_miss_callback function not triggering
I have been trying to get a Slack message callback to trigger on SLA misses. I've noticed that:
- SLA misses get registered successfully in the Airflow web UI at slamiss/list/
- on_failure_callback works successfully
However, the sla_miss_callback function itself never gets triggered.
What I've tried:
- Different combinations of adding sla and sla_miss_callback at the default_args level, the DAG level, and the task level
- Checking logs on our scheduler and workers for SLA-related messages (see also here), but we haven't seen anything
- The Slack message callback function works if called from any other basic task or function
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    'start_date': airflow.utils.dates.days_ago(n=0, minute=1),
    'on_failure_callback': send_task_failed_msg_to_slack,
    'sla': timedelta(minutes=1),
    "retries": 0,
    "pool": 'canary',
    'priority_weight': 1
}
dag = airflow.DAG(
    dag_id='sla_test',
    default_args=default_args,
    sla_miss_callback=send_sla_miss_message_to_slack,
    schedule_interval='*/5 * * * *',
    catchup=False,
    max_active_runs=1,
    dagrun_timeout=timedelta(minutes=5)
)

def sleep():
    """ Sleep for 90 seconds """
    time.sleep(90)
    LOGGER.info("Slept for 90 seconds")

def simple_print(**context):
    """ Prints a message """
    print("Hello World!")

sleep = PythonOperator(
    task_id="sleep",
    python_callable=sleep,
    dag=dag
)
simple_task = PythonOperator(
    task_id="simple_task",
    python_callable=simple_print,
    provide_context=True,
    dag=dag
)
sleep >> simple_task
Solution 1:[1]
I was in a similar situation once.
On investigating the scheduler log, I found the following error:
[2020-07-08 09:14:32,781] {scheduler_job.py:534} INFO - --------------> ABOUT TO CALL SLA MISS CALL BACK
[2020-07-08 09:14:32,781] {scheduler_job.py:541} ERROR - Could not call sla_miss_callback for DAG
sla_miss_alert() takes 1 positional arguments but 5 were given
The problem is that your sla_miss_callback function accepts only 1 argument, but it is called with 5. The signature should look like this:
def sla_miss_alert(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Function that alerts me that dag_id missed sla"""
    # <function code here>
For reference, check out the Airflow source code.
Note: Don't put sla_miss_callback=sla_miss_alert in default_args. It should be defined in the DAG definition itself.
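The scheduler error above comes down to arity: the callback is invoked with five positional arguments. As a minimal sketch (the helper name `accepts_sla_miss_args` is hypothetical and not part of Airflow), the standard-library `inspect` module can check a candidate callback before it is attached to a DAG:

```python
import inspect


def accepts_sla_miss_args(callback):
    """Return True if `callback` can be called with five positional arguments.

    Hypothetical helper for illustration; it is not part of Airflow.
    """
    try:
        # Signature.bind raises TypeError if the arguments do not fit.
        inspect.signature(callback).bind(None, None, None, None, None)
    except TypeError:
        return False
    return True


def good_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Matches the five-argument form shown above."""


def bad_callback(context):
    """Shaped like on_failure_callback; too few parameters for an SLA miss."""


print(accepts_sla_miss_args(good_callback))
print(accepts_sla_miss_args(bad_callback))
```

Running a check like this in a unit test may surface the mismatch earlier than waiting for the scheduler log.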
Solution 2:[2]
Example of using SLA missed and Execution Timeout alerts:
- At first, you'll get an SLA missed alert after the task has run for 2 minutes,
- and then, after 4 minutes, the task will fail with an Execution Timeout alert.
"sla": timedelta(minutes=2),  # Default Task SLA time
"execution_timeout": timedelta(minutes=4),  # Default Task Execution Timeout
Also, log_url is included right in the message, so you can easily open the task log in Airflow.
import time
from datetime import datetime, timedelta
from textwrap import dedent
from typing import Any, Dict, List, Optional, Tuple
from airflow import AirflowException
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
from airflow.exceptions import AirflowTaskTimeout
from airflow.hooks.base_hook import BaseHook
from airflow.models import DAG, TaskInstance
from airflow.operators.python_operator import PythonOperator
SLACK_STATUS_TASK_FAILED = ":red_circle: Task Failed"
SLACK_STATUS_EXECUTION_TIMEOUT = ":alert: Task Failed by Execution Timeout."
def send_slack_alert_sla_miss(
    dag: DAG,
    task_list: str,
    blocking_task_list: str,
    slas: List[Tuple],
    blocking_tis: List[TaskInstance],
) -> None:
    """Send `SLA missed` alert to Slack"""
    task_instance: TaskInstance = blocking_tis[0]
    message = dedent(
        f"""
        :warning: Task SLA missed.
        *DAG*: {dag.dag_id}
        *Task*: {task_instance.task_id}
        *Execution Time*: {task_instance.execution_date.strftime("%Y-%m-%d %H:%M:%S")} UTC
        *SLA Time*: {task_instance.task.sla}
        _* Time by which the job is expected to succeed_
        *Task State*: `{task_instance.state}`
        *Blocking Task List*: {blocking_task_list}
        *Log URL*: {task_instance.log_url}
        """
    )
    send_slack_alert(message=message)

def send_slack_alert_task_failed(context: Dict[str, Any]) -> None:
    """Send `Task Failed` notification to Slack"""
    task_instance: TaskInstance = context.get("task_instance")
    exception: AirflowException = context.get("exception")
    status = SLACK_STATUS_TASK_FAILED
    if isinstance(exception, AirflowTaskTimeout):
        status = SLACK_STATUS_EXECUTION_TIMEOUT
    # Prepare formatted Slack message
    message = dedent(
        f"""
        {status}
        *DAG*: {task_instance.dag_id}
        *Task*: {task_instance.task_id}
        *Execution Time*: {context.get("execution_date").to_datetime_string()} UTC
        *SLA Time*: {task_instance.task.sla}
        _* Time by which the job is expected to succeed_
        *Execution Timeout*: {task_instance.task.execution_timeout}
        _** Max time allowed for the execution of this task instance_
        *Task Duration*: {timedelta(seconds=round(task_instance.duration))}
        *Task State*: `{task_instance.state}`
        *Exception*: {exception}
        *Log URL*: {task_instance.log_url}
        """
    )
    send_slack_alert(
        message=message,
        context=context,
    )

def send_slack_alert(
    message: str,
    context: Optional[Dict[str, Any]] = None,
) -> None:
    """Send prepared message to Slack"""
    slack_webhook_token = BaseHook.get_connection("slack").password
    notification = SlackWebhookOperator(
        task_id="slack_notification",
        http_conn_id="slack",
        webhook_token=slack_webhook_token,
        message=message,
        username="airflow",
    )
    notification.execute(context)
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    "owner": "airflow",
    "email": ["test@test.com"],
    "email_on_failure": True,
    "depends_on_past": False,
    "retry_delay": timedelta(minutes=5),
    "sla": timedelta(minutes=2),  # Default Task SLA time
    "execution_timeout": timedelta(minutes=4),  # Default Task Execution Timeout
    "on_failure_callback": send_slack_alert_task_failed,
}
with DAG(
    dag_id="test_sla",
    schedule_interval="*/5 * * * *",
    start_date=datetime(2021, 1, 11),
    default_args=default_args,
    sla_miss_callback=send_slack_alert_sla_miss,  # Must be set here, not in default_args!
) as dag:
    delay_python_task = PythonOperator(
        task_id="delay_five_minutes_python_task",
        # MIKE MILLIGAN ADDED THIS
        sla=timedelta(minutes=2),
        python_callable=lambda: time.sleep(300),
    )
Solution 3:[3]
It seems that the only way to make sla_miss_callback work is to pass the arguments it needs explicitly. Nothing else has worked for me, and the arguments 'dag', 'task_list', 'blocking_task_list', 'slas', and 'blocking_tis' are not being sent to the callback at all.
TypeError: print_sla_miss() missing 5 required positional arguments: 'dag', 'task_list', 'blocking_task_list', 'slas', and 'blocking_tis'
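A `TypeError` with this wording is what Python raises when a five-parameter function is invoked with no arguments at all, for example when the callback is called by hand rather than passed by reference. That is only one possible cause. The sketch below uses a stand-in `print_sla_miss` with a made-up body and plain Python, with no Airflow involved:

```python
def print_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Stand-in callback with the five parameters named in the error."""
    return f"SLA missed for tasks: {task_list}"


# Calling the function with no arguments reproduces this kind of error.
try:
    print_sla_miss()
except TypeError as exc:
    error_text = str(exc)
    print(error_text)

# Supplying all five positional arguments works (placeholder values).
result = print_sla_miss("dag", "task_a", "", [], [])
print(result)
```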
Solution 4:[4]
Many of these answers are about 90% complete, so I wanted to share my example using Bash operators, which combines what I found in the responses above and in other resources.
The most important points are to define sla_miss_callback in the DAG definition rather than in default_args, and not to pass context to the SLA function.
"""
A simple example showing the basics of using a custom SLA notification response.
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta, datetime
from airflow.operators.slack_operator import SlackAPIPostOperator
from slack import slack_attachment
from airflow.hooks.base_hook import BaseHook
import urllib.parse
# Slack alert for sla_miss
def slack_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis):
    dag_id = slas[0].dag_id
    task_id = slas[0].task_id
    execution_date = slas[0].execution_date.isoformat()
    base_url = 'webserver_url_here'
    encoded_execution_date = urllib.parse.quote_plus(execution_date)
    dag_url = (f'{base_url}/graph?dag_id={dag_id}'
               f'&execution_date={encoded_execution_date}')
    message = (f':alert: *Airflow SLA Miss*'
               f'\n\n'
               f'*DAG:* {dag_id}\n'
               f'*Task:* {task_id}\n'
               f'*Execution Date:* {execution_date}'
               f'\n\n'
               f'<{dag_url}|Click here to view DAG>')
    sla_miss_alert = SlackAPIPostOperator(
        task_id='slack_sla_miss',
        channel='airflow-alerts-test',
        token=str(BaseHook.get_connection("slack").password),
        text=message
    )
    return sla_miss_alert.execute()

# Slack alert for successful task completion
def slack_success_task(context):
    success_alert = SlackAPIPostOperator(
        task_id='slack_success',
        channel='airflow-alerts-test',
        token=str(BaseHook.get_connection("slack").password),
        text="Test successful"
    )
    return success_alert.execute(context=context)

default_args = {
    "depends_on_past": False,
    'start_date': datetime(2020, 11, 18),
    "retries": 0
}
# Create a basic DAG with our args
# Note: Don't put sla_miss_callback=sla_miss_alert in default_args. It should be defined in the DAG definition itself.
dag = DAG(
    dag_id='sla_slack_v6',
    default_args=default_args,
    sla_miss_callback=slack_sla_miss,
    catchup=False,
    # A common interval to make the job fire when we run it
    schedule_interval=timedelta(minutes=3)
)
# Add a task that will always fail the SLA
t1 = BashOperator(
    task_id='timeout_test_sla_miss',
    # Sleep 60 seconds to guarantee we miss the SLA
    bash_command='sleep 60',
    # Do not retry so the SLA miss fires after the first execution
    retries=0,
    # on_success_callback=slack_success_task,
    provide_context=True,
    # Set our task up with a 10 second SLA
    sla=timedelta(seconds=10),
    dag=dag
)
t2 = BashOperator(
    task_id='timeout_test_sla_miss_task_2',
    # Sleep 30 seconds to guarantee we miss the SLA of 20 seconds set in this task
    bash_command='sleep 30',
    # Do not retry so the SLA miss fires after the first execution
    retries=0,
    # on_success_callback=slack_success_task,
    provide_context=True,
    # Set our task up with a 20 second SLA
    sla=timedelta(seconds=20),
    dag=dag
)
t3 = BashOperator(
    task_id='timeout_test_sla_miss_task_3',
    # Sleep 60 seconds to guarantee we miss the SLA
    bash_command='sleep 60',
    # Do not retry so the SLA miss fires after the first execution
    retries=0,
    # on_success_callback=slack_success_task,
    provide_context=True,
    # Set our task up with a 30 second SLA
    sla=timedelta(seconds=30),
    dag=dag
)
t1 >> t2 >> t3
Solution 5:[5]
I think the Airflow documentation is a bit fuzzy on this.
Instead of declaring the method signature as
def slack_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis)
change it to
def slack_sla_miss(*args, **kwargs)
This way, all the parameters get passed, and you will not get the errors you are seeing in the logs.
I learned this from https://www.cloudwalker.io/2020/12/15/airflow-sla-management/
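A catch-all signature accepts whatever is passed, and the values then arrive in `args` in order. A minimal sketch, assuming the callback is invoked with the five positional arguments listed in the earlier answers (the placeholder values and the returned dictionary are made up for the demonstration):

```python
def slack_sla_miss(*args, **kwargs):
    """Catch-all SLA callback that unpacks the positional arguments by position."""
    # Assumed order: dag, task_list, blocking_task_list, slas, blocking_tis.
    dag, task_list, blocking_task_list, slas, blocking_tis = args
    return {
        "dag": dag,
        "task_list": task_list,
        "blocking_task_list": blocking_task_list,
        "sla_count": len(slas),
        "blocking_ti_count": len(blocking_tis),
    }


# Placeholder values standing in for what the scheduler would pass.
summary = slack_sla_miss("sla_test", "sleep", "", ["sla_row"], [])
print(summary["task_list"])
```

The trade-off is that a call with the wrong number of arguments no longer fails at the signature; the unpacking line raises a `ValueError` instead.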
Solution 6:[6]
I had the same issue, but was able to get it working with this code:
import logging as log
import airflow
import time
from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.python_operator import PythonOperator
from airflow import configuration
import urllib.parse
from airflow.operators.slack_operator import SlackAPIPostOperator
def sleep():
    """ Sleep for 2 minutes """
    time.sleep(60*2)
    log.info("Slept for 2 minutes")

def simple_print(**context):
    """ Prints a message """
    print("Hello World!")

def slack_on_sla_miss(dag,
                      task_list,
                      blocking_task_list,
                      slas,
                      blocking_tis):
    log.info('Running slack_on_sla_miss')
    slack_conn_id = 'slack_default'
    slack_channel = '#general'
    dag_id = slas[0].dag_id
    task_id = slas[0].task_id
    execution_date = slas[0].execution_date.isoformat()
    base_url = configuration.get('webserver', 'BASE_URL')
    encoded_execution_date = urllib.parse.quote_plus(execution_date)
    dag_url = (f'{base_url}/graph?dag_id={dag_id}'
               f'&execution_date={encoded_execution_date}')
    message = (f':o: *Airflow SLA Miss*'
               f'\n\n'
               f'*DAG:* {dag_id}\n'
               f'*Task:* {task_id}\n'
               f'*Execution Date:* {execution_date}'
               f'\n\n'
               f'<{dag_url}|Click here to view>')
    slack_op = SlackAPIPostOperator(task_id='slack_failed',
                                    slack_conn_id=slack_conn_id,
                                    channel=slack_channel,
                                    text=message)
    slack_op.execute()

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    'start_date': airflow.utils.dates.days_ago(n=0, minute=1),
    "retries": 0,
    'priority_weight': 1,
}
dag = DAG(
    dag_id='sla_test',
    default_args=default_args,
    sla_miss_callback=slack_on_sla_miss,
    schedule_interval='*/5 * * * *',
    catchup=False,
    max_active_runs=1,
)
with dag:
    sleep = PythonOperator(
        task_id="sleep",
        python_callable=sleep,
    )
    simple_task = PythonOperator(
        task_id="simple_task",
        python_callable=simple_print,
        provide_context=True,
        sla=timedelta(minutes=1),
    )
    sleep >> simple_task
Solution 7:[7]
I've run into this issue myself. Unlike on_failure_callback, which expects a Python callable, sla_miss_callback appears to need the full function call.
An example that is working for me:
from datetime import timedelta
from functools import partial
import airflow

def sla_miss_alert(dag_id):
    """
    Function that alerts me that dag_id missed sla
    """
    # <function code here>

def task_failure_alert(dag_id, context):
    """
    Function that alerts me that a task failed
    """
    # <function code here>

dag_id = 'sla_test'
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    'start_date': airflow.utils.dates.days_ago(n=0, minute=1),
    'on_failure_callback': partial(task_failure_alert, dag_id),
    'sla': timedelta(minutes=1),
    "retries": 0,
    "pool": 'canary',
    'priority_weight': 1
}
dag = airflow.DAG(
    dag_id='sla_test',
    default_args=default_args,
    sla_miss_callback=sla_miss_alert(dag_id),
    schedule_interval='*/5 * * * *',
    catchup=False,
    max_active_runs=1,
    dagrun_timeout=timedelta(minutes=5)
)
As far as I can tell, sla_miss_callback doesn't have access to context, which is unfortunate. Once I stopped looking for the context, I finally got my alerts.
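Note that in plain Python, `sla_miss_alert(dag_id)` runs the function immediately and hands its return value to `sla_miss_callback`. If the goal is to bind an extra argument such as `dag_id`, one alternative is `functools.partial`, as this answer already does for `on_failure_callback`. A minimal sketch, assuming the callback is later invoked with five positional arguments as described in the other answers; the function body and sample values are placeholders:

```python
from functools import partial


def sla_miss_alert(dag_id, dag, task_list, blocking_task_list, slas, blocking_tis):
    """Receives the bound dag_id first, then the five SLA arguments."""
    return f"DAG {dag_id} missed its SLA on: {task_list}"


# partial returns a new callable; nothing runs until it is called.
callback = partial(sla_miss_alert, "sla_test")

# Simulate the later invocation with five positional placeholder values.
message = callback(None, "sleep", "", [], [])
print(message)
```

With this shape, the DAG argument would be `sla_miss_callback=callback` rather than a function call.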
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Paul P |
| Solution 2 | Mike Milligan |
| Solution 3 | Cesar Flores |
| Solution 4 | phenderbender |
| Solution 5 | floating_hammer |
| Solution 6 | user3505886 |
| Solution 7 | MathematicalOwl |
