"यदि कोई कर्मचारी अपना काम अच्छी तरह से करना चाहता है, तो उसे पहले अपने औजारों को तेज करना होगा।" - कन्फ्यूशियस, "द एनालेक्ट्स ऑफ कन्फ्यूशियस। लू लिंगगोंग"
मुखपृष्ठ > प्रोग्रामिंग > कस्टम Django कमांड के साथ अजवाइन श्रमिकों को स्वचालित रूप से पुनः लोड करें

कस्टम Django कमांड के साथ अजवाइन श्रमिकों को स्वचालित रूप से पुनः लोड करें

2024-08-05 को प्रकाशित
ब्राउज़ करें:145

Automatically reload Celery workers with a custom Django command

सेलेरी में पहले --autoreload ध्वज था जिसे हटा दिया गया है। हालाँकि, Django के मैनेजहोम रनसर्वर कमांड में स्वचालित रीलोडिंग अंतर्निहित है। सेलेरी वर्कर्स में स्वचालित पुनः लोडिंग की अनुपस्थिति एक भ्रमित करने वाला विकास अनुभव पैदा करती है: पायथन कोड को अपडेट करने से Django सर्वर वर्तमान कोड के साथ पुनः लोड हो जाता है, लेकिन सर्वर द्वारा फायर किए जाने वाले कोई भी कार्य सेलेरी वर्कर में पुराना कोड चलाएगा।

यह पोस्ट आपको दिखाएगी कि एक कस्टम मैनेज.पी रनवर्कर कमांड कैसे बनाया जाए जो विकास के दौरान सेलेरी श्रमिकों को स्वचालित रूप से पुनः लोड करता है। कमांड को रनसर्वर के आधार पर तैयार किया जाएगा, और हम देखेंगे कि Django की स्वचालित रीलोडिंग अंडर-द-हुड कैसे काम करती है।

इससे पहले कि हम शुरू करें

यह पोस्ट मानती है कि आपके पास सेलेरी वाला Django ऐप पहले से इंस्टॉल है (गाइड)। यह यह भी मानता है कि आपको Django में परियोजनाओं और अनुप्रयोगों के बीच अंतर की समझ है।

स्रोत कोड और दस्तावेज़ीकरण के सभी लिंक प्रकाशन के समय (जुलाई, 2024) Django और Celery के वर्तमान संस्करणों के लिए होंगे। यदि आप इसे सुदूर भविष्य में पढ़ रहे हैं, तो चीज़ें बदल गई होंगी।

अंत में, पोस्ट के उदाहरणों में मुख्य परियोजना निर्देशिका को my_project नाम दिया जाएगा।

समाधान: एक कस्टम कमांड

हम रनवर्कर नामक एक कस्टम मैनेज.पी कमांड बनाएंगे। क्योंकि Django अपने रनसेवर कमांड के माध्यम से स्वचालित रीलोडिंग प्रदान करता है, हम अपने कस्टम कमांड के आधार के रूप में रनसर्वर के स्रोत कोड का उपयोग करेंगे।

आप अपने किसी भी प्रोजेक्ट एप्लिकेशन के भीतर प्रबंधन/कमांड/निर्देशिका बनाकर Django में एक कमांड बना सकते हैं। एक बार निर्देशिकाएं बन जाने के बाद, आप उस कमांड के नाम के साथ एक पायथन फ़ाइल डाल सकते हैं जिसे आप उस निर्देशिका (दस्तावेज़) में बनाना चाहते हैं।

यह मानते हुए कि आपके प्रोजेक्ट में पोल ​​नामक एक एप्लिकेशन है, हम poll/management/commands/runworker.py पर एक फ़ाइल बनाएंगे और निम्नलिखित कोड जोड़ेंगे:

# polls/management/commands/runworker.py

import sys
from datetime import datetime

from celery.signals import worker_init

from django.conf import settings
from django.core.management.base import BaseCommand
from django.utils import autoreload

from my_project.celery import app as celery_app


