«Если рабочий хочет хорошо выполнять свою работу, он должен сначала заточить свои инструменты» — Конфуций, «Аналитики Конфуция. Лу Лингун»
титульная страница > программирование > python concurrent.futures

python concurrent.futures

Опубликовано 8 ноября 2024 г.
Просматривать:642

python concurrent.futures

Будущее

Future — это контейнер, который может хранить либо результат вычисления, либо ошибку, произошедшую во время этого вычисления. Когда будущее создается, оно начинается в состоянии ОЖИДАНИЯ. Библиотека не предполагает создание этого объекта вручную, за исключением, возможно, целей тестирования.

import concurrent.futures as futures

f = futures.Future()
assert(f._result is None)
assert(f._exception is None)
assert(f._state == 'PENDING')

Состояние PENDING указывает, что запрошенное пользователем вычисление было зарегистрировано в пуле потоков и помещено в очередь, но еще не было выбрано ни одним потоком для выполнения. Как только свободный поток берет задачу (обратный вызов) из очереди, будущее переходит в состояние ВЫПОЛНЕНИЕ. Фьючерс можно отменить только тогда, когда он находится в состоянии ОЖИДАНИЕ. Следовательно, между состояниями ОЖИДАНИЕ и ВЫПОЛНЕНИЕ существует промежуток времени, в течение которого запрошенное вычисление может быть отменено.

import concurrent.futures as futures

def should_cancel_pending_future():
    f = futures.Future()
    assert(f._state == 'PENDING')
    assert(f.cancel())
    assert(f._state == 'CANCELLED')

def should_not_cancel_running_future():
    f = futures.Future()
    f.set_running_or_notify_cancel()
    assert(f._state == 'RUNNING')
    assert(not f.cancel())

def cancel_is_idempotent():
    f = futures.Future()
    assert(f.cancel())
    assert(f.cancel())


should_cancel_pending_future()
should_not_cancel_running_future()
cancel_is_idempotent()

Запрошенная операция в пуле потоков может либо завершиться с вычисленным значением, либо привести к ошибке. Независимо от результата, будущее переходит в состояние ЗАВЕРШЕНО. Результат или ошибка затем сохраняется в соответствующих полях.

import concurrent.futures as futures

def future_completed_with_result():
    f = futures.Future()
    f.set_result('foo')
    assert(f._state == 'FINISHED')
    assert(f._result == 'foo')
    assert(f._exception is None)

def future_completed_with_exception():
    f = futures.Future()
    f.set_exception(NameError())
    assert(f._state == 'FINISHED')
    assert(f._result is None)
    assert(isinstance(f._exception, NameError))

future_completed_with_result()
future_completed_with_exception()

Чтобы получить результат вычисления, используется метод result. Если вычисление еще не завершено, этот метод заблокирует текущий поток (из которого был вызван результат) до тех пор, пока вычисление не завершится или не истечет время ожидания.

Если вычисление завершается успешно и без ошибок, метод result возвращает вычисленное значение.

import concurrent.futures as futures
import time
import threading

f = futures.Future()
def target():
    time.sleep(1)
    f.set_result('foo')
threading.Thread(target=target).start()
assert(f.result() == 'foo')

Если во время вычислений возникло исключение, результат вызовет это исключение.

import concurrent.futures as futures
import time
import threading

f = futures.Future()
def target():
    time.sleep(1)
    f.set_exception(NameError)
threading.Thread(target=target).start()
try:
    f.result()
    raise Exception()
except NameError:
    assert(True)

Если время ожидания метода истекает, возникает ошибка TimeoutError.

import concurrent.futures as futures

f = futures.Future()
try:
    f.result(1)
    raise Exception()
except TimeoutError:
    assert(f._result is None)
    assert(f._exception is None)

Попытка получить результат отмененного вычисления вызовет CancelledError.

import concurrent.futures as futures

f = futures.Future()
assert(f.cancel())
try:
    f.result()
    raise Exception()
except futures.CancelledError:
    assert(True)

Стратегия ожидания

В процессе разработки довольно часто возникает необходимость выполнить N вычислений в пуле потоков и дождаться их завершения. Для этого в библиотеке предусмотрена функция ожидания. Существует несколько стратегий ожидания: FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED.

Общим для всех стратегий ожидания является то, что если фьючерсы, переданные методу ожидания, уже завершены, коллекция переданных фьючерсов возвращается независимо от выбранной стратегии. Неважно, как они были завершены, с ошибкой, с результатом или были отменены.

