Why Celery tasks trigger signals twice
Celery uses signals to trigger functions. I am currently writing tests for my Celery tasks, and a central point for me is to test the function that is triggered by a successfully completed task. In my case, this function is called execute_after_task and stores the task ID in Redis. To tell Celery that it should run after a task has completed successfully, I use the @signals.task_success.connect decorator. The whole construct looks like this:
from celery import signals

@signals.task_success.connect
def execute_after_task(sender=None, result=None, **kwargs):
    ...  # code that stores the task ID (sender.request.id) in Redis
A test that I defined with pytest checks whether a Celery task has executed this method after successful termination and has stored the task ID exactly once in Redis.
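The following is a minimal sketch of what such a test can look like. The task name (my_task), the Redis key ("task_ids"), and the assumption that a worker and broker are running are illustrative, not my actual setup:

import redis

from celery_api import my_task  # hypothetical task from the decorated module

def test_task_id_stored_exactly_once():
    r = redis.Redis()  # assumes a local Redis instance
    result = my_task.delay()
    result.get(timeout=10)  # block until the task has finished successfully

    # execute_after_task() should have stored the ID exactly once
    stored = r.lrange("task_ids", 0, -1)
    assert stored.count(result.id.encode()) == 1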
The code, and in particular the task itself, worked without any problems. However, I was surprised to discover that Celery executes the execute_after_task function twice. As a result, the task ID is stored twice in Redis, which not only wastes resources but can also cause problems for clients that check Redis for the presence of an ID.
Debugging Celery signals - what's the issue?
In my setup, my test_celery.py had two imports (import a, import b), and both of these modules in turn imported celery_api, the module in which execute_after_task is decorated with @signals.task_success.connect. The following snippet may help:
# test_celery.py
import a
import b

# a.py
import celery_api

# b.py
import celery_api
To understand this in more detail, I stepped through the execution of execute_after_task with the debugger and saw that Celery stores the decorated functions in an internal registry. This applies not only to task_success but also to other signals such as @signals.task_failure.connect.
In the Celery package, we can see the following line in signal.py:

# signal.py
for receiver in self._live_receivers(sender):
The code iterates over all receivers that have been registered. Stepping into this function shows that execute_after_task is listed twice, as two separate weak-reference objects.
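The same duplication can be seen without a debugger by printing the signal's receiver list. Note that receivers is an internal attribute of Celery's (Django-derived) dispatcher, so treat this as a debugging aid rather than a stable API:

from celery import signals

# Each entry is roughly ((receiver_key, sender_key), weak_reference);
# a handler that was registered twice appears here as two entries.
for lookup_key, ref in signals.task_success.receivers:
    print(lookup_key, ref)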
For a better understanding, it helps to set a breakpoint on the actual decorator, @signals.task_success.connect, and look at the call stack when it runs. This shows that the decorator is called during import from the two different modules pulled in by test_celery.py, namely a and b. (Since Python caches imported modules in sys.modules, a module body usually only runs twice like this when it is reachable under two different names, for example celery_api and src.celery_api.) The double execution of the function therefore happens because the @signals.task_success.connect decorator itself runs twice and registers execute_after_task twice.
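This also explains why the dispatcher's built-in deduplication does not help here: by default it keys receivers on the function object's identity, and an import under two module names produces two distinct function objects. A standalone sketch using Celery's Signal class directly reproduces the effect:

from celery.utils.dispatch import Signal

sig = Signal()

def make_handler():
    # Simulates the same source function being created twice,
    # as happens when a module is imported under two names.
    def execute_after_task(**kwargs):
        print("execute_after_task called")
    return execute_after_task

h1, h2 = make_handler(), make_handler()  # keep strong references alive
sig.connect(h1)  # first "import"
sig.connect(h2)  # second "import"

sig.send(sender=None)  # prints twice: two receivers are registered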
How do I prevent duplicated executions?
The solution is quite simple: to avoid this, a dispatch_uid='path.to.function' parameter can be specified. The dispatcher then keys the registration on this ID rather than on the function object, so a repeated registration with the same ID is ignored. The new decorator looks like this:

@signals.task_success.connect(dispatch_uid='src.celery_api.tasks')
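Continuing the sketch from above: with a shared dispatch_uid, the second registration is ignored, because the registry entry is now keyed on the ID instead of the function object:

sig2 = Signal()
h3, h4 = make_handler(), make_handler()
sig2.connect(h3, dispatch_uid='src.celery_api.tasks')
sig2.connect(h4, dispatch_uid='src.celery_api.tasks')  # ignored: same uid

sig2.send(sender=None)  # prints only once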
Fortunately, I noticed this during testing, but it could just as easily have occurred in any other module that imports celery_api. The same fix should also be applied to the task_failure decorator.
Helpful Resources
Finally, two resources that helped with troubleshooting:
A brief explanation from the Django project
A GitHub issue about double-triggered signals