class Command(BaseCommand):
    help = "Starts a Celery worker instance with auto-reloading for development."

    # Validation is called explicitly each time the worker instance is reloaded.
    requires_system_checks = []
    suppressed_base_arguments = {"--verbosity", "--traceback"}

    def add_arguments(self, parser):
        parser.add_argument(
            "--skip-checks",
            action="store_true",
            help="Skip system checks.",
        )
        parser.add_argument(
            "--loglevel",
            choices=("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", "FATAL"),
            type=str.upper,  # Transforms user input to uppercase.
            default="INFO",
        )

    def handle(self, *args, **options):
        autoreload.run_with_reloader(self.run_worker, **options)

    def run_worker(self, **options):
        # If an exception was silenced in ManagementUtility.execute in order
        # to be raised in the child process, raise it now.
        autoreload.raise_last_exception()

        if not options["skip_checks"]:
            self.stdout.write("Performing system checks...\n\n")
            self.check(display_num_errors=True)

        # Need to check migrations here, so can't use the
        # requires_migrations_check attribute.
        self.check_migrations()

        # Print Django info to console when the worker initializes.
        worker_init.connect(self.on_worker_init)

        # Start the Celery worker.
        celery_app.worker_main(
            [
                "--app",
                "my_project",
                "--skip-checks",
                "worker",
                "--loglevel",
                options["loglevel"],
            ]
        )

    def on_worker_init(self, sender, **kwargs):
        quit_command = "CTRL-BREAK" if sys.platform == "win32" else "CONTROL-C"

        now = datetime.now().strftime("%B %d, %Y - %X")
        version = self.get_version()
        print(
            f"{now}\n"
            f"Django version {version}, using settings {settings.SETTINGS_MODULE!r}\n"
            f"Quit the worker instance with {quit_command}.",
            file=self.stdout,
        )

महत्वपूर्ण: my_project के सभी उदाहरणों को अपने Django प्रोजेक्ट के नाम से बदलना सुनिश्चित करें।

यदि आप इस कोड को कॉपी-पेस्ट करना चाहते हैं और अपनी प्रोग्रामिंग जारी रखना चाहते हैं, तो आप इस पोस्ट के बाकी हिस्सों को पढ़े बिना सुरक्षित रूप से यहां रुक सकते हैं। यह एक सुंदर समाधान है जो आपके Django और अजवाइन प्रोजेक्ट को विकसित करते समय आपकी अच्छी सेवा करेगा। हालाँकि, यदि आप इसके बारे में अधिक जानना चाहते हैं कि यह कैसे काम करता है तो पढ़ना जारी रखें।

यह कैसे काम करता है (वैकल्पिक)

इस कोड की पंक्ति-दर-पंक्ति समीक्षा करने के बजाय, मैं विषय के आधार पर सबसे दिलचस्प भागों पर चर्चा करूंगा। यदि आप पहले से ही Django कस्टम कमांड से परिचित नहीं हैं, तो आप आगे बढ़ने से पहले दस्तावेज़ों की समीक्षा करना चाह सकते हैं।

स्वचालित पुनः लोडिंग

यह हिस्सा सबसे जादुई लगता है। कमांड के हैंडल() विधि के मुख्य भाग के भीतर, Django के आंतरिक autoreload.run_with_reloader() के लिए एक कॉल है। यह एक कॉलबैक फ़ंक्शन को स्वीकार करता है जो प्रोजेक्ट में हर बार पायथन फ़ाइल बदलने पर निष्पादित होगा। वह वास्तव में कैसे काम करता है?

आइए autoreload.run_with_reloader() फ़ंक्शन के स्रोत कोड के सरलीकृत संस्करण पर एक नज़र डालें। सरलीकृत फ़ंक्शन अपने संचालन के बारे में स्पष्टता प्रदान करने के लिए कोड को फिर से लिखता है, इनलाइन करता है और हटाता है।

# NOTE: This has been dramatically pared down for clarity.

def run_with_reloader(callback_func, *args, **kwargs):
    # NOTE: This will evaluate to False the first time it is run.
    is_inside_subprocess = os.getenv("RUN_MAIN") == "true"

    if is_inside_subprocess:
        # The reloader watches for Python file changes.
        reloader = get_reloader()

        django_main_thread = threading.Thread(
            target=callback_func, args=args, kwargs=kwargs
        )
        django_main_thread.daemon = True
        django_main_thread.start()

        # When the code changes, the reloader exits with return code 3.
        reloader.run(django_main_thread)

    else:
        # Returns Python path and the arguments passed to the command.
        # Example output: ['/path/to/python', './manage.py', 'runworker']
        args = get_child_arguments()

        subprocess_env = {**os.environ, "RUN_MAIN": "true"}
        while True:
            # Rerun the manage.py command in a subprocess.
            p = subprocess.run(args, env=subprocess_env, close_fds=False)
            if p.returncode != 3:
                sys.exit(p.returncode)

जब मैनेज.पी रनवर्कर को कमांड लाइन में चलाया जाता है, तो यह पहले हैंडल() विधि को कॉल करेगा जो run_with_reloader() को कॉल करेगा।

run_with_reloader() के अंदर, यह यह देखने के लिए जांच करेगा कि RUN_MAIN नामक पर्यावरण चर का मान "सही" है या नहीं। जब फ़ंक्शन को पहली बार कॉल किया जाता है, तो RUN_MAIN का कोई मान नहीं होना चाहिए।

जब RUN_MAIN को "सही" पर सेट नहीं किया जाता है, तो run_with_reloader() एक लूप में प्रवेश करेगा। लूप के अंदर, यह एक उपप्रोसेस शुरू करेगा जो कि पारित किए गए manage.py [कमांड_नाम] को फिर से चलाता है, फिर उस उपप्रोसेस के बाहर निकलने की प्रतीक्षा करें। यदि उपप्रोसेस रिटर्न कोड 3 के साथ बाहर निकलता है, तो लूप का अगला पुनरावृत्ति एक नया उपप्रोसेस शुरू करेगा और प्रतीक्षा करेगा। लूप तब तक चलेगा जब तक कि कोई सबप्रोसेस एक निकास कोड नहीं लौटाता जो 3 नहीं है (या जब तक उपयोगकर्ता ctrl c के साथ बाहर नहीं निकलता)। एक बार जब इसे गैर-3 रिटर्न कोड मिल जाता है, तो यह प्रोग्राम से पूरी तरह बाहर निकल जाएगा।

उत्पन्न उपप्रक्रिया फिर से मैनेज.py कमांड चलाती है (हमारे मामले में मैनेज.py रनवर्कर), और फिर से कमांड run_with_reloader() को कॉल करेगी। इस बार, RUN_MAIN को "सही" पर सेट किया जाएगा क्योंकि कमांड एक उपप्रक्रिया में चल रहा है।

अब जब run_with_reloader() को पता है कि यह एक उपप्रक्रिया में है, तो इसे एक रीलोडर मिलेगा जो फ़ाइल परिवर्तनों पर नज़र रखता है, दिए गए कॉलबैक फ़ंक्शन को थ्रेड में रखता है, और इसे रीलोडर को पास करता है जो परिवर्तनों को देखना शुरू कर देता है।

जब एक पुनः लोडर फ़ाइल परिवर्तन का पता लगाता है, तो यह sys.exit(3) चलाता है। यह उपप्रक्रिया से बाहर निकलता है, जो उपप्रक्रिया को उत्पन्न करने वाले कोड से लूप के अगले पुनरावृत्ति को ट्रिगर करता है। बदले में, एक नई उपप्रक्रिया लॉन्च की जाती है जो कोड के अद्यतन संस्करण का उपयोग करती है।

सिस्टम जाँच एवं माइग्रेशन

डिफ़ॉल्ट रूप से, Django कमांड अपने हैंडल() विधि को चलाने से पहले सिस्टम जांच करते हैं। हालाँकि, रनसर्वर और हमारे कस्टम रनवर्कर कमांड के मामले में, हम इन्हें तब तक चलाना स्थगित करना चाहेंगे जब तक हम कॉलबैक के अंदर नहीं आ जाते जो हम run_with_reloader() को प्रदान करते हैं। हमारे मामले में, यह हमारी run_worker() विधि है। यह हमें टूटे हुए सिस्टम चेक को ठीक करते समय स्वचालित रीलोडिंग के साथ कमांड चलाने की अनुमति देता है।

