MWAA not finding aws_default connection

I just set up AWS MWAA (managed Airflow) and I'm playing around with running a simple bash script in a DAG. While reading the task logs, I noticed that by default the task looks for the aws_default connection, tries to use it, and doesn't find it.

I went to the Connections pane and set up the aws_default connection, but the logs still show the same message.

Airflow Connection: aws_conn_id=aws_default

No credentials retrieved from Connection

*** Reading remote log from Cloudwatch log_group: airflow-mwaa-Task log_stream: dms-postgres-dialog-label-pg/start-replication-task/2021-11-22T13_00_00+00_00/1.log.
[2021-11-23 13:01:02,487] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,486] {{base_aws.py:368}} INFO - Airflow Connection: aws_conn_id=aws_default
[2021-11-23 13:01:02,657] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,656] {{base_aws.py:179}} INFO - No credentials retrieved from Connection
[2021-11-23 13:01:02,678] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,678] {{base_aws.py:87}} INFO - Creating session with aws_access_key_id=None region_name=us-east-1
[2021-11-23 13:01:02,772] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,772] {{base_aws.py:157}} INFO - role_arn is None
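
To see what that hook actually ends up with at run time, here is a small debug task I can add to the same DAG. This is only a sketch (the task id and wiring are illustrative); it uses the same AwsBaseHook from base_aws.py that writes the log lines above:

from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook


def check_aws_conn():
    # Same hook/module (base_aws.py) that emits the log lines above.
    hook = AwsBaseHook(aws_conn_id='aws_default', client_type='sts')
    creds = hook.get_credentials()  # the frozen credentials the hook ends up with
    print('access_key:', creds.access_key)
    # Shows which identity the boto3 session actually uses (e.g. the MWAA execution role).
    print('caller:', hook.get_session().client('sts').get_caller_identity()['Arn'])


# inside the `with DAG(...) as dag:` block
check_conn = PythonOperator(
    task_id='check-aws-conn',
    python_callable=check_aws_conn,
)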

How can I get MWAA to recognize this connection?

My DAG:

from datetime import datetime, timedelta
import pendulum

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator

local_tz = pendulum.timezone("America/New_York")
start_date = datetime(2021, 11, 9, 8, tzinfo=local_tz)
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
with DAG(
    'dms-postgres-dialog-label-pg-test',
    default_args=default_args,
    description='',
    schedule_interval=timedelta(days=1),
    start_date=start_date,
    tags=['example'],
) as dag:

    t1 = BashOperator(
        task_id='start-replication-task',
        bash_command="""
        aws dms start-replication-task --replication-task-arn arn:aws:dms:us-east-1:blah --start-replication-task-type reload-target
        """,
    )

    t1

Edit: For now, I'm just using Airflow's built-in BaseHook to pull the credentials from a connection. Example:

from airflow.hooks.base import BaseHook

# read the credentials stored on the Airflow connection directly
conn = BaseHook.get_connection('aws_service_account')
...

print(conn.host)
print(conn.login)     # e.g. the access key id, if stored in the login field
print(conn.password)  # e.g. the secret key, if stored in the password field
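
To feed those credentials to the aws CLI call in the DAG above, one option is to pass them through the BashOperator's env parameter. A rough sketch, assuming the aws_service_account connection stores the access key id in login and the secret key in password:

import os
from airflow.hooks.base import BaseHook
from airflow.operators.bash import BashOperator

conn = BaseHook.get_connection('aws_service_account')

# BashOperator replaces the subprocess environment when `env` is given,
# so start from a copy of the current one and add the credentials to it.
cli_env = {
    **os.environ,
    'AWS_ACCESS_KEY_ID': conn.login or '',
    'AWS_SECRET_ACCESS_KEY': conn.password or '',
    'AWS_DEFAULT_REGION': 'us-east-1',
}

# inside the `with DAG(...) as dag:` block from the DAG above
t1 = BashOperator(
    task_id='start-replication-task',
    bash_command="""
    aws dms start-replication-task --replication-task-arn arn:aws:dms:us-east-1:blah --start-replication-task-type reload-target
    """,
    env=cli_env,
)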


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