import concurrent.futures as futures

def test(return_when):
    f1, f2, f3 = futures.Future(), futures.Future(), futures.Future()
    f1.cancel()
    f1.set_running_or_notify_cancel() # required
    f2.set_result('foo')
    f3.set_exception(NameError)

    r = futures.wait([f1, f2, f3], return_when=return_when)
    assert(len(r.done) == 3)
    assert(len(r.not_done) == 0)

for return_when in [futures.ALL_COMPLETED, futures.FIRST_EXCEPTION, futures.FIRST_COMPLETED]:
    test(return_when)

ALL_COMPLETED стратегия

Стратегия ALL_COMPLETED гарантирует ожидание завершения всех пройденных фьючерсов или выход по таймауту с коллекцией завершенных к этому моменту фьючерсов, которая может быть неполной.

import concurrent.futures as futures
import threading
import time

def should_wait_for_all_futures_to_complete():
    f1 = futures.Future()
    f1.set_result('foo')
    f2 = futures.Future()

    def target():
        time.sleep(1)
        f2.set_result('bar')

    threading.Thread(target=target).start()
    r = futures.wait([f1, f2], return_when=futures.ALL_COMPLETED)
    assert(len(r.done) == 2)

def should_exit_on_timeout():
    f1 = futures.Future()
    f1.set_result('foo')
    f2 = futures.Future()
    r = futures.wait(fs=[f1, f2], timeout=1, return_when=futures.ALL_COMPLETED)
    assert(len(r.done) == 1)


should_wait_for_all_futures_to_complete()
should_exit_on_timeout()

FIRST_COMPLETED

Стратегия FIRST_COMPLETED гарантирует возврат коллекции хотя бы с одним завершенным фьючерсом или пустой коллекции в случае таймаута. Эта стратегия НЕ подразумевает, что возвращаемая коллекция не может содержать несколько элементов.

import concurrent.futures as futures
import threading
import time

f1 = futures.Future()
f2 = futures.Future()

def target():
    time.sleep(1)
    f1.set_result(True)

threading.Thread(target=target).start()

r = futures.wait([f1, f2], return_when=futures.FIRST_COMPLETED)
assert(len(r.done) == 1)
assert(len(r.not_done) == 1)

ПЕРВОЕ_ИСКЛЮЧЕНИЕ

Стратегия FIRST_EXCEPTION прерывает ожидание, если одно из вычислений завершается с ошибкой. Если исключений не возникает, поведение идентично будущему ALL_COMPLETED.

import concurrent.futures as futures
import threading
import time

f1 = futures.Future()
f1.set_result('foo')
f2, f3 = futures.Future(), futures.Future()

def target():
    time.sleep(1)
    f2.set_exception(NameError())

threading.Thread(target=target).start()

r = futures.wait(fs=[f1, f2, f3], return_when=futures.FIRST_EXCEPTION)
assert(len(r.done) == 2)

ThreadPoolExecutor

Объект отвечает за создание пула потоков. Основным методом взаимодействия с этим объектом является метод Submit. Это позволяет зарегистрировать вычисление в пуле потоков. В ответ возвращается объект Future, который используется для мониторинга состояния вычислений и получения окончательного результата.

Характеристики

  • Новые темы создаются ТОЛЬКО по мере необходимости:
    • Если при запросе вычисления есть хотя бы один свободный поток, новый поток не создается
    • Если при запросе вычисления нет свободных потоков, создается новый поток при условии, что предел maxWorkers не достигнут.
    • Если свободных потоков нет и достигнут предел maxWorkers, вычисление помещается в очередь и будет выполнено следующим доступным потоком
  • Максимальное количество потоков, выделяемых для вычислительных нужд, по умолчанию равно количеству логических ядер процессора
  • После создания поток не уничтожается даже при низкой нагрузке
Заявление о выпуске Эта статья воспроизведена по адресу: https://dev.to/mapogolions/python-concurrentfutures-5f4a?1. Если есть какие-либо нарушения, свяжитесь с [email protected], чтобы удалить ее.
Последний учебник Более>

Изучайте китайский

Отказ от ответственности: Все предоставленные ресурсы частично взяты из Интернета. В случае нарушения ваших авторских прав или других прав и интересов, пожалуйста, объясните подробные причины и предоставьте доказательства авторских прав или прав и интересов, а затем отправьте их по электронной почте: [email protected]. Мы сделаем это за вас как можно скорее.

Copyright© 2022 湘ICP备2022001581号-3