"일꾼이 일을 잘하려면 먼저 도구를 갈고 닦아야 한다." - 공자, 『논어』.
첫 장 > 프로그램 작성 > 파이썬 동시.미래

파이썬 동시.미래

2024-11-08에 게시됨
검색:968

python concurrent.futures

미래

Future는 계산 결과나 계산 중에 발생한 오류를 담을 수 있는 컨테이너입니다. future가 생성되면 PENDING 상태로 시작됩니다. 라이브러리는 테스트 목적을 제외하고는 이 객체를 수동으로 생성할 의도가 없습니다.

import concurrent.futures as futures

f = futures.Future()
assert(f._result is None)
assert(f._exception is None)
assert(f._state == 'PENDING')

PENDING 상태는 사용자가 요청한 계산이 스레드 풀에 등록되어 대기열에 배치되었지만 실행을 위해 아직 어떤 스레드에서도 선택되지 않았음을 나타냅니다. 자유 스레드가 대기열에서 작업(콜백)을 가져오면 미래는 RUNNING 상태로 전환됩니다. 미래는 PENDING 상태인 동안에만 취소할 수 있습니다. 따라서 요청된 계산을 취소할 수 있는 PENDING 상태와 RUNNING 상태 사이에 시간 간격이 있습니다.

import concurrent.futures as futures

def should_cancel_pending_future():
    f = futures.Future()
    assert(f._state == 'PENDING')
    assert(f.cancel())
    assert(f._state == 'CANCELLED')

def should_not_cancel_running_future():
    f = futures.Future()
    f.set_running_or_notify_cancel()
    assert(f._state == 'RUNNING')
    assert(not f.cancel())

def cancel_is_idempotent():
    f = futures.Future()
    assert(f.cancel())
    assert(f.cancel())


should_cancel_pending_future()
should_not_cancel_running_future()
cancel_is_idempotent()

스레드 풀에서 요청된 작업은 계산된 값으로 완료되거나 오류가 발생할 수 있습니다. 결과에 관계없이 미래는 FINISHED 상태로 전환됩니다. 결과 또는 오류는 해당 필드에 저장됩니다.

import concurrent.futures as futures

def future_completed_with_result():
    f = futures.Future()
    f.set_result('foo')
    assert(f._state == 'FINISHED')
    assert(f._result == 'foo')
    assert(f._exception is None)

def future_completed_with_exception():
    f = futures.Future()
    f.set_exception(NameError())
    assert(f._state == 'FINISHED')
    assert(f._result is None)
    assert(isinstance(f._exception, NameError))

future_completed_with_result()
future_completed_with_exception()

계산 결과를 검색하려면 결과 메소드가 사용됩니다. 계산이 아직 완료되지 않은 경우 이 메서드는 계산이 완료되거나 대기 시간이 초과될 때까지 현재 스레드(결과가 호출된 스레드)를 차단합니다.

계산이 오류 없이 성공적으로 완료되면 결과 메소드는 계산된 값을 반환합니다.

import concurrent.futures as futures
import time
import threading

f = futures.Future()
def target():
    time.sleep(1)
    f.set_result('foo')
threading.Thread(target=target).start()
assert(f.result() == 'foo')

계산 중에 예외가 발생하면 결과에서 해당 예외가 발생합니다.

import concurrent.futures as futures
import time
import threading

f = futures.Future()
def target():
    time.sleep(1)
    f.set_exception(NameError)
threading.Thread(target=target).start()
try:
    f.result()
    raise Exception()
except NameError:
    assert(True)

대기하는 동안 메서드가 시간 초과되면 TimeoutError가 발생합니다.

import concurrent.futures as futures

f = futures.Future()
try:
    f.result(1)
    raise Exception()
except TimeoutError:
    assert(f._result is None)
    assert(f._exception is None)

취소된 계산 결과를 얻으려는 시도는 CancelledError를 발생시킵니다.

import concurrent.futures as futures

f = futures.Future()
assert(f.cancel())
try:
    f.result()
    raise Exception()
except futures.CancelledError:
    assert(True)

대기 전략

개발 프로세스에서는 스레드 풀에서 N개의 계산을 실행하고 완료될 때까지 기다려야 하는 것이 매우 일반적입니다. 이를 위해 라이브러리는 대기 기능을 제공합니다. FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED 등 여러 가지 대기 전략이 있습니다.

모든 대기 전략의 공통점은 wait 메서드에 전달된 future가 이미 완료된 경우 선택한 전략에 관계없이 전달된 future 컬렉션이 반환된다는 것입니다. 오류, 결과, 취소 등 어떻게 완료되었는지는 중요하지 않습니다.

import concurrent.futures as futures

