'Celery integration testing with pytest and monkeypatching
I'm trying to run some simple integration tests with Python 3.9.5, Celery 5.2.6, pytest 7.1.0 and FastAPI on docker.
Project structure:
📦
┣ 📂app
┣ ┣ 📂api
┣ ┣ ┣ 📂routes
┣ ┣ ┣ ┗ 📜celery.py
┣ ┗ 📂services
┣ ┗ 📜operations.py
┣ ┣ 📂celery
┣ ┃ ┣ 📜tasks.py
┣ ┃ ┣ 📜utils.py
┣ ┃ ┗ 📜worker.py
┣ 📂tests
┣ 📜conftest.py
┗ 📜test_celery.py
The celery container is run with: command: celery -A app.celery.worker:celery_app worker --loglevel INFO --logfile=./celery.log --hostname=worker_1_dev@%h, and works as expected.
And the web server container with: command: uvicorn app.api.server:app
Both have bind mounts of the complete project directory.
How should the celery fixtures be defined so that I can monkeypatch operations.py or the task itself?
Right now when my tests run the actual worker container picks up the real tasks and runs them, so I guess the problem lies
in the fixtures. Setting always eager does not change the behaviour, and isn't recommended either.
The worker and app instance fixtures return objects along the lines of:
celery_worker: gen44@XXX
celery_app: <Celery celery.tests at XXX>
which seems correct, they're not the actual worker container or the app.celery.worker:celery_app instance.
I've tried messing with other fixtures in the docs, mocking approaches, etc. but I've spent way too much time on this and tests just
don't seem to work. Like I said, the actual container executes tasks as expected.
app/api/routes/celery.py
class TaskStatus(CoreModel):
task_id: str
task_status: Optional[str]
task_result: Optional[Any]
@router.post(
"/operations/add-with-factor",
name="celery:add-times-factor",
)
def add_times_factor(
x: int = Query(..., gt=0),
y: int = Query(..., gt=0),
):
task = celery_app.send_task(
"add_times_factor",
args=[x, y],
)
return {"task_id": task.id}
@router.get(
"/tasks/{task_id}/",
name="celery:get-task-status",
response_model=TaskStatus,
)
def get_status(task_id):
task_result = AsyncResult(task_id)
return TaskStatus(
task_id=task_id,
task_status=task_result.status,
task_result=task_result.result,
)
tests/conftest.py
@pytest.fixture(scope="session")
def celery_enable_logging():
return True
@pytest.fixture(scope="session")
def celery_config():
return {
"broker_url": "memory://",
"result_backend": "rpc://",
"task_always_eager": True, # to be removed
}
@pytest.fixture(scope="session")
def celery_worker_parameters():
return {
"perform_ping_check": False,
}
@pytest.fixture(scope="session")
def celery_includes():
return ["app.celery.tasks"]
...
app/celery/worker.py
import os
from celery import Celery
celery_app = Celery("app.celery.worker")
celery_app.conf.task_serializer = "json"
celery_app.conf.result_serializer = "json"
celery_app.conf.accept_content = ["application/json", "application/x-python-serialize"]
celery_app.conf.broker_url = os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379")
celery_app.conf.result_backend = os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379")
celery_app.autodiscover_tasks(["app.celery"])
app/celery/tasks.py
from app.services.operations import OperationService
def async_to_sync(func):
@functools.wraps(func)
def wrapped(*args, **kwargs):
return asyncio.run(func(*args, **kwargs))
return wrapped
@shared_task(name="add_times_factor")
@async_to_sync
async def add_times_factor_task(x, y):
operation_service = OperationService()
result = await operation_service.add_times_factor(x, y)
return {"result": result}
app/services/operations.py
FACTOR = 2
class OperationService:
async def add_times_factor(self, x, y):
await something
return (x + y) * FACTOR
tests/test_celery.py
import pytest
import app.services.operations as operations
pytestmark = pytest.mark.asyncio
class TestCelery:
async def test_add_times_factor(self,
celery_worker,
celery_app: Celery,
monkeypatch,
):
monkeypatch.setattr(operations, "FACTOR", 20) # ignored right now
celery_app.set_current()
res = await superuser_client.post(app.url_path_for("celery:add-times-factor"), params={"x": 1, "y": 2})
task_succeeded = False
while not task_succeeded:
task_status_res = await superuser_client.get(app.url_path_for("celery:get-task-status", task_id=task.id))
assert task_status_res.status_code == HTTP_200_OK
task_succeeded = task_status_res.json()["task_status"] == "SUCCESS"
await asyncio.sleep(0.5)
assert json.loads(task_status_res.json()["task_result"])["result"] == 60 # fail 6 == 60
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|
