セロリを学ぶのは気が遠くなるかもしれません。ドキュメントは包括的ですが、基本的な部分を省略する傾向があります。
この投稿では、Celery の 4 つの主要な概念を定義し、Celery と Kombu の関係について説明し、いくつかのコード例を使用して Celery が実際のアプリケーションでどのように役立つかを説明します。この例では Django Web フレームワークとその @shared_task デコレーターを使用しますが、この概念は Flask、FastAPI などにも適用できます。
現在の Celery ドキュメントで、ブローカー または バックエンド とみなされるものを明確に説明している場所を見つけるのは難しいでしょう。しかし、十分に掘れば、定義を推測します。
以下は、Celery を始める前に知っておくべき概念です。
タスクは、Celery が 非同期で 実行する作業です (この文脈では、「すぐにではない」という意味の派手な言葉です)。 Web アプリケーションでは、ユーザーがフォームを送信した後に電子メールを送信するタスクが考えられます。電子メールの送信には数秒かかる操作が必要になる場合があり、リダイレクトする前に電子メールの送信をユーザーに待たせると、アプリケーションが遅く感じる可能性があります。
タスクは Celery のデコレータを使用して定義されます。以下では、@shared_task デコレータを使用して、send_thank_you_email() を submit_feedback() フォーム送信ハンドラーで使用できる Celery タスクに変換します。
from config.celery import shared_task from django.core.mail import send_mail from django.shortcuts import render, redirect from feedback.forms import FeedbackForm @shared_task def send_thank_you_email(email_address): send_mail( "Thank you for your feedback!", "We appreciate your input.", "[email protected]", [email_address], ) def submit_feedback(request): if request.method == "POST": form = FeedbackForm(request.POST) if form.is_valid(): form.save() # Push the task to the broker using the delay() method. send_thank_you_email.delay(form.cleaned_data["email"]) return redirect("/thank-you/") else: form = FeedbackForm() return render(request, "feedback.html", {"form": form})
Celery のデコレータを使用してタスクが定義されると、タスクに late() メソッドが追加されます。フォームが正常に保存された後、上の例では send_thank_you_email タスクが late() メソッドを呼び出していることがわかります。 late() が呼び出されると、send_thank_you_email タスクとそのデータが broker に送信され、そこに保存され、後で worker によって実行されます。その時点でユーザーはメールで送信されます。
フォームを保存した後に追加の電子メールを送信する必要がある場合、作業を Celery にプッシュする利点がより明らかになります。たとえば、新しいフィードバックを受け取ったことをカスタマー サポート チームに電子メールで送信することができます。 Celery を使用すると、応答に追加の時間がほとんど追加されません。
Celery タスクでは、追加の高度な構成も可能です。電子メールの送信に失敗した場合は、自動的に再試行するようにタスクをコーディングし、max_retries、retry_backoff、retry_jitter などの設定を構成できます。
Celery Enhancement Proposals の用語集には、メッセージ ブローカー:
について次のように記載されています。エンタープライズ統合パターンでは、メッセージ ブローカーを、複数の宛先からメッセージを受信し、正しい宛先を決定し、メッセージを正しいチャネルにルーティングできるアーキテクチャのビルディング ブロックとして定義します。
Celery での目的のために、ブローカーを、作成されたタスクが保存される「メッセージ トランスポート」とみなします。ブローカーは実際にタスクを実行しません。それはワーカーの仕事です。代わりに、ブローカーは、タスクがスケジュールされたときにスケジュールされたタスクが に保存され 、ワーカーが最終的にタスクを実行するときに からプルされる 場所です。ブローカーは Celery が動作するために必須のコンポーネントであり、Celery は 1 つのブローカーにのみ接続します。
Celery のバックエンドとブローカーのページには、サポートされているブローカーの一部がリストされています。また、リストされていないサポートされている実験的なブローカーもあります (SQLAlchemy など)。これらのブローカー (または「メッセージ トランスポート」) は、Celery が管理する Kombu と呼ばれるメッセージ トランスポート用の Python ライブラリによって管理されます。ブローカーの設定に関する情報を探す場合、Celery のドキュメントではなく Kombu のドキュメントを参照すると役立つ場合があります。
一部のブローカーはタスクのファンアウトや優先度などの高度な機能を備えていますが、他のブローカーは単純なキューとして動作します。
A worker は、ブローカーからタスクを取得し、Python アプリで定義されたタスク関数を実行する Celery のインスタンスです。 Celery 自体は Python で書かれているため、Celery はワーカー内で Python コードを実行できます。
多くのワーカーを同時に実行してタスクを実行できます。 celery ワーカー コマンドを実行すると、デフォルトでコンピューターのすべてのコアに対してワーカーが起動されます。コンピューターに 16 コアがある場合、celery ワーカーを実行すると 16 個のワーカーが開始されます。
実行中のワーカーがない場合、メッセージ (タスク) は、ワーカーが実行できるようになるまでブローカーに蓄積されます。
Celery ユーザー ガイドのタスク ページには、バックエンド:
について次のように記載されています。タスクを追跡したい場合、または戻り値が必要な場合、Celery は後で取得できるように状態をどこかに保存または送信する必要があります。 SQLAlchemy/Django ORM、Memcached、RabbitMQ/QPid (rpc)、および Redis から選択できる組み込みの結果バックエンドがいくつかあります。または独自に定義することもできます。
TLDR: バックエンドは、非同期タスクの結果と返された結果を追跡します。それは実際には何を意味しますか?いつ役立ちますか?
年次レポートを生成できる会計アプリを Django で構築していると想像してください。レポートの生成には数分かかる場合があります。
ユーザーの応答性を高めるには、AJAX リクエストを使用してレポート生成タスクを開始します。このリクエストはタスクの ID を返します。この ID を使用して、数秒ごとにサーバーをポーリングし、レポートが生成されたかどうかを確認できます。タスクが完了すると、レポートの ID が返されます。クライアントはこれを使用して、JavaScript 経由でレポートへのリンクを表示できます。
次のコードを使用して Celery と Django でこれを実装できます:
from celery import shared_task from django.http import JsonResponse from django.views.decorators.http import require_http_methods from accounting.models import Asset from accounting.reports import AnnualReportGenerator @shared_task def generate_report_task(year): # This could take minutes... report = AnnualReportGenerator().generate(year) asset = Asset.objects.create( name=f"{year} annual report", url=report.url, ) return asset.id @require_http_methods(["POST"]) def generate_annual_report_view(request): year = request.POST.get("year") task = generate_report_task.delay(year) return JsonResponse({"taskId": task.id}) def get_annual_report_generation_status_view(request, task_id): task = generate_report_task.AsyncResult(task_id) # The status is typically "PENDING", "SUCCESS", or "FAILURE" status = task.status return JsonResponse({"status": status, "assetId": task.result})
この例では、generate_report_task() によって返されたアセット ID は backend に保存されます。バックエンドは、結果と返された結果を保存します。バックエンドは、未処理のタスクのステータスを保存しません。これらは、結果があった場合にのみ追加されます。 「PENDING」を返すタスクのステータスはまったく不明です。関連するタスクが存在しない可能性もあります。通常、タスクは「SUCCESS」または「FAILURE」を返しますが、Celery ステータス ドキュメントですべてのステータスを確認できます。
Celery がタスクを実行するためにバックエンドは必要ありません。 ただし、タスクの結果を確認したり、タスクの結果を返す必要がある場合にはバックエンドが必要になります。 Celery にバックエンドが構成されていないときにタスクのステータスを確認しようとすると、例外が発生します。
この投稿が、セロリの個々の部分と、セロリの使用を検討する理由を理解するのに役立つことを願っています。公式ドキュメントを理解するのは難しいですが、Celery を深く学ぶことで、Python アプリケーション内で新たな可能性を開くことができます。
免責事項: 提供されるすべてのリソースの一部はインターネットからのものです。お客様の著作権またはその他の権利および利益の侵害がある場合は、詳細な理由を説明し、著作権または権利および利益の証拠を提出して、電子メール [email protected] に送信してください。 できるだけ早く対応させていただきます。
Copyright© 2022 湘ICP备2022001581号-3