En las 2 publicaciones anteriores, analizamos Fanout y Fanin por separado. A menudo ocurre que los usamos juntos cuando tenemos un único flujo de datos en el que queremos operar los elementos individualmente y podemos hacerlo de forma segura mediante la concurrencia. Entonces, nos distribuimos en múltiples subprocesos de trabajo y luego volvemos a un solo flujo.
Por ejemplo, supongamos que tiene un archivo de registro grande. Puede dividir el archivo en partes, permitiendo que cada trabajador opere en una parte diferente del archivo al mismo tiempo y luego combinar los resultados.
Si seguiste las dos publicaciones anteriores, este patrón es obvio. Consulta los enlaces anteriores si no estás seguro.
// produce is simulating our single input as a channel func produce() chan int { ch := make(chan int) go func() { for i := 0; iHay una función produce() que crea un flujo de entrada simulado de números.
Hay una función de trabajo que opera en un canal de entrada hasta que no hay más datos. En cada valor, "procesa" los datos de entrada (determina si el valor es par o impar) y luego envía una estructura de resultado a un canal de salida.
Tenga en cuenta que cuando cada trabajador termina, cierra su canal de resultados. Esto es necesario para evitar un punto muerto ya que, de lo contrario, la operación fanin se suspendería esperando más datos en el canal.
El hilo principal obtiene el flujo de entrada del producto y luego lanza una cantidad de trabajadores, dándole a cada trabajador su propio canal al que enviará sus resultados.
Estos canales de resultados luego se envían a la operación fanin. Para fanin, creamos un canal para recibir el resultado y luego lanzamos una rutina para cada uno de los canales de trabajo. Cada gorutina simplemente itera sobre el canal hasta que no hay más datos y luego termina. Recuerde que cerramos el canal de resultados en el hilo de trabajo, eso es lo que permite que termine el bucle for
Tenga en cuenta que utilizamos un WaitGroup para el proceso de fanin. Esto nos permitirá saber cuándo todos los resultados de todos los canales de resultados se han combinado en el canal de salida. Cuando esto sucede, cerramos el canal de salida para que cualquier hilo descendente que consuma la salida pueda terminar.
Con todos los datos en el canal de salida, el hilo principal puede continuar y mostrar los resultados. Tenga en cuenta que utilizamos un canal booleano para evitar que el hilo principal termine hasta que todo esté hecho; de lo contrario, finalizará el proceso.
Tenga en cuenta que hay otra forma de hacer fan-in usando una declaración de selección. La técnica utilizada aquí es un poco más limpia ya que podemos aumentar o disminuir el número de trabajadores.
Tenga en cuenta también que no hemos abordado nada con respecto a la terminación anticipada de cosas como SIGTERM o SIGINT. Eso añade un poco más de complejidad.
¿Cómo implementarías esto? Hay otras implementaciones del patrón fanout/fanin. ¿Deje sus comentarios y pensamientos a continuación?
¡Gracias!
El código de esta publicación y todas las publicaciones de esta serie se pueden encontrar aquí
Descargo de responsabilidad: Todos los recursos proporcionados provienen en parte de Internet. Si existe alguna infracción de sus derechos de autor u otros derechos e intereses, explique los motivos detallados y proporcione pruebas de los derechos de autor o derechos e intereses y luego envíelos al correo electrónico: [email protected]. Lo manejaremos por usted lo antes posible.
Copyright© 2022 湘ICP备2022001581号-3