如果您需要每X 分鐘/秒等運行一個函數來進行一些清理,觸發一些操作,您可以在線程模組和django 自訂cli 的幫助下執行一個簡單的調度程序命令。
假設我想每 5 秒呼叫一個函數以在外部 API 上發布一些內容。
在您的 django 應用程式中建立一個名為 management 的資料夾/包,在該資料夾中建立另一個名為commands 的資料夾。在命令資料夾中建立一個名為 runposter.py 的模組。最後你會得到類似這樣的結構 yourapp/management/commands/runposter.py.
在此程式碼中,我們使用一個運行 while 循環的線程,只要它不每 5 秒停止一次。將 print("posting") 替換為您要執行的函數/邏輯。
# runposter.py import time from threading import Thread, Event from django.conf import settings from django.core.management.base import BaseCommand stop_event = Event() def my_job(): while not stop_event.is_set(): try: print("posting") time.sleep(5) except KeyboardInterrupt: break class Command(BaseCommand): help = "Run Poster." def handle(self, *args, **options): poster = Thread(target=my_job) try: print("Starting poster...") poster.start() while poster.is_alive(): poster.join(timeout=1) except KeyboardInterrupt: print("Stopping poster...") stop_event.set() poster.join() print("Poster shut down successfully!")
很好,現在打開另一個終端機視窗並運行 python manage.py runposter。如您所見,命令 runposter 是根據我們給出的模組名稱建立的。
當然,對於更複雜的東西,我建議使用 rq-scheduler 或 celery 週期性任務或 django-q。
但是,對於簡單的情況,這應該足夠好了。
免責聲明: 提供的所有資源部分來自互聯網,如果有侵犯您的版權或其他權益,請說明詳細緣由並提供版權或權益證明然後發到郵箱:[email protected] 我們會在第一時間內為您處理。
Copyright© 2022 湘ICP备2022001581号-3