Future 是一个容器,可以保存计算结果或计算期间发生的错误。创建 future 时,它以 PENDING 状态开始。该库不打算手动创建此对象,除非出于测试目的。
import concurrent.futures as futures f = futures.Future() assert(f._result is None) assert(f._exception is None) assert(f._state == 'PENDING')
PENDING 状态表示用户请求的计算已在线程池中注册并放入队列中,但尚未被任何线程拾取执行。一旦空闲线程从队列中获取任务(回调),Future 就会转换为 RUNNING 状态。 Future 只能在处于 PENDING 状态时被取消。因此,在 PENDING 和 RUNNING 状态之间存在一个时间窗口,在此期间可以取消请求的计算。
import concurrent.futures as futures def should_cancel_pending_future(): f = futures.Future() assert(f._state == 'PENDING') assert(f.cancel()) assert(f._state == 'CANCELLED') def should_not_cancel_running_future(): f = futures.Future() f.set_running_or_notify_cancel() assert(f._state == 'RUNNING') assert(not f.cancel()) def cancel_is_idempotent(): f = futures.Future() assert(f.cancel()) assert(f.cancel()) should_cancel_pending_future() should_not_cancel_running_future() cancel_is_idempotent()
线程池中请求的操作可以完成计算值或导致错误。无论结果如何,未来都会过渡到 FINISHED 状态。然后将结果或错误存储在相应的字段中。
import concurrent.futures as futures def future_completed_with_result(): f = futures.Future() f.set_result('foo') assert(f._state == 'FINISHED') assert(f._result == 'foo') assert(f._exception is None) def future_completed_with_exception(): f = futures.Future() f.set_exception(NameError()) assert(f._state == 'FINISHED') assert(f._result is None) assert(isinstance(f._exception, NameError)) future_completed_with_result() future_completed_with_exception()
要检索计算结果,请使用 result 方法。如果计算尚未完成,此方法将阻塞当前线程(从中调用结果),直到计算完成或等待超时。
如果计算成功完成且没有错误,则 result 方法返回计算值。
import concurrent.futures as futures import time import threading f = futures.Future() def target(): time.sleep(1) f.set_result('foo') threading.Thread(target=target).start() assert(f.result() == 'foo')
如果计算过程中发生异常,结果将引发该异常。
import concurrent.futures as futures import time import threading f = futures.Future() def target(): time.sleep(1) f.set_exception(NameError) threading.Thread(target=target).start() try: f.result() raise Exception() except NameError: assert(True)
如果方法在等待时超时,则会引发 TimeoutError。
import concurrent.futures as futures f = futures.Future() try: f.result(1) raise Exception() except TimeoutError: assert(f._result is None) assert(f._exception is None)
尝试获取已取消的计算结果将引发 CancelledError。
import concurrent.futures as futures f = futures.Future() assert(f.cancel()) try: f.result() raise Exception() except futures.CancelledError: assert(True)
在开发过程中,需要在一个线程池上运行N次计算并等待其完成是很常见的。为了实现这一点,该库提供了等待函数。等待策略有几种:FIRST_COMPLETED、FIRST_EXCEPTION、ALL_COMPLETED。
所有等待策略的共同点是,如果传递给 wait 方法的 future 已经完成,则无论选择何种策略,都会返回传递的 future 的集合。无论它们是如何完成的,无论是有错误、结果还是被取消,都无关紧要。
import concurrent.futures as futures def test(return_when): f1, f2, f3 = futures.Future(), futures.Future(), futures.Future() f1.cancel() f1.set_running_or_notify_cancel() # required f2.set_result('foo') f3.set_exception(NameError) r = futures.wait([f1, f2, f3], return_when=return_when) assert(len(r.done) == 3) assert(len(r.not_done) == 0) for return_when in [futures.ALL_COMPLETED, futures.FIRST_EXCEPTION, futures.FIRST_COMPLETED]: test(return_when)
ALL_COMPLETED 策略保证等待所有传递的 future 完成,或者在超时后退出,并收集截至该时刻完成的 future,这可能是不完整的。
import concurrent.futures as futures import threading import time def should_wait_for_all_futures_to_complete(): f1 = futures.Future() f1.set_result('foo') f2 = futures.Future() def target(): time.sleep(1) f2.set_result('bar') threading.Thread(target=target).start() r = futures.wait([f1, f2], return_when=futures.ALL_COMPLETED) assert(len(r.done) == 2) def should_exit_on_timeout(): f1 = futures.Future() f1.set_result('foo') f2 = futures.Future() r = futures.wait(fs=[f1, f2], timeout=1, return_when=futures.ALL_COMPLETED) assert(len(r.done) == 1) should_wait_for_all_futures_to_complete() should_exit_on_timeout()
FIRST_COMPLETED 策略保证返回至少有一个已完成的 future 的集合,或者在超时的情况下返回空集合。 此策略并不意味着返回的集合不能包含多个元素.
import concurrent.futures as futures import threading import time f1 = futures.Future() f2 = futures.Future() def target(): time.sleep(1) f1.set_result(True) threading.Thread(target=target).start() r = futures.wait([f1, f2], return_when=futures.FIRST_COMPLETED) assert(len(r.done) == 1) assert(len(r.not_done) == 1)
如果其中一个计算完成时出现错误,FIRST_EXCEPTION 策略会中断等待。如果没有发生异常,则行为与 ALL_COMPLETED future 相同。
import concurrent.futures as futures import threading import time f1 = futures.Future() f1.set_result('foo') f2, f3 = futures.Future(), futures.Future() def target(): time.sleep(1) f2.set_exception(NameError()) threading.Thread(target=target).start() r = futures.wait(fs=[f1, f2, f3], return_when=futures.FIRST_EXCEPTION) assert(len(r.done) == 2)
该对象负责创建线程池。与该对象交互的主要方法是 Submit 方法。它允许在线程池中注册计算。作为响应,返回一个 Future 对象,用于监控计算状态并获取最终结果。
特性
免責聲明: 提供的所有資源部分來自互聯網,如果有侵犯您的版權或其他權益,請說明詳細緣由並提供版權或權益證明然後發到郵箱:[email protected] 我們會在第一時間內為您處理。
Copyright© 2022 湘ICP备2022001581号-3