"Si un trabajador quiere hacer bien su trabajo, primero debe afilar sus herramientas." - Confucio, "Las Analectas de Confucio. Lu Linggong"
Página delantera > Programación > Python concurrente.futuros

Python concurrente.futuros

Publicado el 2024-11-08
Navegar:514

python concurrent.futures

Futuro

Future es un contenedor que puede contener el resultado de un cálculo o un error que ocurrió durante ese cálculo. Cuando se crea un futuro, comienza en un estado PENDIENTE. La biblioteca no pretende que este objeto se cree manualmente, excepto quizás con fines de prueba.

import concurrent.futures as futures

f = futures.Future()
assert(f._result is None)
assert(f._exception is None)
assert(f._state == 'PENDING')

El estado PENDIENTE indica que un cálculo solicitado por el usuario se registró en el grupo de subprocesos y se colocó en una cola, pero aún no ha sido recogido por ningún subproceso para su ejecución. Una vez que un subproceso libre toma la tarea (devolución de llamada) de la cola, el futuro pasa al estado EN EJECUCIÓN. Un futuro sólo se puede cancelar mientras esté en estado PENDIENTE. Por lo tanto, existe un período de tiempo entre los estados PENDIENTE y EN EJECUCIÓN durante el cual se puede cancelar el cálculo solicitado.

import concurrent.futures as futures

def should_cancel_pending_future():
    f = futures.Future()
    assert(f._state == 'PENDING')
    assert(f.cancel())
    assert(f._state == 'CANCELLED')

def should_not_cancel_running_future():
    f = futures.Future()
    f.set_running_or_notify_cancel()
    assert(f._state == 'RUNNING')
    assert(not f.cancel())

def cancel_is_idempotent():
    f = futures.Future()
    assert(f.cancel())
    assert(f.cancel())


should_cancel_pending_future()
should_not_cancel_running_future()
cancel_is_idempotent()

Una operación solicitada en el grupo de subprocesos puede completarse con un valor calculado o generar un error. Independientemente del resultado, el futuro pasa al estado FINALIZADO. El resultado o error luego se almacena en los campos correspondientes.

import concurrent.futures as futures

def future_completed_with_result():
    f = futures.Future()
    f.set_result('foo')
    assert(f._state == 'FINISHED')
    assert(f._result == 'foo')
    assert(f._exception is None)

def future_completed_with_exception():
    f = futures.Future()
    f.set_exception(NameError())
    assert(f._state == 'FINISHED')
    assert(f._result is None)
    assert(isinstance(f._exception, NameError))

future_completed_with_result()
future_completed_with_exception()

Para recuperar el resultado de un cálculo, se utiliza el método de resultado. Si el cálculo aún no se ha completado, este método bloqueará el hilo actual (desde cuyo resultado se llamó) hasta que finalice el cálculo o se agote el tiempo de espera.

Si el cálculo se completa exitosamente sin errores, el método de resultado devuelve el valor calculado.

import concurrent.futures as futures
import time
import threading

f = futures.Future()
def target():
    time.sleep(1)
    f.set_result('foo')
threading.Thread(target=target).start()
assert(f.result() == 'foo')

Si se produjo una excepción durante el cálculo, el resultado generará esa excepción.

import concurrent.futures as futures
import time
import threading

f = futures.Future()
def target():
    time.sleep(1)
    f.set_exception(NameError)
threading.Thread(target=target).start()
try:
    f.result()
    raise Exception()
except NameError:
    assert(True)

Si el método caduca mientras se espera, se genera un TimeoutError.

import concurrent.futures as futures

f = futures.Future()
try:
    f.result(1)
    raise Exception()
except TimeoutError:
    assert(f._result is None)
    assert(f._exception is None)

El intento de obtener el resultado de un cálculo que fue cancelado generará un CancelledError.

import concurrent.futures as futures

f = futures.Future()
assert(f.cancel())
try:
    f.result()
    raise Exception()
except futures.CancelledError:
    assert(True)

estrategia de espera

En el proceso de desarrollo, es bastante común necesitar ejecutar N cálculos en un grupo de subprocesos y esperar a que se completen. Para lograr esto, la biblioteca proporciona la función de espera. Hay varias estrategias de espera: FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED.

Lo común a todas las estrategias de espera es que si los futuros pasados ​​al método de espera ya están completos, la colección de los futuros pasados ​​se devuelve independientemente de la estrategia elegida. No importa cómo se completaron, ya sea con un error, un resultado o si fueron cancelados.

import concurrent.futures as futures

