如果您需要每 X 分钟/秒等运行一个函数来进行一些清理,触发一些操作,您可以在线程模块和 django 自定义 cli 的帮助下执行一个简单的调度程序命令。
假设我想每 5 秒调用一个函数以在外部 API 上发布一些内容。
在您的 django 应用程序中创建一个名为 management 的文件夹/包,在该文件夹中创建另一个名为commands 的文件夹。在命令文件夹中创建一个名为 runposter.py 的模块。最后你会得到类似这样的结构 yourapp/management/commands/runposter.py.
在此代码中,我们使用一个运行 while 循环的线程,只要它不每 5 秒停止一次。将 print("posting") 替换为您要运行的函数/逻辑。
# runposter.py import time from threading import Thread, Event from django.conf import settings from django.core.management.base import BaseCommand stop_event = Event() def my_job(): while not stop_event.is_set(): try: print("posting") time.sleep(5) except KeyboardInterrupt: break class Command(BaseCommand): help = "Run Poster." def handle(self, *args, **options): poster = Thread(target=my_job) try: print("Starting poster...") poster.start() while poster.is_alive(): poster.join(timeout=1) except KeyboardInterrupt: print("Stopping poster...") stop_event.set() poster.join() print("Poster shut down successfully!")
很好,现在打开另一个终端窗口并运行 python manage.py runposter。如您所见,命令 runposter 是根据我们给出的模块名称创建的。
当然,对于更复杂的东西,我建议使用 rq-scheduler 或 celery 周期性任务或 django-q。
但是,对于简单的情况,这应该足够好了。
免责声明: 提供的所有资源部分来自互联网,如果有侵犯您的版权或其他权益,请说明详细缘由并提供版权或权益证明然后发到邮箱:[email protected] 我们会第一时间内为您处理。
Copyright© 2022 湘ICP备2022001581号-3