«Если рабочий хочет хорошо выполнять свою работу, он должен сначала заточить свои инструменты» — Конфуций, «Аналитики Конфуция. Лу Лингун»
титульная страница > программирование > Автоматически перезагружать рабочие Celery с помощью специальной команды Django.

Автоматически перезагружать рабочие Celery с помощью специальной команды Django.

Опубликовано 5 августа 2024 г.
Просматривать:968

Automatically reload Celery workers with a custom Django command

Ранее у Celery был флаг --autoreload, который с тех пор был удален. Однако в Django встроена автоматическая перезагрузка в команду runserver Manage.py. Отсутствие автоматической перезагрузки в работниках Celery создает запутанную среду разработки: обновление кода Python приводит к перезагрузке сервера Django с текущим кодом, но любые задачи, которые запускает сервер, будут запускать устаревший код в работнике Celery.

В этом посте показано, как создать собственную команду runworker Manage.py, которая автоматически перезагружает рабочие процессы Celery во время разработки. Команда будет смоделирована по образцу сервера запуска, и мы посмотрим, как работает автоматическая перезагрузка Django.

Прежде чем мы начнем

В этом посте предполагается, что у вас уже установлено приложение Django с Celery (руководство). Также предполагается, что вы понимаете различия между проектами и приложениями в Django.

Все ссылки на исходный код и документацию относятся к текущим версиям Django и Celery на момент публикации (июль 2024 г.). Если вы читаете это в далеком будущем, возможно, все изменится.

Наконец, в примерах публикации основной каталог проекта будет называться my_project.

Решение: специальная команда

Мы создадим специальную команду Manage.py под названием runworker. Поскольку Django обеспечивает автоматическую перезагрузку с помощью команды runever, мы будем использовать исходный код сервера запуска в качестве основы нашей специальной команды.

Вы можете создать команду в Django, создав каталог управления/команды/ в любом из приложений вашего проекта. После создания каталогов вы можете поместить в этот каталог файл Python с именем команды, которую вы хотите создать (документация).

Предполагая, что в вашем проекте есть приложение с именем polls, мы создадим файл polls/management/commands/runworker.py и добавим следующий код:

# polls/management/commands/runworker.py

import sys
from datetime import datetime

from celery.signals import worker_init

from django.conf import settings
from django.core.management.base import BaseCommand
from django.utils import autoreload

from my_project.celery import app as celery_app


class Command(BaseCommand):
    help = "Starts a Celery worker instance with auto-reloading for development."

    # Validation is called explicitly each time the worker instance is reloaded.
    requires_system_checks = []
    suppressed_base_arguments = {"--verbosity", "--traceback"}

    def add_arguments(self, parser):
        parser.add_argument(
            "--skip-checks",
            action="store_true",
            help="Skip system checks.",
        )
        parser.add_argument(
            "--loglevel",
            choices=("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", "FATAL"),
            type=str.upper,  # Transforms user input to uppercase.
            default="INFO",
        )

    def handle(self, *args, **options):
        autoreload.run_with_reloader(self.run_worker, **options)

    def run_worker(self, **options):
        # If an exception was silenced in ManagementUtility.execute in order
        # to be raised in the child process, raise it now.
        autoreload.raise_last_exception()

        if not options["skip_checks"]:
            self.stdout.write("Performing system checks...\n\n")
            self.check(display_num_errors=True)

        # Need to check migrations here, so can't use the
        # requires_migrations_check attribute.
        self.check_migrations()

        # Print Django info to console when the worker initializes.
        worker_init.connect(self.on_worker_init)

        # Start the Celery worker.
        celery_app.worker_main(
            [
                "--app",
                "my_project",
                "--skip-checks",
                "worker",
                "--loglevel",
                options["loglevel"],
            ]
        )

    def on_worker_init(self, sender, **kwargs):
        quit_command = "CTRL-BREAK" if sys.platform == "win32" else "CONTROL-C"

        now = datetime.now().strftime("%B %d, %Y - %X")
        version = self.get_version()
        print(
            f"{now}\n"
            f"Django version {version}, using settings {settings.SETTINGS_MODULE!r}\n"
            f"Quit the worker instance with {quit_command}.",
            file=self.stdout,
        )

ВАЖНО: Обязательно замените все экземпляры my_project именем вашего проекта Django.

Если вы хотите скопировать и вставить этот код и продолжить программирование, вы можете спокойно остановиться здесь, не читая остальную часть этого поста. Это элегантное решение, которое пригодится вам при разработке проекта Django & Celery. Однако, если вы хотите узнать больше о том, как это работает, продолжайте читать.

Как это работает (необязательно)

Вместо того чтобы рассматривать этот код построчно, я рассмотрю самые интересные части по темам. Если вы еще не знакомы с пользовательскими командами Django, возможно, вам захочется просмотреть документацию, прежде чем продолжить.

Автоматическая перезарядка

Эта часть кажется самой волшебной. В теле метода handle() команды есть вызов внутренней функции autoreload.run_with_reloader() Django. Он принимает функцию обратного вызова, которая будет выполняться каждый раз, когда файл Python изменяется в проекте. Как это на самом деле работает?

Давайте посмотрим на упрощенную версию исходного кода функции autoreload.run_with_reloader(). Упрощенная функция переписывает, встраивает и удаляет код, чтобы обеспечить ясность его работы.

# NOTE: This has been dramatically pared down for clarity.

