"Si un ouvrier veut bien faire son travail, il doit d'abord affûter ses outils." - Confucius, "Les Entretiens de Confucius. Lu Linggong"
Page de garde > La programmation > Rechargez automatiquement les travailleurs Celery avec une commande Django personnalisée

Rechargez automatiquement les travailleurs Celery avec une commande Django personnalisée

Publié le 2024-08-05
Parcourir:827

Automatically reload Celery workers with a custom Django command

Le céleri avait auparavant un indicateur --autoreload qui a depuis été supprimé. Cependant, Django intègre le rechargement automatique dans sa commande manage.py runserver. L'absence de rechargement automatique dans les Workers Celery crée une expérience de développement déroutante : la mise à jour du code Python entraîne le rechargement du serveur Django avec le code actuel, mais toutes les tâches déclenchées par le serveur exécuteront du code obsolète dans le Worker Celery.

Cet article vous montrera comment créer une commande manage.py runworker personnalisée qui recharge automatiquement les travailleurs Celery pendant le développement. La commande sera calquée sur runserver, et nous verrons comment le rechargement automatique de Django fonctionne sous le capot.

Avant que nous commencions

Cet article suppose que vous disposez d'une application Django avec Celery déjà installée (guide). Cela suppose également que vous compreniez les différences entre les projets et les applications dans Django.

Tous les liens vers le code source et la documentation concerneront les versions actuelles de Django et Celery au moment de la publication (juillet 2024). Si vous lisez ceci dans un futur lointain, les choses ont peut-être changé.

Enfin, le répertoire principal du projet sera nommé my_project dans les exemples de l'article.

Solution : une commande personnalisée

Nous allons créer une commande manage.py personnalisée appelée runworker. Étant donné que Django propose un rechargement automatique via sa commande runsever, nous utiliserons le code source de runserver comme base de notre commande personnalisée.

Vous pouvez créer une commande dans Django en créant un répertoire management/commands/ dans n'importe laquelle des applications de votre projet. Une fois les répertoires créés, vous pouvez ensuite placer un fichier Python portant le nom de la commande que vous souhaitez créer dans ce répertoire (docs).

En supposant que votre projet ait une application nommée polls, nous allons créer un fichier dans polls/management/commands/runworker.py et ajouter le code suivant :

# polls/management/commands/runworker.py

import sys
from datetime import datetime

from celery.signals import worker_init

from django.conf import settings
from django.core.management.base import BaseCommand
from django.utils import autoreload

from my_project.celery import app as celery_app


class Command(BaseCommand):
    help = "Starts a Celery worker instance with auto-reloading for development."

    # Validation is called explicitly each time the worker instance is reloaded.
    requires_system_checks = []
    suppressed_base_arguments = {"--verbosity", "--traceback"}

    def add_arguments(self, parser):
        parser.add_argument(
            "--skip-checks",
            action="store_true",
            help="Skip system checks.",
        )
        parser.add_argument(
            "--loglevel",
            choices=("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", "FATAL"),
            type=str.upper,  # Transforms user input to uppercase.
            default="INFO",
        )

    def handle(self, *args, **options):
        autoreload.run_with_reloader(self.run_worker, **options)

    def run_worker(self, **options):
        # If an exception was silenced in ManagementUtility.execute in order
        # to be raised in the child process, raise it now.
        autoreload.raise_last_exception()

        if not options["skip_checks"]:
            self.stdout.write("Performing system checks...\n\n")
            self.check(display_num_errors=True)

        # Need to check migrations here, so can't use the
        # requires_migrations_check attribute.
        self.check_migrations()

        # Print Django info to console when the worker initializes.
        worker_init.connect(self.on_worker_init)

        # Start the Celery worker.
        celery_app.worker_main(
            [
                "--app",
                "my_project",
                "--skip-checks",
                "worker",
                "--loglevel",
                options["loglevel"],
            ]
        )

    def on_worker_init(self, sender, **kwargs):
        quit_command = "CTRL-BREAK" if sys.platform == "win32" else "CONTROL-C"

        now = datetime.now().strftime("%B %d, %Y - %X")
        version = self.get_version()
        print(
            f"{now}\n"
            f"Django version {version}, using settings {settings.SETTINGS_MODULE!r}\n"
            f"Quit the worker instance with {quit_command}.",
            file=self.stdout,
        )

