「労働者が自分の仕事をうまくやりたいなら、まず自分の道具を研ぎ澄まさなければなりません。」 - 孔子、「論語。陸霊公」
表紙 > プログラミング > カスタム Django コマンドを使用して Celery ワーカーを自動的にリロードする

カスタム Django コマンドを使用して Celery ワーカーを自動的にリロードする

2024 年 8 月 5 日に公開
ブラウズ:350

Automatically reload Celery workers with a custom Django command

Celery には以前は --autoreload フラグがありましたが、現在は削除されています。ただし、Django には、manage.py runserver コマンドに自動リロード機能が組み込まれています。 Celery ワーカーに自動リロードがないため、開発エクスペリエンスが混乱します。Python コードを更新すると、Django サーバーは現在のコードでリロードされますが、サーバーが起動するタスクはすべて、Celery ワーカーで古いコードを実行します。

この投稿では、開発中に Celery ワーカーを自動的にリロードするカスタム manage.py runworker コマンドを構築する方法を説明します。このコマンドは runserver をモデルにして、Django の自動リロードが内部でどのように機能するかを見ていきます。

始める前に

この投稿では、Celery がすでにインストールされている Django アプリがあることを前提としています (ガイド)。また、Django のプロジェクトとアプリケーションの違いを理解していることも前提としています。

ソース コードおよびドキュメントへのすべてのリンクは、発行時 (2024 年 7 月) の現在のバージョンの Django および Celery のものになります。あなたが遠い将来にこれを読んでいるなら、状況は変わっているかもしれません。

最後に、投稿の例では、メイン プロジェクト ディレクトリの名前は my_project になります。

解決策: カスタム コマンド

runworker という名前のカスタム manage.py コマンドを作成します。 Django は、runserver コマンドを介して自動リロードを提供するため、runserver のソース コードをカスタム コマンドの基礎として使用します。

プロジェクトのアプリケーション内に manage/commands/ ディレクトリを作成することで、Django でコマンドを作成できます。ディレクトリが作成されたら、そのディレクトリ内に作成したいコマンドの名前を含む Python ファイルを置くことができます (docs)。

プロジェクトにpollsという名前のアプリケーションがあると仮定して、polls/management/commands/runworker.pyにファイルを作成し、次のコードを追加します。

# polls/management/commands/runworker.py

import sys
from datetime import datetime

from celery.signals import worker_init

from django.conf import settings
from django.core.management.base import BaseCommand
from django.utils import autoreload

from my_project.celery import app as celery_app


class Command(BaseCommand):
    help = "Starts a Celery worker instance with auto-reloading for development."

    # Validation is called explicitly each time the worker instance is reloaded.
    requires_system_checks = []
    suppressed_base_arguments = {"--verbosity", "--traceback"}

    def add_arguments(self, parser):
        parser.add_argument(
            "--skip-checks",
            action="store_true",
            help="Skip system checks.",
        )
        parser.add_argument(
            "--loglevel",
            choices=("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", "FATAL"),
            type=str.upper,  # Transforms user input to uppercase.
            default="INFO",
        )

    def handle(self, *args, **options):
        autoreload.run_with_reloader(self.run_worker, **options)

    def run_worker(self, **options):
        # If an exception was silenced in ManagementUtility.execute in order
        # to be raised in the child process, raise it now.
        autoreload.raise_last_exception()

        if not options["skip_checks"]:
            self.stdout.write("Performing system checks...\n\n")
            self.check(display_num_errors=True)

        # Need to check migrations here, so can't use the
        # requires_migrations_check attribute.
        self.check_migrations()

        # Print Django info to console when the worker initializes.
        worker_init.connect(self.on_worker_init)

        # Start the Celery worker.
        celery_app.worker_main(
            [
                "--app",
                "my_project",
                "--skip-checks",
                "worker",
                "--loglevel",
                options["loglevel"],
            ]
        )

    def on_worker_init(self, sender, **kwargs):
        quit_command = "CTRL-BREAK" if sys.platform == "win32" else "CONTROL-C"

        now = datetime.now().strftime("%B %d, %Y - %X")
        version = self.get_version()
        print(
            f"{now}\n"
            f"Django version {version}, using settings {settings.SETTINGS_MODULE!r}\n"
            f"Quit the worker instance with {quit_command}.",
            file=self.stdout,
        )

重要: my_project のすべてのインスタンスを必ず Django プロジェクトの名前に置き換えてください。

このコードをコピーして貼り付けてプログラミングを続行したい場合は、この投稿の残りを読まなくても、ここで安全に停止できます。これは、Django & Celery プロジェクトを開発する際に役立つ洗練されたソリューションです。ただし、その仕組みについてさらに詳しく知りたい場合は、読み続けてください。

仕組み (オプション)

このコードを 1 行ずつレビューするのではなく、トピックごとに最も興味深い部分について説明します。 Django カスタム コマンドにまだ慣れていない場合は、続行する前にドキュメントを確認してください。

自動リロード

この部分が最も魔法のように感じられます。コマンドの handle() メソッドの本体内に、Django の内部 autoreload.run_with_reloader() への呼び出しがあります。プロジェクト内で Python ファイルが変更されるたびに実行されるコールバック関数を受け入れます。 実際はどのように機能しますか?

autoreload.run_with_reloader() 関数のソース コードの簡略化されたバージョンを見てみましょう。簡略化された関数は、コードの書き換え、インライン化、削除を行い、その動作を明確にします。

# NOTE: This has been dramatically pared down for clarity.

