「労働者が自分の仕事をうまくやりたいなら、まず自分の道具を研ぎ澄まさなければなりません。」 - 孔子、「論語。陸霊公」
表紙 > プログラミング > Pythonコンカレントフューチャーズ

Pythonコンカレントフューチャーズ

2024 年 11 月 8 日に公開
ブラウズ:199

python concurrent.futures

未来

Future は、計算の結果または計算中に発生したエラーのいずれかを保持できるコンテナーです。 Future が作成されると、PENDING 状態で開始されます。ライブラリは、おそらくテスト目的を除いて、このオブジェクトが手動で作成されることを意図していません。

import concurrent.futures as futures

f = futures.Future()
assert(f._result is None)
assert(f._exception is None)
assert(f._state == 'PENDING')

PENDING ステータスは、ユーザーによって要求された計算がスレッド プールに登録され、キューに配置されていますが、実行のためにどのスレッドにもまだ取得されていないことを示します。空きスレッドがキューからタスク (コールバック) を取得すると、Future は RUNNING 状態に移行します。将来は PENDING 状態にある場合にのみキャンセルできます。したがって、PENDING 状態と RUNNING 状態の間には、要求された計算をキャンセルできる時間枠があります。

import concurrent.futures as futures

def should_cancel_pending_future():
    f = futures.Future()
    assert(f._state == 'PENDING')
    assert(f.cancel())
    assert(f._state == 'CANCELLED')

def should_not_cancel_running_future():
    f = futures.Future()
    f.set_running_or_notify_cancel()
    assert(f._state == 'RUNNING')
    assert(not f.cancel())

def cancel_is_idempotent():
    f = futures.Future()
    assert(f.cancel())
    assert(f.cancel())


should_cancel_pending_future()
should_not_cancel_running_future()
cancel_is_idempotent()

スレッド プールで要求された操作は、計算された値で完了するか、エラーが発生する可能性があります。結果に関係なく、フューチャーは FINISHED 状態に移行します。結果またはエラーは、対応するフィールドに保存されます。

import concurrent.futures as futures

def future_completed_with_result():
    f = futures.Future()
    f.set_result('foo')
    assert(f._state == 'FINISHED')
    assert(f._result == 'foo')
    assert(f._exception is None)

def future_completed_with_exception():
    f = futures.Future()
    f.set_exception(NameError())
    assert(f._state == 'FINISHED')
    assert(f._result is None)
    assert(isinstance(f._exception, NameError))

future_completed_with_result()
future_completed_with_exception()

計算結果を取得するには、result メソッドを使用します。計算がまだ完了していない場合、このメソッドは、計算が完了するか待機がタイムアウトになるまで、(結果が呼び出された) 現在のスレッドをブロックします。

計算がエラーなく正常に完了すると、result メソッドは計算された値を返します。

import concurrent.futures as futures
import time
import threading

f = futures.Future()
def target():
    time.sleep(1)
    f.set_result('foo')
threading.Thread(target=target).start()
assert(f.result() == 'foo')

計算中に例外が発生した場合、結果はその例外を発生させます。

import concurrent.futures as futures
import time
import threading

f = futures.Future()
def target():
    time.sleep(1)
    f.set_exception(NameError)
threading.Thread(target=target).start()
try:
    f.result()
    raise Exception()
except NameError:
    assert(True)

待機中にメソッドがタイムアウトすると、TimeoutError が発生します。

import concurrent.futures as futures

f = futures.Future()
try:
    f.result(1)
    raise Exception()
except TimeoutError:
    assert(f._result is None)
    assert(f._exception is None)

キャンセルされた計算の結果を取得しようとすると、CancelledError が発生します。

import concurrent.futures as futures

f = futures.Future()
assert(f.cancel())
try:
    f.result()
    raise Exception()
except futures.CancelledError:
    assert(True)

待ち戦略

開発プロセスでは、スレッド プールで N 回の計算を実行し、その完了を待つ必要があることがよくあります。これを実現するために、ライブラリは wait 関数を提供します。いくつかの待機戦略があります: FIRST_COMPLETED、FIRST_EXCEPTION、ALL_COMPLETED。

すべての待機戦略に共通するのは、wait メソッドに渡された先物がすでに完了している場合、選択された戦略に関係なく、渡された先物のコレクションが返されることです。エラー、結果、キャンセルなど、どのように完了したかは関係ありません。

import concurrent.futures as futures