def run_with_reloader(callback_func, *args, **kwargs):
    # NOTE: This will evaluate to False the first time it is run.
    is_inside_subprocess = os.getenv("RUN_MAIN") == "true"

    if is_inside_subprocess:
        # The reloader watches for Python file changes.
        reloader = get_reloader()

        django_main_thread = threading.Thread(
            target=callback_func, args=args, kwargs=kwargs
        )
        django_main_thread.daemon = True
        django_main_thread.start()

        # When the code changes, the reloader exits with return code 3.
        reloader.run(django_main_thread)

    else:
        # Returns Python path and the arguments passed to the command.
        # Example output: ['/path/to/python', './manage.py', 'runworker']
        args = get_child_arguments()

        subprocess_env = {**os.environ, "RUN_MAIN": "true"}
        while True:
            # Rerun the manage.py command in a subprocess.
            p = subprocess.run(args, env=subprocess_env, close_fds=False)
            if p.returncode != 3:
                sys.exit(p.returncode)

Когда runworker Manage.py запускается в командной строке, он сначала вызывает метод handle(), который вызывает run_with_reloader().

Внутри run_with_reloader() он проверит, имеет ли переменная среды RUN_MAIN значение «true». При первом вызове функции RUN_MAIN не должно иметь значения.

Если для RUN_MAIN не установлено значение «true», run_with_reloader() войдет в цикл. Внутри цикла он запустит подпроцесс, который повторно запустит переданный файл Manage.py [имя_команды], а затем дождется завершения этого подпроцесса. Если подпроцесс завершается с кодом возврата 3, следующая итерация цикла запустит новый подпроцесс и будет ждать. Цикл будет выполняться до тех пор, пока подпроцесс не вернет код выхода, отличный от 3 (или до тех пор, пока пользователь не выйдет из системы, нажав ctrl c). Как только он получит код возврата, отличный от 3, он полностью выйдет из программы.

Порожденный подпроцесс снова запускает команду Manage.py (в нашем случае runworker Manage.py), и снова команда вызывает run_with_reloader(). На этот раз для RUN_MAIN будет установлено значение «истина», поскольку команда выполняется в подпроцессе.

Теперь, когда run_with_reloader() знает, что он находится в подпроцессе, он получит перезагрузщик, который отслеживает изменения файла, поместит предоставленную функцию обратного вызова в поток и передаст ее перезагрузщику, который начнет отслеживать изменения.

Когда программа перезагрузки обнаруживает изменение файла, она запускает sys.exit(3). При этом подпроцесс завершается, что запускает следующую итерацию цикла из кода, породившего подпроцесс. В свою очередь, запускается новый подпроцесс, использующий обновленную версию кода.

Системные проверки и миграции

По умолчанию команды Django выполняют проверку системы перед запуском метода handle(). Однако в случае с runserver и нашей специальной командой runworker нам нужно отложить их запуск до тех пор, пока мы не попадем в обратный вызов, который мы предоставляем run_with_reloader(). В нашем случае это метод run_worker(). Это позволяет нам запускать команду с автоматической перезагрузкой и исправлять нарушенные проверки системы.

Чтобы отложить выполнение системных проверок, значение атрибута require_system_checks устанавливается в пустой список, а проверки выполняются путем вызова self.check() в теле run_worker(). Как и runserver, наша пользовательская команда runworker также проверяет, были ли выполнены все миграции, и отображает предупреждение, если есть ожидающие миграции.

Поскольку мы уже выполняем системные проверки Django в методе run_worker(), мы отключаем системные проверки в Celery, передавая ему флаг --skip-checks, чтобы предотвратить дублирование работы.

Весь код, относящийся к системным проверкам и миграции, был взят непосредственно из исходного кода команды сервера запуска.

celery_app.worker_main()

Наша реализация запускает рабочий процесс Celery непосредственно из Python с помощью celery_app.worker_main(), а не через Celery.

on_worker_init()

Этот код выполняется при инициализации рабочего процесса и отображает дату и время, версию Django и команду выхода. Он смоделирован на основе информации, которая отображается при загрузке сервера запуска.

Другой шаблон сервера запуска

Следующие строки также были взяты из исходного кода сервера выполнения:

  • suppressed_base_arguments = {"--verbosity", "--traceback"}
  • autoreload.raise_last_Exception()

Уровень журнала

Наша пользовательская команда имеет настраиваемый уровень журнала на случай, если разработчик захочет изменить настройку из CLI без изменения кода.

Идти дальше

Я рылся в исходном коде Django & Celery, чтобы создать эту реализацию, и есть много возможностей для ее расширения. Вы можете настроить команду так, чтобы она принимала больше рабочих аргументов Celery. В качестве альтернативы вы можете создать собственную команду Manage.py, которая автоматически перезагружает любую команду оболочки, как это сделал Дэвид Браун в этом Gist.

Если вы нашли это полезным, не стесняйтесь оставить лайк или комментарий. Спасибо за прочтение.

Заявление о выпуске Эта статья воспроизведена по адресу: https://dev.to/tylerlwsmith/automatically-reload-celery-workers-with-a-custom-django-command-1ojl?1. В случае нарушения прав обращайтесь по адресу [email protected]. удалить его
Последний учебник Более>

Изучайте китайский

Отказ от ответственности: Все предоставленные ресурсы частично взяты из Интернета. В случае нарушения ваших авторских прав или других прав и интересов, пожалуйста, объясните подробные причины и предоставьте доказательства авторских прав или прав и интересов, а затем отправьте их по электронной почте: [email protected]. Мы сделаем это за вас как можно скорее.

Copyright© 2022 湘ICP备2022001581号-3