3 min read

Wieso Celery Tasks Signale doppelt triggern

Celery nutzt Signale, um Funktionen zu triggern. Aktuell schreibe ich Tests für meine Celery Tasks. Dabei ist für mich ein wesentlicher Punkt, die Funktion zu testen, welche von einem erfolgreich beendetem Task getriggert wird. In meinem Fall heißt die Funktion execute_after_task() und speichert die Task ID in Redis. Um Celery mitzuteilen, dass diese Funktion nach erfolgreichem Beenden eines Tasks ausgeführt werden soll, nutze ich den decorator @signals.task_success.connect . Das ganze Konstrukt sieht folgendermaßen aus:


  @signals.task_success.connect 
  def execute_after_task():
    # Code der die Task ID in Redis speichert

Ein Test, den ich mit pytest definiert habe, sieht vor, dass geprüft wird, ob ein Celery Task nach erfolgreichem Beenden diese Methode ausgeführt hat und die Task ID genau einmal in Redis abgelegt hat.

Der Code, insbesondere der Task, hat soweit ohne Probleme funktioniert. Verwundert musste ich jedoch feststellen, dass Celery die Funktion execute_after_task doppelt ausführt. Das führt wiederum dazu, dass Redis doppelt mit der Task ID bespeichert wird und das hat nicht nur den negativen Effekt von ineffektivem Ressourcenmanagement, sondern kann auch zu Problemen auf der Client-Seite führen, welche Redis nach einer ID prüft.

Debugging der Celery Signale - Wo ist der Fehler?

In meinem Aufbau hatte ich zwei Imports (import a, import b) in meiner test_celery.py die wiederum beide das Modul celery_api importiert hatten, in welcher der @signals.task_success.connect decorator die execute_after_task annotiert hatte. Eventuell hilft die folgende Darstellung:


#test_celery.py
import a
import b

#a.py
import celery_api

#b.py
import celery_api

Um das Ganze genauer zu verstehen, habe ich die Ausführung der execute_after_task mit dem Debugger betrachtet und konnte sehen, dass Celery die mit decorators versehenen Funktionen in eine interne Registry ablegt. Das trifft nicht nur auf task_success zu, sondern beispielsweise auch auf @signals.task_failure.connect.

In Celery können wir in der signal.py folgende Zeile beobachten:


#signal.py
for receiver in self._live_receivers(sender):

Der Code iteriert über alle receivers die registriert wurden. Beim Auflösen dieser Funktion sieht man, dass execute_after_task doppelt als weak_reference Objekte aufgelistet wird.

Für ein besseres Verständnis hilft es den eigentlichen decorator @signals.task_success.connect zu debuggen. Dadurch ist es möglich, beim durchlaufen der Funktion, den Call Stack zu betrachten. Dieser zeigt, dass der decorator während des Imports von den zwei verschiedenen Modulen innerhalb der test_celery.py aufgerufen wird. Nämlich von Modul a und b. Die Ursache für die doppelte Ausführung der Funktion ist also, dass der @signals.task_success.connect decorator zwei mal ausgeführt wird.

Wie verhindere ich den doppelten Aufruf?

Die Lösung ist recht einfach: Um das zu vermeiden, kann ein Parameter dispatch_uid='path.to.function' mitgegeben werden. Der neue decorator sieht wie folgt aus:


@signals.task_success.connect(dispatch_uid='src.celery_api.tasks')

Merke: In Python wird ein decorator während der Imports ausgeführt. Das ist generell so in Python und keine Besonderheit von Celery.

Dieser Umstand ist mir glücklicherweise während dem Testen aufgefallen, jedoch hätte es auch in jedem anderen Modul passieren können. Simultan sollte das auch für den task_failure decorator vorgenommen werden.

Hilfreiche Quellen

Zum Abschluss noch zwei Ressourcen, welche bei der Fehlersuche geholfen haben:

Signals – Django

Eine kurze Erklärung von Djangoproject

task_revoked Signal is triggered twice, when task is revoked · Issue #3805 · celery/celery
Checklist I have included the output of celery -A proj report in the issue. (if you are not able to do this, then at least specify the Celery version affected). software -> celery:4.0.2 (latentcall…

Ein GitHub issue über doppelt getriggerte Signale