„Wenn ein Arbeiter seine Arbeit gut machen will, muss er zuerst seine Werkzeuge schärfen.“ – Konfuzius, „Die Gespräche des Konfuzius. Lu Linggong“
Titelseite > Programmierung > Laden Sie Celery-Worker automatisch mit einem benutzerdefinierten Django-Befehl neu

Laden Sie Celery-Worker automatisch mit einem benutzerdefinierten Django-Befehl neu

Veröffentlicht am 05.08.2024
Durchsuche:475

Automatically reload Celery workers with a custom Django command

Celery hatte zuvor ein --autoreload-Flag, das inzwischen entfernt wurde. Django verfügt jedoch über ein integriertes automatisches Neuladen in seinen runserver-Befehl manage.py. Das Fehlen eines automatischen Neuladens in Celery-Workern führt zu einer verwirrenden Entwicklungserfahrung: Das Aktualisieren von Python-Code führt dazu, dass der Django-Server mit dem aktuellen Code neu geladen wird, aber alle Aufgaben, die der Server auslöst, führen veralteten Code im Celery-Worker aus.

In diesem Beitrag erfahren Sie, wie Sie einen benutzerdefinierten manage.py-Runworker-Befehl erstellen, der Celery-Worker während der Entwicklung automatisch neu lädt. Der Befehl wird dem Runserver nachempfunden sein und wir werden einen Blick darauf werfen, wie Djangos automatisches Neuladen unter der Haube funktioniert.

Bevor wir anfangen

In diesem Beitrag wird davon ausgegangen, dass Sie eine Django-App haben, auf der Celery bereits installiert ist (Anleitung). Es setzt außerdem voraus, dass Sie die Unterschiede zwischen Projekten und Anwendungen in Django verstehen.

Alle Links zum Quellcode und zur Dokumentation beziehen sich auf die aktuellen Versionen von Django und Celery zum Zeitpunkt der Veröffentlichung (Juli 2024). Wenn Sie dies in ferner Zukunft lesen, haben sich die Dinge möglicherweise geändert.

Schließlich wird das Hauptprojektverzeichnis in den Beispielen des Beitrags my_project genannt.

Lösung: ein benutzerdefinierter Befehl

Wir werden einen benutzerdefinierten manage.py-Befehl namens runworker erstellen. Da Django über seinen Runsever-Befehl ein automatisches Neuladen ermöglicht, verwenden wir den Quellcode von Runserver als Grundlage für unseren benutzerdefinierten Befehl.

Sie können einen Befehl in Django erstellen, indem Sie in einer beliebigen Anwendung Ihres Projekts ein Verzeichnis „management/commands/“ erstellen. Sobald die Verzeichnisse erstellt wurden, können Sie eine Python-Datei mit dem Namen des Befehls, den Sie erstellen möchten, in diesem Verzeichnis ablegen (docs).

Angenommen, Ihr Projekt verfügt über eine Anwendung namens polls, erstellen wir eine Datei unter polls/management/commands/runworker.py und fügen den folgenden Code hinzu:

# polls/management/commands/runworker.py

import sys
from datetime import datetime

from celery.signals import worker_init

from django.conf import settings
from django.core.management.base import BaseCommand
from django.utils import autoreload

from my_project.celery import app as celery_app


class Command(BaseCommand):
    help = "Starts a Celery worker instance with auto-reloading for development."

    # Validation is called explicitly each time the worker instance is reloaded.
    requires_system_checks = []
    suppressed_base_arguments = {"--verbosity", "--traceback"}

    def add_arguments(self, parser):
        parser.add_argument(
            "--skip-checks",
            action="store_true",
            help="Skip system checks.",
        )
        parser.add_argument(
            "--loglevel",
            choices=("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", "FATAL"),
            type=str.upper,  # Transforms user input to uppercase.
            default="INFO",
        )

    def handle(self, *args, **options):
        autoreload.run_with_reloader(self.run_worker, **options)

    def run_worker(self, **options):
        # If an exception was silenced in ManagementUtility.execute in order
        # to be raised in the child process, raise it now.
        autoreload.raise_last_exception()

        if not options["skip_checks"]:
            self.stdout.write("Performing system checks...\n\n")
            self.check(display_num_errors=True)

        # Need to check migrations here, so can't use the
        # requires_migrations_check attribute.
        self.check_migrations()

        # Print Django info to console when the worker initializes.
        worker_init.connect(self.on_worker_init)

        # Start the Celery worker.
        celery_app.worker_main(
            [
                "--app",
                "my_project",
                "--skip-checks",
                "worker",
                "--loglevel",
                options["loglevel"],
            ]
        )

    def on_worker_init(self, sender, **kwargs):
        quit_command = "CTRL-BREAK" if sys.platform == "win32" else "CONTROL-C"

        now = datetime.now().strftime("%B %d, %Y - %X")
        version = self.get_version()
        print(
            f"{now}\n"
            f"Django version {version}, using settings {settings.SETTINGS_MODULE!r}\n"
            f"Quit the worker instance with {quit_command}.",
            file=self.stdout,
        )

