Celery tinha anteriormente um sinalizador --autoreload que já foi removido. No entanto, o Django possui recarregamento automático embutido em seu comando manager.py runserver. A ausência de recarregamento automático nos trabalhadores do Celery cria uma experiência de desenvolvimento confusa: a atualização do código Python faz com que o servidor Django seja recarregado com o código atual, mas qualquer tarefa que o servidor dispara executará código obsoleto no trabalhador do Celery.
Esta postagem mostrará como construir um comando manager.py runworker personalizado que recarrega automaticamente os trabalhadores do Celery durante o desenvolvimento. O comando será modelado após runserver, e daremos uma olhada em como o recarregamento automático do Django funciona nos bastidores.
Este post pressupõe que você tenha um aplicativo Django com Celery já instalado (guia). Também pressupõe que você entenda as diferenças entre projetos e aplicativos no Django.
Todos os links para o código-fonte e documentação serão para as versões atuais do Django e Celery no momento da publicação (julho de 2024). Se você estiver lendo isso em um futuro distante, as coisas podem ter mudado.
Finalmente, o diretório principal do projeto será nomeado my_project nos exemplos do post.
Criaremos um comando manager.py personalizado chamado runworker. Como o Django fornece recarregamento automático através de seu comando runsever, usaremos o código-fonte do runserver como base de nosso comando personalizado.
Você pode criar um comando no Django criando um diretório management/commands/ dentro de qualquer uma das aplicações do seu projeto. Depois que os diretórios forem criados, você poderá colocar um arquivo Python com o nome do comando que deseja criar nesse diretório (docs).
Supondo que seu projeto tenha um aplicativo chamado polls, criaremos um arquivo em polls/management/commands/runworker.py e adicionaremos o seguinte código:
# polls/management/commands/runworker.py import sys from datetime import datetime from celery.signals import worker_init from django.conf import settings from django.core.management.base import BaseCommand from django.utils import autoreload from my_project.celery import app as celery_app class Command(BaseCommand): help = "Starts a Celery worker instance with auto-reloading for development." # Validation is called explicitly each time the worker instance is reloaded. requires_system_checks = [] suppressed_base_arguments = {"--verbosity", "--traceback"} def add_arguments(self, parser): parser.add_argument( "--skip-checks", action="store_true", help="Skip system checks.", ) parser.add_argument( "--loglevel", choices=("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", "FATAL"), type=str.upper, # Transforms user input to uppercase. default="INFO", ) def handle(self, *args, **options): autoreload.run_with_reloader(self.run_worker, **options) def run_worker(self, **options): # If an exception was silenced in ManagementUtility.execute in order # to be raised in the child process, raise it now. autoreload.raise_last_exception() if not options["skip_checks"]: self.stdout.write("Performing system checks...\n\n") self.check(display_num_errors=True) # Need to check migrations here, so can't use the # requires_migrations_check attribute. self.check_migrations() # Print Django info to console when the worker initializes. worker_init.connect(self.on_worker_init) # Start the Celery worker. celery_app.worker_main( [ "--app", "my_project", "--skip-checks", "worker", "--loglevel", options["loglevel"], ] ) def on_worker_init(self, sender, **kwargs): quit_command = "CTRL-BREAK" if sys.platform == "win32" else "CONTROL-C" now = datetime.now().strftime("%B %d, %Y - %X") version = self.get_version() print( f"{now}\n" f"Django version {version}, using settings {settings.SETTINGS_MODULE!r}\n" f"Quit the worker instance with {quit_command}.", file=self.stdout, )
IMPORTANTE: Certifique-se de substituir todas as instâncias de my_project pelo nome do seu projeto Django.
Se você quiser copiar e colar este código e continuar com sua programação, você pode parar aqui com segurança sem ler o restante deste post. Esta é uma solução elegante que irá atendê-lo bem no desenvolvimento de seu projeto Django & Celery. No entanto, se você quiser saber mais sobre como funciona, continue lendo.
Em vez de revisar esse código linha por linha, discutirei as partes mais interessantes por tópico. Se você ainda não está familiarizado com os comandos personalizados do Django, você pode querer revisar a documentação antes de continuar.
Esta parte parece mais mágica. Dentro do corpo do método handle() do comando, há uma chamada para o autoreload.run_with_reloader() interno do Django. Ele aceita uma função de retorno de chamada que será executada sempre que um arquivo Python for alterado no projeto. Como isso realmente funciona?
Vamos dar uma olhada em uma versão simplificada do código-fonte da função autoreload.run_with_reloader(). A função simplificada reescreve, insere e exclui código para fornecer clareza sobre sua operação.
# NOTE: This has been dramatically pared down for clarity. def run_with_reloader(callback_func, *args, **kwargs): # NOTE: This will evaluate to False the first time it is run. is_inside_subprocess = os.getenv("RUN_MAIN") == "true" if is_inside_subprocess: # The reloader watches for Python file changes. reloader = get_reloader() django_main_thread = threading.Thread( target=callback_func, args=args, kwargs=kwargs ) django_main_thread.daemon = True django_main_thread.start() # When the code changes, the reloader exits with return code 3. reloader.run(django_main_thread) else: # Returns Python path and the arguments passed to the command. # Example output: ['/path/to/python', './manage.py', 'runworker'] args = get_child_arguments() subprocess_env = {**os.environ, "RUN_MAIN": "true"} while True: # Rerun the manage.py command in a subprocess. p = subprocess.run(args, env=subprocess_env, close_fds=False) if p.returncode != 3: sys.exit(p.returncode)
Quando o manager.py runworker é executado na linha de comando, ele primeiro chamará o método handle() que chamará run_with_reloader().
Dentro de run_with_reloader(), ele verificará se uma variável de ambiente chamada RUN_MAIN tem um valor "true". Quando a função é chamada pela primeira vez, RUN_MAIN não deve ter valor.
Quando RUN_MAIN não estiver definido como "true", run_with_reloader() entrará em um loop. Dentro do loop, ele iniciará um subprocesso que executará novamente o manager.py [command_name] que foi passado e aguardará a saída desse subprocesso. Se o subprocesso terminar com o código de retorno 3, a próxima iteração do loop iniciará um novo subprocesso e aguardará. O loop será executado até que um subprocesso retorne um código de saída diferente de 3 (ou até que o usuário saia com ctrl c). Assim que obtiver um código de retorno diferente de 3, ele sairá completamente do programa.
O subprocesso gerado executa o comando manager.py novamente (em nosso caso, manager.py runworker), e novamente o comando chamará run_with_reloader(). Desta vez, RUN_MAIN será definido como "true" porque o comando está sendo executado em um subprocesso.
Agora que run_with_reloader() sabe que está em um subprocesso, ele obterá um recarregador que observa as alterações do arquivo, colocará a função de retorno de chamada fornecida em um thread e a passará para o recarregador que começará a observar as alterações.
Quando um recarregador detecta uma alteração no arquivo, ele executa sys.exit(3). Isso encerra o subprocesso, o que aciona a próxima iteração do loop do código que gerou o subprocesso. Por sua vez, é lançado um novo subprocesso que utiliza uma versão atualizada do código.
Por padrão, os comandos do Django realizam verificações do sistema antes de executarem seu método handle(). No entanto, no caso de runserver e nosso comando runworker personalizado, desejaremos adiar a execução deles até que estejamos dentro do retorno de chamada que fornecemos para run_with_reloader(). No nosso caso, este é o nosso método run_worker(). Isso nos permite executar o comando com recarga automática enquanto conserta verificações de sistema quebradas.
Para adiar a execução das verificações do sistema, o valor do atributo require_system_checks é definido como uma lista vazia e as verificações são realizadas chamando self.check() no corpo de run_worker(). Assim como o runserver, nosso comando runworker personalizado também verifica se todas as migrações foram executadas e exibe um aviso se houver migrações pendentes.
Como já estamos realizando verificações de sistema do Django dentro do método run_worker(), desabilitamos as verificações de sistema no Celery passando-lhe o sinalizador --skip-checks para evitar trabalho duplicado.
Todo o código relacionado às verificações e migrações do sistema foi retirado diretamente do código-fonte do comando runserver.
Nossa implementação lança o trabalhador Celery diretamente do Python usando celery_app.worker_main() em vez de pagar pelo Celery.
Este código é executado quando o trabalhador é inicializado, exibindo a data e hora, a versão do Django e o comando para sair. Ele é modelado com base nas informações exibidas quando o runserver é inicializado.
As seguintes linhas também foram retiradas da fonte do runserver:
Nosso comando personalizado tem um nível de log configurável caso o desenvolvedor queira ajustar a configuração da CLI sem modificar o código.
Eu vasculhei o código-fonte do Django & Celery para construir esta implementação e há muitas oportunidades para estendê-la. Você pode configurar o comando para aceitar mais argumentos de trabalho do Celery. Alternativamente, você pode criar um comando manager.py personalizado que recarrega automaticamente qualquer comando shell como David Browne fez neste Gist.
Se você achou isso útil, fique à vontade para deixar um like ou um comentário. Obrigado pela leitura.
Isenção de responsabilidade: Todos os recursos fornecidos são parcialmente provenientes da Internet. Se houver qualquer violação de seus direitos autorais ou outros direitos e interesses, explique os motivos detalhados e forneça prova de direitos autorais ou direitos e interesses e envie-a para o e-mail: [email protected]. Nós cuidaremos disso para você o mais rápido possível.
Copyright© 2022 湘ICP备2022001581号-3