IMPORTANT : Assurez-vous de remplacer toutes les instances de my_project par le nom de votre projet Django.

Si vous souhaitez copier et coller ce code et continuer votre programmation, vous pouvez vous arrêter ici en toute sécurité sans lire le reste de cet article. C'est une solution élégante qui vous sera très utile lors du développement de votre projet Django & Celery. Cependant, si vous souhaitez en savoir plus sur son fonctionnement, continuez à lire.

Comment ça marche (facultatif)

Plutôt que de revoir ce code ligne par ligne, je discuterai des parties les plus intéressantes par sujet. Si vous n'êtes pas déjà familier avec les commandes personnalisées de Django, vous souhaiterez peut-être consulter la documentation avant de continuer.

Rechargement automatique

Cette partie semble la plus magique. Dans le corps de la méthode handle() de la commande, il y a un appel à la fonction autoreload.run_with_reloader() interne de Django. Il accepte une fonction de rappel qui s'exécutera chaque fois qu'un fichier Python est modifié dans le projet. Comment cela fonctionne réellement ?

Jetons un coup d'œil à une version simplifiée du code source de la fonction autoreload.run_with_reloader(). La fonction simplifiée réécrit, intègre et supprime le code pour clarifier son fonctionnement.

# NOTE: This has been dramatically pared down for clarity.

def run_with_reloader(callback_func, *args, **kwargs):
    # NOTE: This will evaluate to False the first time it is run.
    is_inside_subprocess = os.getenv("RUN_MAIN") == "true"

    if is_inside_subprocess:
        # The reloader watches for Python file changes.
        reloader = get_reloader()

        django_main_thread = threading.Thread(
            target=callback_func, args=args, kwargs=kwargs
        )
        django_main_thread.daemon = True
        django_main_thread.start()

        # When the code changes, the reloader exits with return code 3.
        reloader.run(django_main_thread)

    else:
        # Returns Python path and the arguments passed to the command.
        # Example output: ['/path/to/python', './manage.py', 'runworker']
        args = get_child_arguments()

        subprocess_env = {**os.environ, "RUN_MAIN": "true"}
        while True:
            # Rerun the manage.py command in a subprocess.
            p = subprocess.run(args, env=subprocess_env, close_fds=False)
            if p.returncode != 3:
                sys.exit(p.returncode)

Lorsque manage.py runworker est exécuté dans la ligne de commande, il appellera d'abord la méthode handle() qui appellera run_with_reloader().

Dans run_with_reloader(), il vérifiera si une variable d'environnement appelée RUN_MAIN a la valeur "true". Lorsque la fonction est appelée pour la première fois, RUN_MAIN ne doit avoir aucune valeur.

Lorsque RUN_MAIN n'est pas défini sur "true", run_with_reloader() entrera dans une boucle. À l'intérieur de la boucle, il démarrera un sous-processus qui réexécutera le manage.py [nom_commande] qui a été transmis, puis attendra la fin de ce sous-processus. Si le sous-processus se termine avec le code retour 3, la prochaine itération de la boucle démarrera un nouveau sous-processus et attendra. La boucle s'exécutera jusqu'à ce qu'un sous-processus renvoie un code de sortie différent de 3 (ou jusqu'à ce que l'utilisateur quitte avec ctrl c). Une fois qu'il obtient un code retour autre que 3, il quittera complètement le programme.

Le sous-processus généré exécute à nouveau la commande manage.py (dans notre cas manage.py runworker), et encore une fois la commande appellera run_with_reloader(). Cette fois, RUN_MAIN sera défini sur "true" car la commande s'exécute dans un sous-processus.

