Future — это контейнер, который может хранить либо результат вычисления, либо ошибку, произошедшую во время этого вычисления. Когда будущее создается, оно начинается в состоянии ОЖИДАНИЯ. Библиотека не предполагает создание этого объекта вручную, за исключением, возможно, целей тестирования.
import concurrent.futures as futures f = futures.Future() assert(f._result is None) assert(f._exception is None) assert(f._state == 'PENDING')
Состояние PENDING указывает, что запрошенное пользователем вычисление было зарегистрировано в пуле потоков и помещено в очередь, но еще не было выбрано ни одним потоком для выполнения. Как только свободный поток берет задачу (обратный вызов) из очереди, будущее переходит в состояние ВЫПОЛНЕНИЕ. Фьючерс можно отменить только тогда, когда он находится в состоянии ОЖИДАНИЕ. Следовательно, между состояниями ОЖИДАНИЕ и ВЫПОЛНЕНИЕ существует промежуток времени, в течение которого запрошенное вычисление может быть отменено.
import concurrent.futures as futures def should_cancel_pending_future(): f = futures.Future() assert(f._state == 'PENDING') assert(f.cancel()) assert(f._state == 'CANCELLED') def should_not_cancel_running_future(): f = futures.Future() f.set_running_or_notify_cancel() assert(f._state == 'RUNNING') assert(not f.cancel()) def cancel_is_idempotent(): f = futures.Future() assert(f.cancel()) assert(f.cancel()) should_cancel_pending_future() should_not_cancel_running_future() cancel_is_idempotent()
Запрошенная операция в пуле потоков может либо завершиться с вычисленным значением, либо привести к ошибке. Независимо от результата, будущее переходит в состояние ЗАВЕРШЕНО. Результат или ошибка затем сохраняется в соответствующих полях.
import concurrent.futures as futures def future_completed_with_result(): f = futures.Future() f.set_result('foo') assert(f._state == 'FINISHED') assert(f._result == 'foo') assert(f._exception is None) def future_completed_with_exception(): f = futures.Future() f.set_exception(NameError()) assert(f._state == 'FINISHED') assert(f._result is None) assert(isinstance(f._exception, NameError)) future_completed_with_result() future_completed_with_exception()
Чтобы получить результат вычисления, используется метод result. Если вычисление еще не завершено, этот метод заблокирует текущий поток (из которого был вызван результат) до тех пор, пока вычисление не завершится или не истечет время ожидания.
Если вычисление завершается успешно и без ошибок, метод result возвращает вычисленное значение.
import concurrent.futures as futures import time import threading f = futures.Future() def target(): time.sleep(1) f.set_result('foo') threading.Thread(target=target).start() assert(f.result() == 'foo')
Если во время вычислений возникло исключение, результат вызовет это исключение.
import concurrent.futures as futures import time import threading f = futures.Future() def target(): time.sleep(1) f.set_exception(NameError) threading.Thread(target=target).start() try: f.result() raise Exception() except NameError: assert(True)
Если время ожидания метода истекает, возникает ошибка TimeoutError.
import concurrent.futures as futures f = futures.Future() try: f.result(1) raise Exception() except TimeoutError: assert(f._result is None) assert(f._exception is None)
Попытка получить результат отмененного вычисления вызовет CancelledError.
import concurrent.futures as futures f = futures.Future() assert(f.cancel()) try: f.result() raise Exception() except futures.CancelledError: assert(True)
В процессе разработки довольно часто возникает необходимость выполнить N вычислений в пуле потоков и дождаться их завершения. Для этого в библиотеке предусмотрена функция ожидания. Существует несколько стратегий ожидания: FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED.
Общим для всех стратегий ожидания является то, что если фьючерсы, переданные методу ожидания, уже завершены, коллекция переданных фьючерсов возвращается независимо от выбранной стратегии. Неважно, как они были завершены, с ошибкой, с результатом или были отменены.
import concurrent.futures as futures def test(return_when): f1, f2, f3 = futures.Future(), futures.Future(), futures.Future() f1.cancel() f1.set_running_or_notify_cancel() # required f2.set_result('foo') f3.set_exception(NameError) r = futures.wait([f1, f2, f3], return_when=return_when) assert(len(r.done) == 3) assert(len(r.not_done) == 0) for return_when in [futures.ALL_COMPLETED, futures.FIRST_EXCEPTION, futures.FIRST_COMPLETED]: test(return_when)
Стратегия ALL_COMPLETED гарантирует ожидание завершения всех пройденных фьючерсов или выход по таймауту с коллекцией завершенных к этому моменту фьючерсов, которая может быть неполной.
import concurrent.futures as futures import threading import time def should_wait_for_all_futures_to_complete(): f1 = futures.Future() f1.set_result('foo') f2 = futures.Future() def target(): time.sleep(1) f2.set_result('bar') threading.Thread(target=target).start() r = futures.wait([f1, f2], return_when=futures.ALL_COMPLETED) assert(len(r.done) == 2) def should_exit_on_timeout(): f1 = futures.Future() f1.set_result('foo') f2 = futures.Future() r = futures.wait(fs=[f1, f2], timeout=1, return_when=futures.ALL_COMPLETED) assert(len(r.done) == 1) should_wait_for_all_futures_to_complete() should_exit_on_timeout()
Стратегия FIRST_COMPLETED гарантирует возврат коллекции хотя бы с одним завершенным фьючерсом или пустой коллекции в случае таймаута. Эта стратегия НЕ подразумевает, что возвращаемая коллекция не может содержать несколько элементов.
import concurrent.futures as futures import threading import time f1 = futures.Future() f2 = futures.Future() def target(): time.sleep(1) f1.set_result(True) threading.Thread(target=target).start() r = futures.wait([f1, f2], return_when=futures.FIRST_COMPLETED) assert(len(r.done) == 1) assert(len(r.not_done) == 1)
Стратегия FIRST_EXCEPTION прерывает ожидание, если одно из вычислений завершается с ошибкой. Если исключений не возникает, поведение идентично будущему ALL_COMPLETED.
import concurrent.futures as futures import threading import time f1 = futures.Future() f1.set_result('foo') f2, f3 = futures.Future(), futures.Future() def target(): time.sleep(1) f2.set_exception(NameError()) threading.Thread(target=target).start() r = futures.wait(fs=[f1, f2, f3], return_when=futures.FIRST_EXCEPTION) assert(len(r.done) == 2)
Объект отвечает за создание пула потоков. Основным методом взаимодействия с этим объектом является метод Submit. Это позволяет зарегистрировать вычисление в пуле потоков. В ответ возвращается объект Future, который используется для мониторинга состояния вычислений и получения окончательного результата.
Характеристики
Отказ от ответственности: Все предоставленные ресурсы частично взяты из Интернета. В случае нарушения ваших авторских прав или других прав и интересов, пожалуйста, объясните подробные причины и предоставьте доказательства авторских прав или прав и интересов, а затем отправьте их по электронной почте: [email protected]. Мы сделаем это за вас как можно скорее.
Copyright© 2022 湘ICP备2022001581号-3