MWAA not finding aws_default connection
I just set up AWS MWAA (managed Airflow) and I'm playing around with running a simple bash script in a DAG. While reading the task logs, I noticed that by default the task looks for the aws_default connection and tries to use it, but doesn't find it.
I went to the connections pane and set the aws_default connection, but the logs still show the same message:
Airflow Connection: aws_conn_id=aws_default
No credentials retrieved from Connection
*** Reading remote log from Cloudwatch log_group: airflow-mwaa-Task log_stream: dms-postgres-dialog-label-pg/start-replication-task/2021-11-22T13_00_00+00_00/1.log.
[2021-11-23 13:01:02,487] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,486] {{base_aws.py:368}} INFO - Airflow Connection: aws_conn_id=aws_default
[2021-11-23 13:01:02,657] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,656] {{base_aws.py:179}} INFO - No credentials retrieved from Connection
[2021-11-23 13:01:02,678] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,678] {{base_aws.py:87}} INFO - Creating session with aws_access_key_id=None region_name=us-east-1
[2021-11-23 13:01:02,772] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,772] {{base_aws.py:157}} INFO - role_arn is None
How can I get MWAA to recognize this connection?
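For reference, here is a minimal sketch of how to check which credentials the hook actually resolves. It assumes the Amazon provider's AwsBaseHook, the class emitting the base_aws.py lines in the log above:

from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook

# Sketch: if the aws_default connection carries no login/password, the hook
# falls back to boto3's default credential chain (on MWAA, the execution role).
hook = AwsBaseHook(aws_conn_id='aws_default', client_type='sts')
print(hook.get_credentials())  # shows the credentials that were resolved

If this prints temporary credentials from the execution role rather than the key set on the connection, the hook never read anything usable off aws_default.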
My DAG:
from datetime import datetime, timedelta

import pendulum

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator

local_tz = pendulum.timezone("America/New_York")
start_date = datetime(2021, 11, 9, 8, tzinfo=local_tz)

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}

with DAG(
    'dms-postgres-dialog-label-pg-test',
    default_args=default_args,
    description='',
    schedule_interval=timedelta(days=1),
    start_date=start_date,
    tags=['example'],
) as dag:
    t1 = BashOperator(
        task_id='start-replication-task',
        bash_command="""
        aws dms start-replication-task --replication-task-arn arn:aws:dms:us-east-1:blah --start-replication-task-type reload-target
        """,
    )

    t1
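Not part of the original DAG, but a hypothetical debug task like the following (added inside the same with DAG(...) block, reusing the BashOperator import above) would print which identity the CLI ends up using; with no credentials on the connection, it should report the MWAA execution role:

    # Hypothetical sanity check: show which AWS identity the CLI resolves.
    whoami = BashOperator(
        task_id='print-caller-identity',
        bash_command='aws sts get-caller-identity',
    )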
Edit: For now, I'm just importing a built-in function and using that to get the credentials. Example:
from airflow.hooks.base import BaseHook
conn = BaseHook.get_connection('aws_service_account')
...
print(conn.host)
print(conn.login)
print(conn.password)
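To make the aws CLI in the BashOperator pick up those credentials, one option is to export them through the operator's env parameter. A minimal sketch, assuming the access key is stored in the connection's login field and the secret in its password field:

import os

from airflow.hooks.base import BaseHook
from airflow.operators.bash import BashOperator

# Note: get_connection() at module level runs every time the DAG file is parsed.
conn = BaseHook.get_connection('aws_service_account')

t1 = BashOperator(
    task_id='start-replication-task',
    bash_command='aws dms start-replication-task --replication-task-arn arn:aws:dms:us-east-1:blah --start-replication-task-type reload-target',
    # Merge with os.environ so PATH etc. survive; passing env alone would
    # replace the subprocess environment entirely.
    env={
        **os.environ,
        'AWS_ACCESS_KEY_ID': conn.login,
        'AWS_SECRET_ACCESS_KEY': conn.password,
        'AWS_DEFAULT_REGION': 'us-east-1',
    },
)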
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow