"Si un trabajador quiere hacer bien su trabajo, primero debe afilar sus herramientas." - Confucio, "Las Analectas de Confucio. Lu Linggong"
Página delantera > Programación > Recarga automáticamente los trabajadores de Celery con un comando Django personalizado

Recarga automáticamente los trabajadores de Celery con un comando Django personalizado

Publicado el 2024-08-05
Navegar:553

Automatically reload Celery workers with a custom Django command

El apio anteriormente tenía un indicador --autoreload que desde entonces se eliminó. Sin embargo, Django tiene recarga automática integrada en su comando Manage.py RunServer. La ausencia de recarga automática en los trabajadores de Celery crea una experiencia de desarrollo confusa: la actualización del código Python hace que el servidor Django se recargue con el código actual, pero cualquier tarea que ejecute el servidor ejecutará código obsoleto en el trabajador de Celery.

Esta publicación le mostrará cómo crear un comando runworker administrado.py personalizado que recarga automáticamente los trabajadores de Celery durante el desarrollo. El comando se modelará a partir del servidor de ejecución y veremos cómo funciona la recarga automática de Django en su interior.

Antes de que comencemos

Esta publicación asume que tienes una aplicación Django con Celery ya instalada (guía). También supone que comprende las diferencias entre proyectos y aplicaciones en Django.

Todos los enlaces al código fuente y la documentación serán para las versiones actuales de Django y Celery en el momento de la publicación (julio de 2024). Si estás leyendo esto en un futuro lejano, es posible que las cosas hayan cambiado.

Finalmente, el directorio principal del proyecto se llamará my_project en los ejemplos de la publicación.

Solución: un comando personalizado

Crearemos un comando administrado.py personalizado llamado runworker. Debido a que Django proporciona recarga automática a través de su comando Runsever, usaremos el código fuente de Runserver como base de nuestro comando personalizado.

Puedes crear un comando en Django creando un directorio de administración/comandos/ dentro de cualquiera de las aplicaciones de tu proyecto. Una vez que se hayan creado los directorios, puede colocar un archivo Python con el nombre del comando que desea crear dentro de ese directorio (docs).

Suponiendo que su proyecto tenga una aplicación llamada polls, crearemos un archivo en polls/management/commands/runworker.py y agregaremos el siguiente código:

# polls/management/commands/runworker.py

import sys
from datetime import datetime

from celery.signals import worker_init

from django.conf import settings
from django.core.management.base import BaseCommand
from django.utils import autoreload

from my_project.celery import app as celery_app


class Command(BaseCommand):
    help = "Starts a Celery worker instance with auto-reloading for development."

    # Validation is called explicitly each time the worker instance is reloaded.
    requires_system_checks = []
    suppressed_base_arguments = {"--verbosity", "--traceback"}

    def add_arguments(self, parser):
        parser.add_argument(
            "--skip-checks",
            action="store_true",
            help="Skip system checks.",
        )
        parser.add_argument(
            "--loglevel",
            choices=("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", "FATAL"),
            type=str.upper,  # Transforms user input to uppercase.
            default="INFO",
        )

    def handle(self, *args, **options):
        autoreload.run_with_reloader(self.run_worker, **options)

    def run_worker(self, **options):
        # If an exception was silenced in ManagementUtility.execute in order
        # to be raised in the child process, raise it now.
        autoreload.raise_last_exception()

        if not options["skip_checks"]:
            self.stdout.write("Performing system checks...\n\n")
            self.check(display_num_errors=True)

        # Need to check migrations here, so can't use the
        # requires_migrations_check attribute.
        self.check_migrations()

        # Print Django info to console when the worker initializes.
        worker_init.connect(self.on_worker_init)

        # Start the Celery worker.
        celery_app.worker_main(
            [
                "--app",
                "my_project",
                "--skip-checks",
                "worker",
                "--loglevel",
                options["loglevel"],
            ]
        )

    def on_worker_init(self, sender, **kwargs):
        quit_command = "CTRL-BREAK" if sys.platform == "win32" else "CONTROL-C"

        now = datetime.now().strftime("%B %d, %Y - %X")
        version = self.get_version()
        print(
            f"{now}\n"
            f"Django version {version}, using settings {settings.SETTINGS_MODULE!r}\n"
            f"Quit the worker instance with {quit_command}.",
            file=self.stdout,
        )

IMPORTANTE: Asegúrate de reemplazar todas las instancias de my_project con el nombre de tu proyecto Django.

Si desea copiar y pegar este código y continuar con su programación, puede detenerse aquí de manera segura sin leer el resto de esta publicación. Esta es una solución elegante que le será de gran utilidad mientras desarrolla su proyecto Django & Celery. Sin embargo, si quieres saber más sobre cómo funciona, sigue leyendo.

Cómo funciona (opcional)

En lugar de revisar este código línea por línea, analizaré las partes más interesantes por tema. Si aún no estás familiarizado con los comandos personalizados de Django, quizás quieras revisar los documentos antes de continuar.

Recarga automática

Esta parte se siente más mágica. Dentro del cuerpo del método handle() del comando, hay una llamada al autoreload.run_with_reloader() interno de Django. Acepta una función de devolución de llamada que se ejecutará cada vez que se cambie un archivo Python en el proyecto. ¿Cómo funciona eso en realidad?

Echemos un vistazo a una versión simplificada del código fuente de la función autoreload.run_with_reloader(). La función simplificada reescribe, inserta y elimina código para brindar claridad sobre su funcionamiento.

# NOTE: This has been dramatically pared down for clarity.

def run_with_reloader(callback_func, *args, **kwargs):
    # NOTE: This will evaluate to False the first time it is run.
    is_inside_subprocess = os.getenv("RUN_MAIN") == "true"

    if is_inside_subprocess:
        # The reloader watches for Python file changes.
        reloader = get_reloader()

        django_main_thread = threading.Thread(
            target=callback_func, args=args, kwargs=kwargs
        )
        django_main_thread.daemon = True
        django_main_thread.start()

        # When the code changes, the reloader exits with return code 3.
        reloader.run(django_main_thread)

    else:
        # Returns Python path and the arguments passed to the command.
        # Example output: ['/path/to/python', './manage.py', 'runworker']
        args = get_child_arguments()

        subprocess_env = {**os.environ, "RUN_MAIN": "true"}
        while True:
            # Rerun the manage.py command in a subprocess.
            p = subprocess.run(args, env=subprocess_env, close_fds=False)
            if p.returncode != 3:
                sys.exit(p.returncode)

Cuando se ejecuta Manage.py runworker en la línea de comando, primero llamará al método handle() que llamará a run_with_reloader().

Dentro de run_with_reloader(), comprobará si una variable de entorno llamada RUN_MAIN tiene un valor "verdadero". Cuando se llama a la función por primera vez, RUN_MAIN no debería tener ningún valor.

Cuando RUN_MAIN no está configurado en "true", run_with_reloader() entrará en un bucle. Dentro del bucle, iniciará un subproceso que volverá a ejecutar el Manage.py [nombre_comando] que se pasó y luego esperará a que salga ese subproceso. Si el subproceso sale con el código de retorno 3, la siguiente iteración del bucle iniciará un nuevo subproceso y esperará. El bucle se ejecutará hasta que un subproceso devuelva un código de salida que no sea 3 (o hasta que el usuario salga con Ctrl c). Una vez que obtenga un código de retorno distinto de 3, saldrá del programa por completo.

El subproceso generado ejecuta el comando Manage.py nuevamente (en nuestro caso, Manage.py runworker), y nuevamente el comando llamará a run_with_reloader(). Esta vez, RUN_MAIN se establecerá en "verdadero" porque el comando se ejecuta en un subproceso.

Ahora que run_with_reloader() sabe que está en un subproceso, obtendrá un recargador que observa los cambios en el archivo, colocará la función de devolución de llamada proporcionada en un hilo y la pasará al recargador, que comienza a observar los cambios.

Cuando un recargador detecta un cambio de archivo, ejecuta sys.exit(3). Esto sale del subproceso, lo que desencadena la siguiente iteración del bucle a partir del código que generó el subproceso. A su vez, se lanza un nuevo subproceso que utiliza una versión actualizada del código.

Comprobaciones y migraciones del sistema

De forma predeterminada, los comandos de Django realizan comprobaciones del sistema antes de ejecutar su método handle(). Sin embargo, en el caso derunserver y nuestro comando runworker personalizado, querremos posponer su ejecución hasta que estemos dentro de la devolución de llamada que proporcionamos a run_with_reloader(). En nuestro caso, este es nuestro método run_worker(). Esto nos permite ejecutar el comando con recarga automática mientras reparamos las comprobaciones del sistema fallidas.

Para posponer la ejecución de las comprobaciones del sistema, el valor del atributo require_system_checks se establece en una lista vacía y las comprobaciones se realizan llamando a self.check() en el cuerpo de run_worker(). Al igual que openserver, nuestro comando runworker personalizado también verifica si se han ejecutado todas las migraciones y muestra una advertencia si hay migraciones pendientes.

Debido a que ya estamos realizando verificaciones del sistema de Django dentro del método run_worker(), deshabilitamos las verificaciones del sistema en Celery pasándole el indicador --skip-checks para evitar trabajo duplicado.

Todo el código relacionado con las comprobaciones y migraciones del sistema se eliminó directamente del código fuente del comando runserver.

apio_app.worker_main()

Nuestra implementación lanza el trabajador Celery directamente desde Python usando celery_app.worker_main() en lugar de desembolsar a Celery.

on_worker_init()

Este código se ejecuta cuando se inicializa el trabajador, mostrando la fecha y hora, la versión de Django y el comando para salir. Está modelado a partir de la información que se muestra cuando se inicia el servidor de ejecución.

Otro texto repetitivo del servidor de ejecución

Las siguientes líneas también se eliminaron de la fuente del servidor de ejecución:

  • suppressed_base_arguments = {"--verbosity", "--traceback"}
  • autoreload.raise_last_exception()

Nivel de registro

Nuestro comando personalizado tiene un nivel de registro configurable en caso de que el desarrollador quiera ajustar la configuración desde la CLI sin modificar el código.

Ir más lejos

Hurgué y empujé el código fuente de Django & Celery para crear esta implementación, y hay muchas oportunidades para ampliarla. Puede configurar el comando para aceptar más argumentos de trabajador de Celery. Alternativamente, puede crear un comando Manage.py personalizado que recargue automáticamente cualquier comando de shell como lo hizo David Browne en este Gist.

Si esto te resultó útil, no dudes en dejar un me gusta o un comentario. Gracias por leer.

Declaración de liberación Este artículo se reproduce en: https://dev.to/tylerlwsmith/automatically-reload-celery-workers-with-a-custom-django-command-1ojl?1 Si hay alguna infracción, comuníquese con [email protected] para borrarlo
Último tutorial Más>

Descargo de responsabilidad: Todos los recursos proporcionados provienen en parte de Internet. Si existe alguna infracción de sus derechos de autor u otros derechos e intereses, explique los motivos detallados y proporcione pruebas de los derechos de autor o derechos e intereses y luego envíelos al correo electrónico: [email protected]. Lo manejaremos por usted lo antes posible.

Copyright© 2022 湘ICP备2022001581号-3