Wieso Celery Tasks Signale doppelt triggern
Celery nutzt Signale, um Funktionen zu triggern. Aktuell schreibe ich Tests für meine Celery Tasks. Dabei ist für mich ein wesentlicher Punkt, die Funktion zu testen, welche von einem erfolgreich beendetem Task getriggert wird. In meinem Fall heißt die Funktion execute_after_task()
und speichert die Task ID in Redis. Um Celery mitzuteilen, dass diese Funktion nach erfolgreichem Beenden eines Tasks ausgeführt werden soll, nutze ich den decorator @signals.task_success.connect
. Das ganze Konstrukt sieht folgendermaßen aus:
@signals.task_success.connect
def execute_after_task():
# Code der die Task ID in Redis speichert
Ein Test, den ich mit pytest definiert habe, sieht vor, dass geprüft wird, ob ein Celery Task nach erfolgreichem Beenden diese Methode ausgeführt hat und die Task ID genau einmal in Redis abgelegt hat.
Der Code, insbesondere der Task, hat soweit ohne Probleme funktioniert. Verwundert musste ich jedoch feststellen, dass Celery die Funktion execute_after_task
doppelt ausführt. Das führt wiederum dazu, dass Redis doppelt mit der Task ID bespeichert wird und das hat nicht nur den negativen Effekt von ineffektivem Ressourcenmanagement, sondern kann auch zu Problemen auf der Client-Seite führen, welche Redis nach einer ID prüft.
Debugging der Celery Signale - Wo ist der Fehler?
In meinem Aufbau hatte ich zwei Imports (import a
, import b
) in meiner test_celery.py
die wiederum beide das Modul celery_api
importiert hatten, in welcher der @signals.task_success.connect
decorator die execute_after_task
annotiert hatte. Eventuell hilft die folgende Darstellung:
#test_celery.py
import a
import b
#a.py
import celery_api
#b.py
import celery_api
Um das Ganze genauer zu verstehen, habe ich die Ausführung der execute_after_task
mit dem Debugger betrachtet und konnte sehen, dass Celery die mit decorators versehenen Funktionen in eine interne Registry ablegt. Das trifft nicht nur auf task_success
zu, sondern beispielsweise auch auf @signals.task_failure.connect
.
In Celery können wir in der signal.py
folgende Zeile beobachten:
#signal.py
for receiver in self._live_receivers(sender):
Der Code iteriert über alle receivers
die registriert wurden. Beim Auflösen dieser Funktion sieht man, dass execute_after_task
doppelt als weak_reference
Objekte aufgelistet wird.
Für ein besseres Verständnis hilft es den eigentlichen decorator @signals.task_success.connect
zu debuggen. Dadurch ist es möglich, beim durchlaufen der Funktion, den Call Stack zu betrachten. Dieser zeigt, dass der decorator während des Imports von den zwei verschiedenen Modulen innerhalb der test_celery.py
aufgerufen wird. Nämlich von Modul a
und b
. Die Ursache für die doppelte Ausführung der Funktion ist also, dass der @signals.task_success.connect
decorator zwei mal ausgeführt wird.
Wie verhindere ich den doppelten Aufruf?
Die Lösung ist recht einfach: Um das zu vermeiden, kann ein Parameter dispatch_uid='path.to.function'
mitgegeben werden. Der neue decorator sieht wie folgt aus:
@signals.task_success.connect(dispatch_uid='src.celery_api.tasks')
Dieser Umstand ist mir glücklicherweise während dem Testen aufgefallen, jedoch hätte es auch in jedem anderen Modul passieren können. Simultan sollte das auch für den task_failure
decorator vorgenommen werden.
Hilfreiche Quellen
Zum Abschluss noch zwei Ressourcen, welche bei der Fehlersuche geholfen haben: