"إذا أراد العامل أن يؤدي عمله بشكل جيد، فعليه أولاً أن يشحذ أدواته." - كونفوشيوس، "مختارات كونفوشيوس. لو لينجونج"
الصفحة الأمامية > برمجة > بيثون المتزامنة. العقود الآجلة

بيثون المتزامنة. العقود الآجلة

تم النشر بتاريخ 2024-11-08
تصفح:840

python concurrent.futures

مستقبل

المستقبل عبارة عن حاوية يمكنها الاحتفاظ بنتيجة الحساب أو الخطأ الذي حدث أثناء هذا الحساب. عندما يتم إنشاء المستقبل، فإنه يبدأ في حالة انتظار. لا تنوي المكتبة إنشاء هذا الكائن يدويًا، باستثناء ربما لأغراض الاختبار.

import concurrent.futures as futures

f = futures.Future()
assert(f._result is None)
assert(f._exception is None)
assert(f._state == 'PENDING')

تشير الحالة "معلقة" إلى أن الحساب الذي طلبه المستخدم قد تم تسجيله في تجمع سلاسل الرسائل ووضعه في قائمة الانتظار، ولكن لم يتم انتقاؤه بعد بواسطة أي مؤشر ترابط للتنفيذ. بمجرد أن يأخذ مؤشر ترابط مجاني المهمة (رد الاتصال) من قائمة الانتظار، فإن التحولات المستقبلية إلى حالة التشغيل. لا يمكن إلغاء المستقبل إلا عندما يكون في حالة انتظار. لذلك، هناك فترة زمنية بين حالتي PENDING وRUNNING يمكن خلالها إلغاء الحساب المطلوب.

import concurrent.futures as futures

def should_cancel_pending_future():
    f = futures.Future()
    assert(f._state == 'PENDING')
    assert(f.cancel())
    assert(f._state == 'CANCELLED')

def should_not_cancel_running_future():
    f = futures.Future()
    f.set_running_or_notify_cancel()
    assert(f._state == 'RUNNING')
    assert(not f.cancel())

def cancel_is_idempotent():
    f = futures.Future()
    assert(f.cancel())
    assert(f.cancel())


should_cancel_pending_future()
should_not_cancel_running_future()
cancel_is_idempotent()

يمكن أن تكتمل العملية المطلوبة في تجمع مؤشرات الترابط بقيمة محسوبة أو تؤدي إلى خطأ. بغض النظر عن النتيجة، فإن التحولات المستقبلية إلى الحالة النهائية. يتم بعد ذلك تخزين النتيجة أو الخطأ في الحقول المقابلة.

import concurrent.futures as futures

def future_completed_with_result():
    f = futures.Future()
    f.set_result('foo')
    assert(f._state == 'FINISHED')
    assert(f._result == 'foo')
    assert(f._exception is None)

def future_completed_with_exception():
    f = futures.Future()
    f.set_exception(NameError())
    assert(f._state == 'FINISHED')
    assert(f._result is None)
    assert(isinstance(f._exception, NameError))

future_completed_with_result()
future_completed_with_exception()

لاسترداد نتيجة الحساب، يتم استخدام طريقة النتيجة. إذا لم يكتمل الحساب بعد، فستقوم هذه الطريقة بحظر مؤشر الترابط الحالي (الذي تم استدعاء النتيجة منه) حتى انتهاء الحساب أو انتهاء مهلة الانتظار.

إذا اكتمل الحساب بنجاح دون أخطاء، فإن طريقة النتيجة ترجع القيمة المحسوبة.

import concurrent.futures as futures
import time
import threading

f = futures.Future()
def target():
    time.sleep(1)
    f.set_result('foo')
threading.Thread(target=target).start()
assert(f.result() == 'foo')

إذا حدث استثناء أثناء الحساب، فستؤدي النتيجة إلى رفع هذا الاستثناء.

import concurrent.futures as futures
import time
import threading

f = futures.Future()
def target():
    time.sleep(1)
    f.set_exception(NameError)
threading.Thread(target=target).start()
try:
    f.result()
    raise Exception()
except NameError:
    assert(True)

إذا انتهت مهلة الطريقة أثناء الانتظار، فسيظهر خطأ انتهاء المهلة.

import concurrent.futures as futures

f = futures.Future()
try:
    f.result(1)
    raise Exception()
except TimeoutError:
    assert(f._result is None)
    assert(f._exception is None)

ستؤدي محاولة الحصول على نتيجة عملية حسابية تم إلغاؤها إلى ظهور خطأ إلغاء.

import concurrent.futures as futures

f = futures.Future()
assert(f.cancel())
try:
    f.result()
    raise Exception()
except futures.CancelledError:
    assert(True)

استراتيجية الانتظار

في عملية التطوير، من الشائع جدًا الحاجة إلى تشغيل حسابات N على تجمع مؤشرات الترابط وانتظار اكتمالها. لتحقيق ذلك، توفر المكتبة وظيفة الانتظار. هناك عدة إستراتيجيات انتظار: FIRST_COMPLETED، FIRST_EXCEPTION، ALL_COMPLETED.

من الشائع في جميع استراتيجيات الانتظار أنه إذا كانت العقود الآجلة التي تم تمريرها إلى طريقة الانتظار مكتملة بالفعل، فسيتم إرجاع مجموعة العقود الآجلة التي تم تمريرها بغض النظر عن الإستراتيجية المختارة. ولا يهم كيف تم إكمالها سواء بخطأ أو نتيجة أو إذا تم إلغاؤها.

import concurrent.futures as futures

def test(return_when):
    f1, f2, f3 = futures.Future(), futures.Future(), futures.Future()
    f1.cancel()
    f1.set_running_or_notify_cancel() # required
    f2.set_result('foo')
    f3.set_exception(NameError)

    r = futures.wait([f1, f2, f3], return_when=return_when)
    assert(len(r.done) == 3)
    assert(len(r.not_done) == 0)

for return_when in [futures.ALL_COMPLETED, futures.FIRST_EXCEPTION, futures.FIRST_COMPLETED]:
    test(return_when)

استراتيجية ALL_COMPLETED

تضمن استراتيجية ALL_COMPLETED انتظار اكتمال جميع العقود الآجلة التي تم تمريرها، أو الخروج بعد مهلة مع مجموعة من العقود الآجلة المكتملة حتى تلك اللحظة، والتي قد تكون غير مكتملة.

import concurrent.futures as futures
import threading
import time

def should_wait_for_all_futures_to_complete():
    f1 = futures.Future()
    f1.set_result('foo')
    f2 = futures.Future()

    def target():
        time.sleep(1)
        f2.set_result('bar')

    threading.Thread(target=target).start()
    r = futures.wait([f1, f2], return_when=futures.ALL_COMPLETED)
    assert(len(r.done) == 2)

def should_exit_on_timeout():
    f1 = futures.Future()
    f1.set_result('foo')
    f2 = futures.Future()
    r = futures.wait(fs=[f1, f2], timeout=1, return_when=futures.ALL_COMPLETED)
    assert(len(r.done) == 1)


should_wait_for_all_futures_to_complete()
should_exit_on_timeout()

أول_مكتمل

تضمن إستراتيجية FIRST_COMPLETED إرجاع مجموعة ذات مستقبل مكتمل واحد على الأقل أو مجموعة فارغة في حالة انتهاء المهلة. لا تعني هذه الإستراتيجية أن المجموعة التي تم إرجاعها لا يمكن أن تحتوي على عناصر متعددة.

import concurrent.futures as futures
import threading
import time

f1 = futures.Future()
f2 = futures.Future()

def target():
    time.sleep(1)
    f1.set_result(True)

threading.Thread(target=target).start()

r = futures.wait([f1, f2], return_when=futures.FIRST_COMPLETED)
assert(len(r.done) == 1)
assert(len(r.not_done) == 1)

أول_استثناء

استراتيجية FIRST_EXCEPTION تقاطع الانتظار إذا انتهت إحدى العمليات الحسابية بوجود خطأ. في حالة عدم حدوث أي استثناءات، يكون السلوك مطابقًا للمستقبل ALL_COMPLETED.

import concurrent.futures as futures
import threading
import time

f1 = futures.Future()
f1.set_result('foo')
f2, f3 = futures.Future(), futures.Future()

def target():
    time.sleep(1)
    f2.set_exception(NameError())

threading.Thread(target=target).start()

r = futures.wait(fs=[f1, f2, f3], return_when=futures.FIRST_EXCEPTION)
assert(len(r.done) == 2)

ThreadPoolExecutor

الكائن مسؤول عن إنشاء تجمع مؤشرات الترابط. الطريقة الرئيسية للتفاعل مع هذا الكائن هي طريقة الإرسال. يسمح بتسجيل حساب في تجمع مؤشرات الترابط. ردا على ذلك، يتم إرجاع كائن المستقبل، والذي يستخدم لمراقبة حالة الحساب والحصول على النتيجة النهائية.

ملكيات

  • يتم إنشاء المواضيع الجديدة حسب الحاجة فقط:
    • إذا كان هناك مؤشر ترابط مجاني واحد على الأقل عند طلب الحساب، فلن يتم إنشاء مؤشر ترابط جديد
    • إذا لم تكن هناك سلاسل رسائل مجانية عند طلب عملية حسابية، فسيتم إنشاء مؤشر ترابط جديد، بشرط عدم الوصول إلى الحد الأقصى لعدد العمال.
    • إذا لم تكن هناك سلاسل رسائل مجانية وتم الوصول إلى الحد الأقصى لعدد العمال، فسيتم وضع الحساب في قائمة انتظار وسيتم أخذه بواسطة مؤشر الترابط التالي المتاح
  • الحد الأقصى لعدد سلاسل العمليات المخصصة للاحتياجات الحسابية افتراضيًا يساوي عدد نوى المعالج المنطقي
  • بمجرد إنشائه، لا يتم إتلاف الخيط، حتى في حالة التحميل المنخفض
بيان الافراج تم نشر هذه المقالة على: https://dev.to/mapogolions/python-concurrentfutures-5f4a?1 إذا كان هناك أي انتهاك، فيرجى الاتصال بـ [email protected] لحذفه
أحدث البرنامج التعليمي أكثر>

تنصل: جميع الموارد المقدمة هي جزئيًا من الإنترنت. إذا كان هناك أي انتهاك لحقوق الطبع والنشر الخاصة بك أو الحقوق والمصالح الأخرى، فيرجى توضيح الأسباب التفصيلية وتقديم دليل على حقوق الطبع والنشر أو الحقوق والمصالح ثم إرسالها إلى البريد الإلكتروني: [email protected]. سوف نتعامل مع الأمر لك في أقرب وقت ممكن.

Copyright© 2022 湘ICP备2022001581号-3