芹菜学习起来可能令人望而生畏。虽然它的文档很全面,但它倾向于跳过基础知识。
这篇文章将定义 Celery 中的四个主要概念,讨论 Celery 和 Kombu 之间的关系,并使用一些代码示例来说明 Celery 在实际应用程序中如何有用。这些示例将使用 Django Web 框架及其 @shared_task 装饰器,但这些概念也适用于 Flask、FastAPI 等。
您将很难在当前的 Celery 文档中找到一个位置来清楚地说明它所认为的经纪人或后端,但是通过足够的挖掘,您可以找到和推断定义。
以下是开始使用 Celery 之前您应该了解的概念。
A 任务是Celery将异步执行的一些工作(在这种情况下,这是“不立即”的一个奇特词)。在 Web 应用程序中,一项任务可能是在用户提交表单后发送电子邮件。发送电子邮件可能是一个需要多秒的操作,并且强制用户在重定向之前等待电子邮件发送可能会让应用程序感觉很慢。
任务是使用 Celery 中的装饰器定义的。下面我们使用 @shared_task 装饰器将 send_thank_you_email() 转换为可在 Submit_feedback() 表单提交处理程序中使用的 Celery 任务。
from config.celery import shared_task from django.core.mail import send_mail from django.shortcuts import render, redirect from feedback.forms import FeedbackForm @shared_task def send_thank_you_email(email_address): send_mail( "Thank you for your feedback!", "We appreciate your input.", "[email protected]", [email_address], ) def submit_feedback(request): if request.method == "POST": form = FeedbackForm(request.POST) if form.is_valid(): form.save() # Push the task to the broker using the delay() method. send_thank_you_email.delay(form.cleaned_data["email"]) return redirect("/thank-you/") else: form = FeedbackForm() return render(request, "feedback.html", {"form": form})
当在 Celery 中使用装饰器定义任务时,它会向任务添加一个delay() 方法。您可以看到在成功保存表单后,send_thank_you_email 任务调用了上例中的delay() 方法。当delay()被调用时,它会将send_thank_you_email任务及其数据发送到存储它的broker,稍后将由worker执行,此时用户将通过电子邮件发送。
如果您在保存表单后需要发送额外的电子邮件,那么将工作推送到 Celery 的好处就变得更加明显。例如,您可能想向客户支持团队发送电子邮件,告知他们收到了新的反馈。对于 Celery,这几乎不会增加响应时间。
Celery 任务还允许额外的高级配置。如果电子邮件发送失败,您可以对任务进行编码以自动重试并配置 max_retries、retry_backoff、retry_jitter 等设置。
Celery 增强提案的术语表对 消息代理有以下说法:
企业集成模式将消息代理定义为一个架构构建块,它可以从多个目的地接收消息,确定正确的目的地并将消息路由到正确的通道。
为了我们使用 Celery 的目的,我们将把 broker 视为存储创建的任务的“消息传输”。经纪人实际上并不执行任务:那是工人的工作。相反,代理是计划任务在计划任务时存储到,以及在工作人员最终执行任务时从拉出的地方。代理是 Celery 工作的必需的组件,Celery 将只连接到一个代理。
Celery 的后端和代理页面列出了一些其支持的代理,并且还有其支持但未列出的其他实验性代理(例如 SQLAlchemy)。这些代理(或“消息传输”)由 Celery 维护的 Python 消息传输库(称为 Kombu)管理。当寻找有关配置代理的信息时,查阅 Kombu 的文档而不是 Celery 的文档有时会很有帮助。
一些代理具有任务扇出和优先级等高级功能,而其他代理则作为简单队列运行。
A worker 是 Celery 的一个实例,它从代理中提取任务并执行 Python 应用程序中定义的任务函数。 Celery 能够在其工作线程中运行 Python 代码,因为 Celery 本身是用 Python 编写的。
许多worker可以同时运行来执行任务。当您运行 celery worker 命令时,默认情况下它会为计算机的每个核心启动一个工作进程。如果你的电脑有16核,运行celeryworker将启动16个worker。
如果没有工作人员正在运行,消息(任务)将在代理中累积,直到工作人员可以执行它们。
Celery 用户指南中的任务页面有以下关于 后端的内容:
如果您想跟踪任务或需要返回值,那么 Celery 必须将状态存储或发送到某处,以便以后可以检索它们。有多种内置结果后端可供选择:SQLAlchemy/Django ORM、Memcached、RabbitMQ/QPid (rpc) 和 Redis – 或者您也可以定义自己的结果后端。
TLDR:后端跟踪异步任务的结果和返回结果。这实际上意味着什么,什么时候有用?
假设您正在 Django 中构建一个可以生成年度报告的会计应用程序。该报告可能需要几分钟才能生成。
为了给您的用户提供响应更快的体验,您可以使用 AJAX 请求来启动报告生成任务。该请求返回任务的 ID,它可以使用该 ID 每隔几秒轮询一次服务器以查看是否生成了报告。任务完成后,它将返回报告的 ID,客户端可以使用该 ID 通过 JavaScript 显示报告的链接。
我们可以使用 Celery 和 Django 使用以下代码来实现这一点:
from celery import shared_task from django.http import JsonResponse from django.views.decorators.http import require_http_methods from accounting.models import Asset from accounting.reports import AnnualReportGenerator @shared_task def generate_report_task(year): # This could take minutes... report = AnnualReportGenerator().generate(year) asset = Asset.objects.create( name=f"{year} annual report", url=report.url, ) return asset.id @require_http_methods(["POST"]) def generate_annual_report_view(request): year = request.POST.get("year") task = generate_report_task.delay(year) return JsonResponse({"taskId": task.id}) def get_annual_report_generation_status_view(request, task_id): task = generate_report_task.AsyncResult(task_id) # The status is typically "PENDING", "SUCCESS", or "FAILURE" status = task.status return JsonResponse({"status": status, "assetId": task.result})
在此示例中,generate_report_task()返回的资产ID存储在后端中。后端存储结果和返回结果。后端不存储尚未处理的任务的状态:只有在有结果后才会添加这些状态。返回“PENDING”的任务具有完全未知的状态:关联的任务甚至可能不存在。任务通常会返回“SUCCESS”或“FAILURE”,但您可以在 Celery 状态文档中查看所有状态。
Celery 运行任务不需要后端。 但是,如果您需要检查任务的结果或返回任务的结果,则需要后端。如果您在 Celery 未配置后端时尝试检查任务的状态,则会引发异常。
我希望这篇文章可以帮助您了解 Celery 的各个部分以及您可能考虑使用它的原因。虽然官方文档很难理解,但深入学习 Celery 可以在你的 Python 应用程序中释放新的可能性。
免責聲明: 提供的所有資源部分來自互聯網,如果有侵犯您的版權或其他權益,請說明詳細緣由並提供版權或權益證明然後發到郵箱:[email protected] 我們會在第一時間內為您處理。
Copyright© 2022 湘ICP备2022001581号-3