def test(return_when):
    f1, f2, f3 = futures.Future(), futures.Future(), futures.Future()
    f1.cancel()
    f1.set_running_or_notify_cancel() # required
    f2.set_result('foo')
    f3.set_exception(NameError)

    r = futures.wait([f1, f2, f3], return_when=return_when)
    assert(len(r.done) == 3)
    assert(len(r.not_done) == 0)

for return_when in [futures.ALL_COMPLETED, futures.FIRST_EXCEPTION, futures.FIRST_COMPLETED]:
    test(return_when)

ALL_COMPLETED 戦略

ALL_COMPLETED 戦略は、渡されたすべての Future の完了を待つこと、またはその時点までに完了した Future のコレクション (不完全な可能性があります) でタイムアウト後に終了することを保証します。

import concurrent.futures as futures
import threading
import time

def should_wait_for_all_futures_to_complete():
    f1 = futures.Future()
    f1.set_result('foo')
    f2 = futures.Future()

    def target():
        time.sleep(1)
        f2.set_result('bar')

    threading.Thread(target=target).start()
    r = futures.wait([f1, f2], return_when=futures.ALL_COMPLETED)
    assert(len(r.done) == 2)

def should_exit_on_timeout():
    f1 = futures.Future()
    f1.set_result('foo')
    f2 = futures.Future()
    r = futures.wait(fs=[f1, f2], timeout=1, return_when=futures.ALL_COMPLETED)
    assert(len(r.done) == 1)


should_wait_for_all_futures_to_complete()
should_exit_on_timeout()

FIRST_COMPLETED

FIRST_COMPLETED 戦略は、タイムアウトの場合に、少なくとも 1 つの完了したフューチャーを含むコレクション、または空のコレクションが返されることを保証します。 この戦略は、返されるコレクションに複数の要素を含めることができないことを意味するものではありません.

import concurrent.futures as futures
import threading
import time

f1 = futures.Future()
f2 = futures.Future()

def target():
    time.sleep(1)
    f1.set_result(True)

threading.Thread(target=target).start()

r = futures.wait([f1, f2], return_when=futures.FIRST_COMPLETED)
assert(len(r.done) == 1)
assert(len(r.not_done) == 1)

FIRST_EXCEPTION

FIRST_EXCEPTION 戦略は、計算の 1 つがエラーで終了した場合に待機を中断します。例外が発生しない場合、動作は ALL_COMPLETED future.
と同じです。

import concurrent.futures as futures
import threading
import time

f1 = futures.Future()
f1.set_result('foo')
f2, f3 = futures.Future(), futures.Future()

def target():
    time.sleep(1)
    f2.set_exception(NameError())

threading.Thread(target=target).start()

r = futures.wait(fs=[f1, f2, f3], return_when=futures.FIRST_EXCEPTION)
assert(len(r.done) == 2)

スレッドプールエグゼキュータ

オブジェクトはスレッド プールの作成を担当します。このオブジェクトと対話するための主なメソッドは、Submit メソッドです。スレッドプールに計算を登録できます。応答として、Future オブジェクトが返されます。これは、計算のステータスを監視し、最終結果を取得するために使用されます。

プロパティ

  • 新しいスレッドは必要な場合にのみ作成されます:
    • 計算が要求されたときに少なくとも 1 つの空きスレッドがある場合、新しいスレッドは作成されません
    • 計算が要求されたときに空きスレッドがない場合は、maxWorkers の制限に達していなければ、新しいスレッドが作成されます。
    • 空きスレッドがなく、maxWorkers の制限に達した場合、計算はキューに入れられ、次に利用可能なスレッドが処理します
  • デフォルトでは、計算ニーズに割り当てられるスレッドの最大数は、論理プロセッサ コアの数と等しくなります
  • スレッドは一度作成されると、負荷が低い場合でも破棄されません
リリースステートメント この記事は次の場所に転載されています: https://dev.to/mapogolions/python-concurrentfutures-5f4a?1 侵害がある場合は、[email protected] に連絡して削除してください。
最新のチュートリアル もっと>

免責事項: 提供されるすべてのリソースの一部はインターネットからのものです。お客様の著作権またはその他の権利および利益の侵害がある場合は、詳細な理由を説明し、著作権または権利および利益の証拠を提出して、電子メール [email protected] に送信してください。 できるだけ早く対応させていただきます。

Copyright© 2022 湘ICP备2022001581号-3