Maintenant que run_with_reloader() sait qu'il se trouve dans un sous-processus, il obtiendra un rechargeur qui surveille les modifications de fichiers, placera la fonction de rappel fournie dans un thread et la transmettra au rechargeur qui commencera à surveiller les modifications.

Lorsqu'un rechargeur détecte une modification de fichier, il exécute sys.exit(3). Cela quitte le sous-processus, ce qui déclenche la prochaine itération de la boucle à partir du code qui a généré le sous-processus. À son tour, un nouveau sous-processus est lancé qui utilise une version mise à jour du code.

Vérifications et migrations du système

Par défaut, les commandes Django effectuent des vérifications du système avant d'exécuter leur méthode handle(). Cependant, dans le cas de runserver et de notre commande runworker personnalisée, nous souhaiterons reporter leur exécution jusqu'à ce que nous soyons dans le rappel que nous fournissons à run_with_reloader(). Dans notre cas, il s'agit de notre méthode run_worker(). Cela nous permet d'exécuter la commande avec un rechargement automatique tout en corrigeant les vérifications système interrompues.

Pour reporter l'exécution des vérifications du système, la valeur de l'attribut require_system_checks est définie sur une liste vide et les vérifications sont effectuées en appelant self.check() dans le corps de run_worker(). Comme runserver, notre commande runworker personnalisée vérifie également si toutes les migrations ont été exécutées et affiche un avertissement s'il y a des migrations en attente.

Comme nous effectuons déjà les vérifications du système de Django dans la méthode run_worker(), nous désactivons les vérifications du système dans Celery en lui passant l'indicateur --skip-checks pour éviter les travaux en double.

Tout le code relatif aux vérifications et aux migrations du système a été extrait directement du code source de la commande runserver.

céleri_app.worker_main()

Notre implémentation lance le travailleur Celery directement depuis Python en utilisant celery_app.worker_main() plutôt que de le lancer sur Celery.

on_worker_init()

Ce code s'exécute lorsque le travailleur est initialisé, affichant la date et l'heure, la version de Django et la commande pour quitter. Il est calqué sur les informations qui s'affichent au démarrage de runserver.

Autre passe-partout du serveur d'exécution

Les lignes suivantes ont également été extraites de la source runserver :

  • suppressed_base_arguments = {"--verbosity", "--traceback"}
  • autoreload.raise_last_exception()

Niveau de journalisation

Notre commande personnalisée a un niveau de journalisation configurable au cas où le développeur souhaite ajuster le paramètre à partir de la CLI sans modifier le code.

Aller plus loin

J'ai fouillé et poussé le code source de Django & Celery pour construire cette implémentation, et il existe de nombreuses opportunités pour l'étendre. Vous pouvez configurer la commande pour accepter davantage d'arguments de travail de Celery. Alternativement, vous pouvez créer une commande manage.py personnalisée qui recharge automatiquement n'importe quelle commande shell comme David Browne l'a fait dans cet Gist.

Si vous avez trouvé cela utile, n'hésitez pas à laisser un like ou un commentaire. Merci d'avoir lu.

Déclaration de sortie Cet article est reproduit sur : https://dev.to/tylerlwsmith/automatically-reload-celery-workers-with-a-custom-django-command-1ojl?1 En cas de violation, veuillez contacter [email protected] pour le supprimer
Dernier tutoriel Plus>

Clause de non-responsabilité: Toutes les ressources fournies proviennent en partie d'Internet. En cas de violation de vos droits d'auteur ou d'autres droits et intérêts, veuillez expliquer les raisons détaillées et fournir une preuve du droit d'auteur ou des droits et intérêts, puis l'envoyer à l'adresse e-mail : [email protected]. Nous nous en occuperons pour vous dans les plus brefs délais.

Copyright© 2022 湘ICP备2022001581号-3