Dans les 2 articles précédents, nous avons examiné Fanout et Fanin séparément. Il arrive souvent que nous les utilisions ensemble lorsque nous disposons d'un flux de données unique sur lequel nous souhaitons opérer sur les éléments individuellement et pouvons le faire en toute sécurité grâce à la concurrence. Ainsi, nous nous répartissons dans plusieurs threads de travail, puis nous revenons en un seul flux.
Par exemple, supposons que vous disposiez d'un fichier journal volumineux. Vous pouvez diviser le fichier en morceaux, permettant à chaque travailleur d'opérer simultanément sur une partie différente du fichier, puis combiner les résultats.
Si vous avez suivi les deux posts précédents, ce schéma est évident. Consultez les liens ci-dessus si vous n'êtes pas sûr.
// produce is simulating our single input as a channel func produce() chan int { ch := make(chan int) go func() { for i := 0; iIl existe une fonction Produce() qui crée un flux d'entrée simulé de nombres.
Il existe une fonction de travail qui opère sur un canal d'entrée jusqu'à ce qu'il n'y ait plus de données. Sur chaque valeur, il « traite » les données d'entrée (détermine si la valeur est paire ou impaire), puis envoie une structure de résultat à un canal de sortie.
Notez que lorsque chaque travailleur a terminé, il ferme son canal de résultats. Ceci est nécessaire pour éviter un blocage, car sinon l'opération fanin resterait en veille en attendant plus de données sur le canal.
Le thread principal récupère le flux d'entrée de Produce, puis lance un certain nombre de travailleurs donnant à chaque travailleur son propre canal où il enverra ses résultats.
Ces canaux de résultats sont ensuite envoyés à l'opération fanin. Pour fanin, nous créons un canal pour recevoir la sortie, puis lançons une goroutine pour chacun des canaux de travail. Chaque goroutine parcourt simplement le canal jusqu'à ce qu'il n'y ait plus de données, puis se termine. Rappelez-vous que nous avons fermé le canal de résultat dans le thread de travail, c'est ce qui permet à la boucle for de se terminer
Notez que nous utilisons un WaitGroup pour le processus fanin. Cela nous permet de savoir quand tous les résultats de tous les canaux de résultats ont été combinés dans le canal de sortie. Lorsque cela se produit, nous fermons le canal de sortie afin que le thread en aval consommant la sortie puisse se terminer.
Avec toutes les données du canal de sortie, le thread principal peut continuer et afficher les résultats. Notez que nous utilisons un canal booléen pour empêcher le thread principal de se terminer jusqu'à ce que tout soit terminé ; sinon, cela mettra fin au processus.
Notez qu'il existe une autre façon d'effectuer une diffusion en utilisant une instruction select. La technique utilisée ici est un peu plus propre puisqu'on peut augmenter ou diminuer le nombre d'ouvriers.
Notez également que nous n'avons rien abordé concernant la résiliation anticipée de choses comme SIGTERM ou SIGINT. Cela ajoute un peu plus de complexité.
Comment mettriez-vous cela en œuvre ? Il existe d'autres implémentations du modèle fanout/fanin. Veuillez laisser vos commentaires et réflexions ci-dessous ?
Merci!
Le code de cet article et de tous les articles de cette série peut être trouvé ici
Clause de non-responsabilité: Toutes les ressources fournies proviennent en partie d'Internet. En cas de violation de vos droits d'auteur ou d'autres droits et intérêts, veuillez expliquer les raisons détaillées et fournir une preuve du droit d'auteur ou des droits et intérêts, puis l'envoyer à l'adresse e-mail : [email protected]. Nous nous en occuperons pour vous dans les plus brefs délais.
Copyright© 2022 湘ICP备2022001581号-3