def test(return_when):
    f1, f2, f3 = futures.Future(), futures.Future(), futures.Future()
    f1.cancel()
    f1.set_running_or_notify_cancel() # required
    f2.set_result('foo')
    f3.set_exception(NameError)

    r = futures.wait([f1, f2, f3], return_when=return_when)
    assert(len(r.done) == 3)
    assert(len(r.not_done) == 0)

for return_when in [futures.ALL_COMPLETED, futures.FIRST_EXCEPTION, futures.FIRST_COMPLETED]:
    test(return_when)

ALL_COMPLETED 전략

ALL_COMPLETED 전략은 전달된 모든 Future가 완료될 때까지 기다리거나, 불완전할 수 있는 해당 순간까지 완료된 Future 컬렉션을 사용하여 시간 초과 후 종료하는 것을 보장합니다.

import concurrent.futures as futures
import threading
import time

def should_wait_for_all_futures_to_complete():
    f1 = futures.Future()
    f1.set_result('foo')
    f2 = futures.Future()

    def target():
        time.sleep(1)
        f2.set_result('bar')

    threading.Thread(target=target).start()
    r = futures.wait([f1, f2], return_when=futures.ALL_COMPLETED)
    assert(len(r.done) == 2)

def should_exit_on_timeout():
    f1 = futures.Future()
    f1.set_result('foo')
    f2 = futures.Future()
    r = futures.wait(fs=[f1, f2], timeout=1, return_when=futures.ALL_COMPLETED)
    assert(len(r.done) == 1)


should_wait_for_all_futures_to_complete()
should_exit_on_timeout()

FIRST_COMPLETED

FIRST_COMPLETED 전략은 시간 초과 시 하나 이상의 완료된 future 또는 빈 컬렉션이 있는 컬렉션 반환을 보장합니다. 이 전략은 반환된 컬렉션이 여러 요소를 포함할 수 없다는 것을 의미하지 않습니다.

import concurrent.futures as futures
import threading
import time

f1 = futures.Future()
f2 = futures.Future()

def target():
    time.sleep(1)
    f1.set_result(True)

threading.Thread(target=target).start()

r = futures.wait([f1, f2], return_when=futures.FIRST_COMPLETED)
assert(len(r.done) == 1)
assert(len(r.not_done) == 1)

FIRST_EXCEPTION

FIRST_EXCEPTION 전략은 계산 중 하나가 오류로 끝나면 대기를 중단합니다. 예외가 발생하지 않으면 동작은 ALL_COMPLETED future와 동일합니다.

import concurrent.futures as futures
import threading
import time

f1 = futures.Future()
f1.set_result('foo')
f2, f3 = futures.Future(), futures.Future()

def target():
    time.sleep(1)
    f2.set_exception(NameError())

threading.Thread(target=target).start()

r = futures.wait(fs=[f1, f2, f3], return_when=futures.FIRST_EXCEPTION)
assert(len(r.done) == 2)

ThreadPoolExecutor

객체는 스레드 풀 생성을 담당합니다. 이 개체와 상호 작용하는 주요 방법은 제출 방법입니다. 스레드 풀에 계산을 등록할 수 있습니다. 이에 대한 응답으로 계산 상태를 모니터링하고 최종 결과를 얻는 데 사용되는 Future 개체가 반환됩니다.

속성

  • 새 스레드는 필요한 경우에만 생성됩니다.
    • 계산 요청 시 사용 가능한 스레드가 하나 이상 있으면 새 스레드가 생성되지 않습니다.
    • 계산 요청 시 사용 가능한 스레드가 없으면 maxWorkers 제한에 도달하지 않은 경우 새 스레드가 생성됩니다.
    • 사용 가능한 스레드가 없고 maxWorkers 제한에 도달한 경우 계산은 대기열에 배치되고 사용 가능한 다음 스레드에서 수행됩니다.
  • 기본적으로 계산 요구에 할당된 최대 스레드 수는 논리 프로세서 코어 수와 같습니다.
  • 한 번 생성된 스레드는 부하가 낮아도 파괴되지 않습니다.
릴리스 선언문 이 기사는 https://dev.to/mapogolions/python-concurrentfutures-5f4a?1에서 복제됩니다.1 침해 내용이 있는 경우, [email protected]에 연락하여 삭제하시기 바랍니다.
최신 튜토리얼 더>

부인 성명: 제공된 모든 리소스는 부분적으로 인터넷에서 가져온 것입니다. 귀하의 저작권이나 기타 권리 및 이익이 침해된 경우 자세한 이유를 설명하고 저작권 또는 권리 및 이익에 대한 증거를 제공한 후 이메일([email protected])로 보내주십시오. 최대한 빨리 처리해 드리겠습니다.

Copyright© 2022 湘ICP备2022001581号-3