WICHTIG: Stellen Sie sicher, dass Sie alle Instanzen von my_project durch den Namen Ihres Django-Projekts ersetzen.

Wenn Sie diesen Code kopieren und einfügen und mit Ihrer Programmierung fortfahren möchten, können Sie hier getrost aufhören, ohne den Rest dieses Beitrags zu lesen. Dies ist eine elegante Lösung, die Ihnen bei der Entwicklung Ihres Django & Celery-Projekts gute Dienste leisten wird. Wenn Sie jedoch mehr über die Funktionsweise erfahren möchten, lesen Sie weiter.

Wie es funktioniert (optional)

Anstatt diesen Code Zeile für Zeile durchzugehen, werde ich die interessantesten Teile thematisch besprechen. Wenn Sie mit den benutzerdefinierten Django-Befehlen noch nicht vertraut sind, sollten Sie die Dokumentation lesen, bevor Sie fortfahren.

Automatisches Nachladen

Dieser Teil fühlt sich am magischsten an. Im Hauptteil der handle()-Methode des Befehls gibt es einen Aufruf von Djangos internem autoreload.run_with_reloader(). Es akzeptiert eine Rückruffunktion, die jedes Mal ausgeführt wird, wenn eine Python-Datei im Projekt geändert wird. Wie funktioniert das eigentlich?

Sehen wir uns eine vereinfachte Version des Quellcodes der Funktion autoreload.run_with_reloader() an. Die vereinfachte Funktion schreibt Code neu, fügt ihn ein und löscht ihn, um Klarheit über seine Funktionsweise zu schaffen.

# NOTE: This has been dramatically pared down for clarity.

def run_with_reloader(callback_func, *args, **kwargs):
    # NOTE: This will evaluate to False the first time it is run.
    is_inside_subprocess = os.getenv("RUN_MAIN") == "true"

    if is_inside_subprocess:
        # The reloader watches for Python file changes.
        reloader = get_reloader()

        django_main_thread = threading.Thread(
            target=callback_func, args=args, kwargs=kwargs
        )
        django_main_thread.daemon = True
        django_main_thread.start()

        # When the code changes, the reloader exits with return code 3.
        reloader.run(django_main_thread)

    else:
        # Returns Python path and the arguments passed to the command.
        # Example output: ['/path/to/python', './manage.py', 'runworker']
        args = get_child_arguments()

        subprocess_env = {**os.environ, "RUN_MAIN": "true"}
        while True:
            # Rerun the manage.py command in a subprocess.
            p = subprocess.run(args, env=subprocess_env, close_fds=False)
            if p.returncode != 3:
                sys.exit(p.returncode)

Wenn manage.py runworker in der Befehlszeile ausgeführt wird, ruft es zuerst die Methode handle() auf, die run_with_reloader() aufruft.

In run_with_reloader() wird geprüft, ob eine Umgebungsvariable namens RUN_MAIN den Wert „true“ hat. Beim ersten Aufruf der Funktion sollte RUN_MAIN keinen Wert haben.

Wenn RUN_MAIN nicht auf „true“ gesetzt ist, tritt run_with_reloader() in eine Schleife ein. Innerhalb der Schleife wird ein Unterprozess gestartet, der die übergebene Datei manage.py [Befehlsname] erneut ausführt, und dann darauf warten, dass dieser Unterprozess beendet wird. Wenn der Unterprozess mit Rückkehrcode 3 beendet wird, startet die nächste Iteration der Schleife einen neuen Unterprozess und wartet. Die Schleife wird ausgeführt, bis ein Unterprozess einen Exit-Code zurückgibt, der nicht 3 ist (oder bis der Benutzer mit Strg C beendet wird). Sobald es einen Rückkehrcode ungleich 3 erhält, wird das Programm vollständig beendet.

Der erzeugte Unterprozess führt den Befehl manage.py erneut aus (in unserem Fall manage.py runworker), und der Befehl ruft erneut run_with_reloader() auf. Dieses Mal wird RUN_MAIN auf „true“ gesetzt, da der Befehl in einem Unterprozess ausgeführt wird.