सिस्टम जांच को स्थगित करने के लिए, require_system_checks विशेषता का मान एक खाली सूची पर सेट किया जाता है, और run_worker() के मुख्य भाग में self.check() को कॉल करके जांच की जाती है। रनसर्वर की तरह, हमारा कस्टम रनवर्कर कमांड भी यह देखने के लिए जांच करता है कि क्या सभी माइग्रेशन चलाए गए हैं, और यदि माइग्रेशन लंबित हैं तो यह एक चेतावनी प्रदर्शित करता है।

क्योंकि हम पहले से ही run_worker() विधि के भीतर Django की सिस्टम जांच कर रहे हैं, हम डुप्लिकेट कार्य को रोकने के लिए --स्किप-चेक ध्वज पास करके सेलेरी में सिस्टम जांच को अक्षम कर देते हैं।

सिस्टम जांच और माइग्रेशन से संबंधित सभी कोड सीधे रनसर्वर कमांड स्रोत कोड से हटा दिए गए थे।

celery_app.worker_main()

हमारा कार्यान्वयन सेलेरी को भुगतान करने के बजाय celery_app.worker_main() का उपयोग करके सीधे पायथन से सेलेरी वर्कर को लॉन्च करता है।

on_worker_init()

यह कोड तब निष्पादित होता है जब कार्यकर्ता प्रारंभ होता है, दिनांक और समय, Django संस्करण और छोड़ने का आदेश प्रदर्शित करता है। इसे रनसर्वर बूट होने पर प्रदर्शित होने वाली जानकारी के आधार पर तैयार किया गया है।

अन्य रनसर्वर बॉयलरप्लेट

निम्नलिखित पंक्तियों को भी रनसर्वर स्रोत से हटा दिया गया था:

  • suppressed_base_arguments = {"--verbosity", "--traceback"}
  • autoreload.raise_last_exception()

छांटने का स्तर

यदि डेवलपर कोड को संशोधित किए बिना सीएलआई से सेटिंग समायोजित करना चाहता है तो हमारे कस्टम कमांड में एक कॉन्फ़िगर करने योग्य लॉग स्तर है।

आगे बढ़ना

मैंने इस कार्यान्वयन को बनाने के लिए Django और Celery के स्रोत कोड पर विचार किया और इसे विस्तारित करने के कई अवसर हैं। आप सेलेरी के अधिक कार्यकर्ता तर्कों को स्वीकार करने के लिए कमांड को कॉन्फ़िगर कर सकते हैं। वैकल्पिक रूप से, आप एक कस्टम मैनेजहोम कमांड बना सकते हैं जो स्वचालित रूप से किसी भी शेल कमांड को पुनः लोड करता है जैसे डेविड ब्राउन ने इस गिस्ट में किया था।

यदि आपको यह उपयोगी लगता है, तो बेझिझक एक लाइक या एक टिप्पणी छोड़ें। पढ़ने के लिए धन्यवाद।

विज्ञप्ति वक्तव्य इस लेख को इस पर पुनर्मुद्रित किया गया है: https://dev.to/tylerlwsmith/automaticalticaltically-reload-celery-workers-with-a-custom-django-command-1ojl?1 यदि कोई उल्लंघन है, इसे हटाने के लिए।
नवीनतम ट्यूटोरियल अधिक>

चीनी भाषा का अध्ययन करें

अस्वीकरण: उपलब्ध कराए गए सभी संसाधन आंशिक रूप से इंटरनेट से हैं। यदि आपके कॉपीराइट या अन्य अधिकारों और हितों का कोई उल्लंघन होता है, तो कृपया विस्तृत कारण बताएं और कॉपीराइट या अधिकारों और हितों का प्रमाण प्रदान करें और फिर इसे ईमेल पर भेजें: [email protected] हम इसे आपके लिए यथाशीघ्र संभालेंगे।

Copyright© 2022 湘ICP备2022001581号-3