'Celery integration testing with pytest and monkeypatching

I'm trying to run some simple integration tests with Python 3.9.5, Celery 5.2.6, pytest 7.1.0 and FastAPI on docker.

Project structure:

  📦
  ┣ 📂app
  ┣ ┣ 📂api
  ┣ ┣ ┣ 📂routes
  ┣ ┣ ┣ ┗ 📜celery.py
  ┣ ┗ 📂services
  ┣    ┗  📜operations.py
  ┣ ┣ 📂celery
  ┣ ┃ ┣ 📜tasks.py
  ┣ ┃ ┣ 📜utils.py
  ┣ ┃ ┗ 📜worker.py
  ┣ 📂tests
     ┣ 📜conftest.py
     ┗ 📜test_celery.py

The celery container is run with: command: celery -A app.celery.worker:celery_app worker --loglevel INFO --logfile=./celery.log --hostname=worker_1_dev@%h, and works as expected.

And the web server container with: command: uvicorn app.api.server:app

Both have bind mounts of the complete project directory.

How should the celery fixtures be defined so that I can monkeypatch operations.py or the task itself? Right now when my tests run the actual worker container picks up the real tasks and runs them, so I guess the problem lies in the fixtures. Setting always eager does not change the behaviour, and isn't recommended either. The worker and app instance fixtures return objects along the lines of:

celery_worker: gen44@XXX
celery_app: <Celery celery.tests at XXX>

which seems correct, they're not the actual worker container or the app.celery.worker:celery_app instance. I've tried messing with other fixtures in the docs, mocking approaches, etc. but I've spent way too much time on this and tests just don't seem to work. Like I said, the actual container executes tasks as expected.

app/api/routes/celery.py

class TaskStatus(CoreModel):
    task_id: str
    task_status: Optional[str]
    task_result: Optional[Any]

@router.post(
  "/operations/add-with-factor",
  name="celery:add-times-factor",
)
def add_times_factor(
  x: int = Query(..., gt=0),
  y: int = Query(..., gt=0),
):
  task = celery_app.send_task(
    "add_times_factor",
    args=[x, y],
  )
  return {"task_id": task.id}


@router.get(
    "/tasks/{task_id}/",
    name="celery:get-task-status",
    response_model=TaskStatus,
)
def get_status(task_id):
    task_result = AsyncResult(task_id)
    return TaskStatus(
        task_id=task_id,
        task_status=task_result.status,
        task_result=task_result.result,
    )

tests/conftest.py

@pytest.fixture(scope="session")
def celery_enable_logging():
    return True

@pytest.fixture(scope="session")
def celery_config():
    return {
        "broker_url": "memory://",
        "result_backend": "rpc://",
        "task_always_eager": True, # to be removed
    }

@pytest.fixture(scope="session")
def celery_worker_parameters():
    return {
        "perform_ping_check": False,
    }

@pytest.fixture(scope="session")
def celery_includes():
    return ["app.celery.tasks"]

    ...

app/celery/worker.py

import os
from celery import Celery

celery_app = Celery("app.celery.worker")
celery_app.conf.task_serializer = "json"
celery_app.conf.result_serializer = "json"
celery_app.conf.accept_content = ["application/json", "application/x-python-serialize"]
celery_app.conf.broker_url = os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379")
celery_app.conf.result_backend = os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379")
celery_app.autodiscover_tasks(["app.celery"])

app/celery/tasks.py

from app.services.operations import OperationService

def async_to_sync(func):
    @functools.wraps(func)
    def wrapped(*args, **kwargs):
        return asyncio.run(func(*args, **kwargs))

    return wrapped

@shared_task(name="add_times_factor")
@async_to_sync
async def add_times_factor_task(x, y):
    operation_service = OperationService()
    result = await operation_service.add_times_factor(x, y)
    return {"result": result}

app/services/operations.py

FACTOR = 2

class OperationService:
    async def add_times_factor(self, x, y):
        await something
        return (x + y) * FACTOR

tests/test_celery.py

import pytest
import app.services.operations as operations
pytestmark = pytest.mark.asyncio

class TestCelery:
    async def test_add_times_factor(self,
        celery_worker,
        celery_app: Celery,
        monkeypatch,
    ):
        monkeypatch.setattr(operations, "FACTOR", 20) # ignored right now

        celery_app.set_current()
        res = await superuser_client.post(app.url_path_for("celery:add-times-factor"), params={"x": 1, "y": 2})

        task_succeeded = False
        while not task_succeeded:
            task_status_res = await superuser_client.get(app.url_path_for("celery:get-task-status", task_id=task.id))
            assert task_status_res.status_code == HTTP_200_OK
            task_succeeded = task_status_res.json()["task_status"] == "SUCCESS"
            await asyncio.sleep(0.5)

        assert json.loads(task_status_res.json()["task_result"])["result"] == 60 # fail 6 == 60




Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source