Nas duas postagens anteriores, analisamos Fanout e Fanin separadamente. Muitas vezes, usamos eles juntos quando temos um único fluxo de dados onde queremos operar nos itens individualmente e podemos fazê-lo com segurança usando simultaneidade. Portanto, espalhamos vários threads de trabalho e depois voltamos para um único fluxo.
Por exemplo, suponha que você tenha um arquivo de log grande. Você pode dividir o arquivo em partes, permitindo que cada trabalhador opere em uma parte diferente do arquivo simultaneamente e depois combinar os resultados.
Se você seguiu os dois posts anteriores, esse padrão é óbvio. Veja os links acima se não tiver certeza.
// produce is simulating our single input as a channel func produce() chan int { ch := make(chan int) go func() { for i := 0; iExiste uma função produzir() que cria um fluxo de entrada simulado de números.
Existe uma função de trabalho que opera em um canal de entrada até que não haja mais dados. Em cada valor, ele 'processa' os dados de entrada (determina se o valor é ímpar ou par) e, em seguida, envia uma estrutura de resultado para um canal de saída.
Observe que quando cada trabalhador termina, ele fecha seu canal de resultados. Isso é necessário para evitar impasses, pois, caso contrário, a operação fanin dormiria esperando por mais dados no canal.
O thread principal obtém o fluxo de entrada do produto e, em seguida, lança vários trabalhadores, dando a cada trabalhador seu próprio canal para onde enviará seus resultados.
Esses canais de resultados são então enviados para a operação fanin. Para fanin, criamos um canal para receber a saída e, em seguida, lançamos uma goroutine para cada um dos canais de trabalho. Cada goroutine simplesmente itera no canal até que não haja mais dados e então termina. Lembre-se de que fechamos o canal de resultados no thread de trabalho, é isso que permite que o loop for termine
Observe que usamos um WaitGroup para o processo fanin. Isso nos informa quando todos os resultados de todos os canais de resultados foram combinados no canal de saída. Quando isso acontece, fechamos o canal de saída para que qualquer thread downstream que esteja consumindo a saída possa ser encerrado.
Com todos os dados no canal de saída, o thread principal pode prosseguir e exibir os resultados. Observe que usamos um canal booleano para evitar que o thread principal termine até que tudo esteja pronto; caso contrário, o processo será encerrado.
Observe que há outra maneira de fazer fan-in usando uma instrução select. A técnica utilizada aqui é um pouco mais limpa, pois podemos aumentar ou diminuir o número de trabalhadores.
Observe também que não abordamos nada em relação à rescisão antecipada de coisas como SIGTERM ou SIGINT. Isso adiciona um pouco mais de complexidade.
Como você implementaria isso? Existem outras implementações do padrão fanout/fanin. Por favor, deixe seus comentários e opiniões abaixo?
Obrigado!
O código desta postagem e de todas as postagens desta série podem ser encontrados aqui
Isenção de responsabilidade: Todos os recursos fornecidos são parcialmente provenientes da Internet. Se houver qualquer violação de seus direitos autorais ou outros direitos e interesses, explique os motivos detalhados e forneça prova de direitos autorais ou direitos e interesses e envie-a para o e-mail: [email protected]. Nós cuidaremos disso para você o mais rápido possível.
Copyright© 2022 湘ICP备2022001581号-3