def test(return_when):
    f1, f2, f3 = futures.Future(), futures.Future(), futures.Future()
    f1.cancel()
    f1.set_running_or_notify_cancel() # required
    f2.set_result('foo')
    f3.set_exception(NameError)

    r = futures.wait([f1, f2, f3], return_when=return_when)
    assert(len(r.done) == 3)
    assert(len(r.not_done) == 0)

for return_when in [futures.ALL_COMPLETED, futures.FIRST_EXCEPTION, futures.FIRST_COMPLETED]:
    test(return_when)

Estrategia ALL_COMPLETED

La estrategia ALL_COMPLETED garantiza esperar a que se completen todos los futuros pasados, o salir después de un tiempo de espera con una colección de los futuros completados hasta ese momento, que pueden estar incompletos.

import concurrent.futures as futures
import threading
import time

def should_wait_for_all_futures_to_complete():
    f1 = futures.Future()
    f1.set_result('foo')
    f2 = futures.Future()

    def target():
        time.sleep(1)
        f2.set_result('bar')

    threading.Thread(target=target).start()
    r = futures.wait([f1, f2], return_when=futures.ALL_COMPLETED)
    assert(len(r.done) == 2)

def should_exit_on_timeout():
    f1 = futures.Future()
    f1.set_result('foo')
    f2 = futures.Future()
    r = futures.wait(fs=[f1, f2], timeout=1, return_when=futures.ALL_COMPLETED)
    assert(len(r.done) == 1)


should_wait_for_all_futures_to_complete()
should_exit_on_timeout()

PRIMERO_COMPLETADO

La estrategia FIRST_COMPLETED garantiza la devolución de una colección con al menos un futuro completo o una colección vacía en caso de que se agote el tiempo de espera. Esta estrategia NO implica que la colección devuelta no pueda contener múltiples elementos.

import concurrent.futures as futures
import threading
import time

f1 = futures.Future()
f2 = futures.Future()

def target():
    time.sleep(1)
    f1.set_result(True)

threading.Thread(target=target).start()

r = futures.wait([f1, f2], return_when=futures.FIRST_COMPLETED)
assert(len(r.done) == 1)
assert(len(r.not_done) == 1)

PRIMERA_EXCEPCIÓN

La estrategia FIRST_EXCEPTION interrumpe la espera si uno de los cálculos finaliza con un error. Si no se producen excepciones, el comportamiento es idéntico al futuro ALL_COMPLETED.

import concurrent.futures as futures
import threading
import time

f1 = futures.Future()
f1.set_result('foo')
f2, f3 = futures.Future(), futures.Future()

def target():
    time.sleep(1)
    f2.set_exception(NameError())

threading.Thread(target=target).start()

r = futures.wait(fs=[f1, f2, f3], return_when=futures.FIRST_EXCEPTION)
assert(len(r.done) == 2)

ThreadPoolEjecutor

El objeto es responsable de crear un grupo de subprocesos. El método principal para interactuar con este objeto es el método Enviar. Permite registrar un cálculo en el grupo de subprocesos. En respuesta, se devuelve un objeto Futuro, que se utiliza para monitorear el estado del cálculo y obtener el resultado final.

Propiedades

  • Los nuevos hilos se crean SÓLO según sea necesario:
    • Si hay al menos un hilo libre cuando se solicita un cálculo, no se crea ningún hilo nuevo
    • Si no hay hilos libres cuando se solicita un cálculo, se crea un nuevo hilo, siempre que no se haya alcanzado el límite de maxWorkers.
    • Si no hay subprocesos libres y se ha alcanzado el límite máximo de trabajadores, el cálculo se coloca en una cola y lo realizará el siguiente subproceso disponible
  • El número máximo de subprocesos asignados para las necesidades computacionales de forma predeterminada es igual al número de núcleos de procesador lógicos
  • Una vez creado, un hilo no se destruye, incluso en caso de carga baja
Declaración de liberación Este artículo se reproduce en: https://dev.to/mapogolions/python-concurrentfutures-5f4a?1 Si hay alguna infracción, comuníquese con [email protected] para eliminarla.
Último tutorial Más>

Descargo de responsabilidad: Todos los recursos proporcionados provienen en parte de Internet. Si existe alguna infracción de sus derechos de autor u otros derechos e intereses, explique los motivos detallados y proporcione pruebas de los derechos de autor o derechos e intereses y luego envíelos al correo electrónico: [email protected]. Lo manejaremos por usted lo antes posible.

Copyright© 2022 湘ICP备2022001581号-3