Da run_with_reloader() nun weiß, dass es sich in einem Unterprozess befindet, erhält es einen Reloader, der auf Dateiänderungen überwacht, die bereitgestellte Rückruffunktion in einen Thread einfügt und sie an den Reloader übergibt, der mit der Suche nach Änderungen beginnt.

Wenn ein Reloader eine Dateiänderung erkennt, führt er sys.exit(3) aus. Dadurch wird der Unterprozess beendet, was die nächste Iteration der Schleife aus dem Code auslöst, der den Unterprozess erzeugt hat. Im Gegenzug wird ein neuer Unterprozess gestartet, der eine aktualisierte Version des Codes verwendet.

Systemprüfungen und Migrationen

Standardmäßig führen Django-Befehle Systemprüfungen durch, bevor sie ihre handle()-Methode ausführen. Im Fall von runserver und unserem benutzerdefinierten runworker-Befehl möchten wir jedoch deren Ausführung verschieben, bis wir uns innerhalb des Rückrufs befinden, den wir run_with_reloader() bereitstellen. In unserem Fall ist dies unsere run_worker()-Methode. Dadurch können wir den Befehl mit automatischem Neuladen ausführen und gleichzeitig fehlerhafte Systemprüfungen beheben.

Um die Ausführung der Systemprüfungen zu verschieben, wird der Wert des Attributs require_system_checks auf eine leere Liste gesetzt und die Prüfungen werden durch Aufrufen von self.check() im Hauptteil von run_worker() durchgeführt. Wie runserver prüft auch unser benutzerdefinierter runworker-Befehl, ob alle Migrationen ausgeführt wurden, und zeigt eine Warnung an, wenn Migrationen ausstehen.

Da wir die Systemprüfungen von Django bereits innerhalb der run_worker()-Methode durchführen, deaktivieren wir die Systemprüfungen in Celery, indem wir ihr das Flag --skip-checks übergeben, um doppelte Arbeit zu verhindern.

Der gesamte Code, der sich auf Systemprüfungen und Migrationen bezieht, wurde direkt aus dem Quellcode des Runserver-Befehls übernommen.

celery_app.worker_main()

Unsere Implementierung startet den Celery-Worker direkt aus Python mithilfe von celery_app.worker_main(), anstatt ihn an Celery zu senden.

on_worker_init()

Dieser Code wird ausgeführt, wenn der Worker initialisiert wird, und zeigt Datum und Uhrzeit, die Django-Version und den Befehl zum Beenden an. Es ist den Informationen nachempfunden, die beim Booten des Runservers angezeigt werden.

Andere Runserver-Boilerplate

Die folgenden Zeilen wurden ebenfalls aus der Runserver-Quelle entfernt:

  • suppressed_base_arguments = {"--verbosity", "--traceback"}
  • autoreload.raise_last_Exception()

Protokollebene

Unser benutzerdefinierter Befehl verfügt über eine konfigurierbare Protokollebene für den Fall, dass der Entwickler die Einstellung über die CLI anpassen möchte, ohne den Code zu ändern.

Weitergehen

Ich habe im Quellcode von Django & Celery herumgestöbert, um diese Implementierung zu erstellen, und es gibt viele Möglichkeiten, sie zu erweitern. Sie können den Befehl so konfigurieren, dass er mehr Worker-Argumente von Celery akzeptiert. Alternativ könnten Sie einen benutzerdefinierten manage.py-Befehl erstellen, der jeden Shell-Befehl automatisch neu lädt, wie David Browne es in diesem Gist getan hat.

Wenn Sie dies nützlich fanden, hinterlassen Sie gerne ein „Gefällt mir“ oder einen Kommentar. Danke fürs Lesen.

Freigabeerklärung Dieser Artikel ist abgedruckt unter: https://dev.to/tylerlwsmith/automatically-reload-celery-workers-with-a-custom-django-command-1ojl?1 Bei Verstößen wenden Sie sich bitte an [email protected] um es zu löschen
Neuestes Tutorial Mehr>

Haftungsausschluss: Alle bereitgestellten Ressourcen stammen teilweise aus dem Internet. Wenn eine Verletzung Ihres Urheberrechts oder anderer Rechte und Interessen vorliegt, erläutern Sie bitte die detaillierten Gründe und legen Sie einen Nachweis des Urheberrechts oder Ihrer Rechte und Interessen vor und senden Sie ihn dann an die E-Mail-Adresse: [email protected] Wir werden die Angelegenheit so schnell wie möglich für Sie erledigen.

Copyright© 2022 湘ICP备2022001581号-3