Airflow DAG fails in a PythonOperator with error "Negsignal.SIGKILL"

I am running Airflow v1.10.15 on Cloud Composer v1.16.16.

My DAG looks like this:

# imports
import logging
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from scripts import workday_extract, workday_config_large

# logger used by the task callable below
logger = logging.getLogger(__name__)

default_args = {
    'owner': 'xxxx',
    'depends_on_past': False,
    'start_date': datetime(2021, 9, 14),
    'email_on_failure': True,
    'email': ['xxxx'],
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}


# Define the DAG with parameters
dag = DAG(
    dag_id='xxxx_v1',
    schedule_interval='0 20 * * *',
    default_args=default_args,
    catchup=False,
    max_active_runs=1,
    concurrency=1
)

def wd_to_bq(key, val, **kwargs):
    logger.info("workday to BQ ingestion")
    workday_extract.fetch_wd_load_bq(key, val)


start_load = DummyOperator(task_id='start', dag=dag)

end_load = DummyOperator(task_id='end', dag=dag)

for key, val in workday_config_large.endpoint_tbl_mapping.items():
    # Task 1: Process the unmatched records from the view
    workday_to_bq = PythonOperator(
        dag=dag,
        task_id=f'{key}',
        execution_timeout=timedelta(minutes=60),
        provide_context=True,
        python_callable=wd_to_bq,
        op_kwargs={'key': key, 'val': val}
    )
    start_load >> workday_to_bq >> end_load

The task fails with the error Task exited with return code Negsignal.SIGKILL. The Python script runs fine on my local machine and completes in about 15 minutes. Reports are extracted from multiple endpoints; the one that takes the longest (~15 minutes) fails with this error while the others succeed.

I have tried a lot of options but none seem to work. Can someone help with this?



Solution 1:[1]

I resolved the issue by increasing the memory size:

https://github.com/apache/airflow/issues/10435

You should check the memory limit of the pod that acts as the worker while it is running the task.
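
To see how close the task gets to that limit, here is a minimal sketch that logs the worker process's memory from inside the callable (assuming psutil is importable; it is an Airflow dependency, but verify it in your environment):

    import logging
    import os

    import psutil  # assumption: available as an Airflow dependency

    def log_memory_usage(tag=""):
        # resident set size of the current worker process, in MiB
        rss_mb = psutil.Process(os.getpid()).memory_info().rss / 1024 ** 2
        logging.info("memory usage %s: %.1f MiB", tag, rss_mb)

Calling this before and after the heavy extraction step in wd_to_bq shows how near the pod's limit the task climbs before the OOM killer sends SIGKILL.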

Solution 2:[2]

This error occurs when the allocated resources are less than what the task requires. DAG execution is limited by the worker's RAM, and a DAG can consume considerably more memory depending on its nature, so it is always preferable to use machine types with more memory. Since you are using Cloud Composer 1, autoscaling of the resources is not possible, so the practical option is to increase them yourself.

Solution 3:[3]

I had this issue too, but took a different approach.

Have you considered making your script use less memory, or use memory better, instead of simply increasing the available memory?

    import pandas as pd

    def stream_offers(chunk_size=10_000):
        # db_connector_warehouse and offers_table come from my own project
        with db_connector_warehouse.create_session() as session:
            # yield_per + disabled eager loads keep SQLAlchemy's ORM from
            # materialising the whole result set at once
            query = session.query(offers_table)\
                .yield_per(chunk_size).enable_eagerloads(False)

            for df in pd.read_sql(query.statement, session.bind, chunksize=chunk_size):
                yield df

In the example above, the bottom part (passing chunksize to pandas) has it hand you the DataFrame in smaller chunks; however, pandas still loads everything into memory first and then gives you the part you requested (this is the case for read_sql, and likely for other loading functions such as CSV / XLSX, but I haven't looked into those).

So you must ensure that you don't load the entire dataset: if you are using SQLAlchemy's ORM, you need the yield_per parameter; for plain connections, you can set the connection to stream its results.
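
For the plain-connection case, a minimal sketch assuming a SQLAlchemy engine (the connection URL, table name, and process function are placeholders, not from the original answer):

    import pandas as pd
    from sqlalchemy import create_engine

    engine = create_engine("postgresql://user:pass@host/db")  # placeholder URL

    # stream_results=True requests a server-side cursor, so rows are fetched
    # incrementally instead of being buffered in memory all at once
    with engine.connect().execution_options(stream_results=True) as conn:
        for df in pd.read_sql("SELECT * FROM offers", conn, chunksize=10_000):
            process(df)  # placeholder for per-chunk work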

A couple of useful resources if you'd rather go down the route of using less memory:

How to create a large pandas dataframe from an sql query without running out of memory?

https://pythonspeed.com/articles/pandas-sql-chunking/

and, if you're not familiar with yield-based flow control, What does the "yield" keyword do?

Solution 4:[4]

I had this happen when I was using a ThreadPoolExecutor, which doesn't release any resources until all the futures are done. To prevent the errors, I switched to processing four elements at a time:

import itertools
from concurrent.futures import ThreadPoolExecutor

documents = iter(documents)  # islice must draw from one shared iterator
while True:
    # islice returns a lazy (always-truthy) iterator; materialise it so
    # the empty check below can actually end the loop
    chunk = list(itertools.islice(documents, 4))
    if not chunk:
        break
    # the with-block joins all futures before the next chunk starts,
    # bounding how many results are held in memory at once
    with ThreadPoolExecutor(max_workers=4) as executor:
        for each in executor.map(TextScraper(), chunk):
            pass

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: user3705451
Solution 2: (unattributed)
Solution 3: David Mendes
Solution 4: Noumenon