def run_with_reloader(callback_func, *args, **kwargs):
    # NOTE: This will evaluate to False the first time it is run.
    is_inside_subprocess = os.getenv("RUN_MAIN") == "true"

    if is_inside_subprocess:
        # The reloader watches for Python file changes.
        reloader = get_reloader()

        django_main_thread = threading.Thread(
            target=callback_func, args=args, kwargs=kwargs
        )
        django_main_thread.daemon = True
        django_main_thread.start()

        # When the code changes, the reloader exits with return code 3.
        reloader.run(django_main_thread)

    else:
        # Returns Python path and the arguments passed to the command.
        # Example output: ['/path/to/python', './manage.py', 'runworker']
        args = get_child_arguments()

        subprocess_env = {**os.environ, "RUN_MAIN": "true"}
        while True:
            # Rerun the manage.py command in a subprocess.
            p = subprocess.run(args, env=subprocess_env, close_fds=False)
            if p.returncode != 3:
                sys.exit(p.returncode)

manage.py runworker がコマンド ラインで実行されると、最初に handle() メソッドが呼び出され、このメソッドが run_with_reloader() を呼び出します。

run_with_reloader() 内で、RUN_MAIN という環境変数の値が「true」かどうかを確認します。関数が最初に呼び出されるとき、RUN_MAIN には値がありません。

RUN_MAIN が "true" に設定されていない場合、run_with_reloader() はループに入ります。ループ内で、渡された manage.py [command_name] を再実行するサブプロセスを開始し、そのサブプロセスが終了するのを待ちます。サブプロセスが戻りコード 3 で終了した場合、ループの次の反復で新しいサブプロセスが開始され、待機します。このループは、サブプロセスが 3 以外の終了コードを返すまで (またはユーザーが Ctrl C キーで終了するまで) 実行されます。 3 以外のリターン コードを取得すると、プログラムは完全に終了します。

生成されたサブプロセスは、manage.py コマンドを再度実行し (この場合は manage.py runworker)、コマンドは再度 run_with_reloader() を呼び出します。今回は、コマンドがサブプロセスで実行されているため、RUN_MAIN は「true」に設定されます。

run_with_reloader() はサブプロセス内にあることを認識したので、ファイルの変更を監視するリローダーを取得し、提供されたコールバック関数をスレッドに配置し、それを変更の監視を開始するリローダーに渡します。

リローダーはファイルの変更を検出すると、sys.exit(3) を実行します。これによりサブプロセスが終了し、サブプロセスを生成したコードからループの次の反復がトリガーされます。次に、更新されたバージョンのコードを使用する新しいサブプロセスが起動されます。

システムのチェックと移行

デフォルトでは、Django コマンドは handle() メソッドを実行する前にシステム チェックを実行します。ただし、runserver とカスタム runworker コマンドの場合は、run_with_reloader() に提供するコールバック内に入るまで、これらの実行を延期する必要があります。私たちの場合、これは run_worker() メソッドです。これにより、壊れたシステム チェックを修正しながら、自動リロードでコマンドを実行できるようになります。

システム チェックの実行を延期するには、requires_system_checks 属性の値を空のリストに設定し、run_worker() の本体で self.check() を呼び出してチェックを実行します。 runserver と同様に、カスタム runworker コマンドもすべての移行が実行されたかどうかを確認し、保留中の移行がある場合は警告を表示します。

すでに run_worker() メソッド内で Django のシステム チェックを実行しているため、作業の重複を防ぐために --skip-checks フラグを渡して Celery のシステム チェックを無効にします。

システム チェックと移行に関連するすべてのコードは、runserver コマンドのソース コードから直接引用されました。

celery_app.worker_main()

私たちの実装では、Celery にシェルアウトするのではなく、celery_app.worker_main() を使用して Python から直接 Celery ワーカーを起動します。

on_worker_init()

このコードはワーカーの初期化時に実行され、日付と時刻、Django のバージョン、終了するコマンドが表示されます。これは、runserver の起動時に表示される情報をモデルにしています。

その他のランサーバー定型文

次の行も runserver ソースから引用されました:

  • suppressed_base_arguments = {"--冗長性", "--traceback"}
  • autoreload.raise_last_Exception()

ログレベル

開発者がコードを変更せずに CLI から設定を調整したい場合に備えて、カスタム コマンドには構成可能なログ レベルがあります。

もっと遠く行く

私はこの実装を構築するために Django と Celery のソース コードを徹底的に調べましたが、拡張する機会はたくさんあります。 Celery のワーカー引数をさらに受け入れるようにコマンドを構成できます。あるいは、David Browne がこの Gist で行ったように、 任意の シェル コマンドを自動的にリロードするカスタム manage.py コマンドを作成することもできます。

これが役に立ったと思われた場合は、お気軽に「いいね!」またはコメントを残してください。読んでくれてありがとう。

リリースステートメント この記事は次の場所に転載されています: https://dev.to/tylerlwsmith/automatically-reload-celery-workers-with-a-custom-django-command-1ojl?1 侵害がある場合は、[email protected] までご連絡ください。それを削除するには
最新のチュートリアル もっと>

免責事項: 提供されるすべてのリソースの一部はインターネットからのものです。お客様の著作権またはその他の権利および利益の侵害がある場合は、詳細な理由を説明し、著作権または権利および利益の証拠を提出して、電子メール [email protected] に送信してください。 できるだけ早く対応させていただきます。

Copyright© 2022 湘ICP备2022001581号-3