In den beiden vorherigen Beiträgen haben wir Fanout und Fanin getrennt betrachtet. Es kommt häufig vor, dass wir sie zusammen verwenden, wenn wir über einen einzigen Datenstrom verfügen, in dem wir die Elemente einzeln bearbeiten möchten und dies sicher mithilfe der Parallelität tun können. Also fächern wir uns in mehrere Arbeitsthreads auf und fächern sie dann wieder in einen einzigen Stream auf.
Angenommen, Sie haben eine große Protokolldatei. Sie könnten die Datei in Teile aufteilen, sodass jeder Arbeiter gleichzeitig an einem anderen Teil der Datei arbeiten und dann die Ergebnisse kombinieren kann.
Wenn Sie den beiden vorherigen Beiträgen gefolgt sind, ist dieses Muster offensichtlich. Wenn Sie sich nicht sicher sind, sehen Sie sich die Links oben an.
// produce is simulating our single input as a channel func produce() chan int { ch := make(chan int) go func() { for i := 0; iEs gibt eine Funktion Produce(), die einen simulierten Eingabestrom von Zahlen erstellt.
Es gibt eine Worker-Funktion, die auf einem Eingabekanal arbeitet, bis keine Daten mehr vorhanden sind. Bei jedem Wert „verarbeitet“ es die Eingabedaten (bestimmt, ob der Wert ungerade oder gerade ist) und sendet dann eine Ergebnisstruktur an einen Ausgabekanal.
Beachten Sie, dass jeder Worker seinen Ergebniskanal schließt, wenn er fertig ist. Dies ist notwendig, um einen Deadlock zu verhindern, da der Fanin-Vorgang andernfalls ruhen würde und auf weitere Daten auf dem Kanal warten würde.
Der Hauptthread ruft den Eingabestrom von Produce ab und startet dann eine Reihe von Workern, wobei jedem Worker ein eigener Kanal zugewiesen wird, an den er seine Ergebnisse sendet.
Diese Ergebniskanäle werden dann an die Fanin-Operation gesendet. Um Fanin zu erstellen, erstellen wir einen Kanal, um die Ausgabe zu empfangen, und starten dann eine Goroutine für jeden der Worker-Kanäle. Jede Goroutine durchläuft einfach den Kanal, bis keine Daten mehr vorhanden sind, und wird dann beendet. Denken Sie daran, dass wir den Ergebniskanal im Arbeitsthread geschlossen haben. Dadurch kann die for-Schleife beendet werden.
Beachten Sie, dass wir eine WaitGroup für den Fanin-Prozess verwenden. Dadurch erfahren wir, wann alle Ergebnisse aller Ergebniskanäle im Ausgabekanal zusammengefasst wurden. Wenn dies geschieht, schließen wir den Ausgabekanal, sodass jeder Downstream-Thread, der die Ausgabe verbraucht, beendet werden kann.
Mit allen Daten im Ausgabekanal kann der Hauptthread fortfahren und die Ergebnisse anzeigen. Beachten Sie, dass wir einen booleschen Kanal verwenden, um zu verhindern, dass der Hauptthread beendet wird, bis alles erledigt ist; andernfalls wird der Prozess beendet.
Beachten Sie, dass es eine andere Möglichkeit gibt, Fan-In mithilfe einer Select-Anweisung durchzuführen. Die hier verwendete Technik ist etwas sauberer, da wir die Anzahl der Arbeiter erhöhen oder verringern können.
Beachten Sie auch, dass wir uns nicht mit der vorzeitigen Beendigung durch Dinge wie SIGTERM oder SIGINT befasst haben. Das erhöht die Komplexität etwas.
Wie würden Sie das umsetzen? Es gibt andere Implementierungen des Fanout/Fanin-Musters. Bitte hinterlassen Sie unten Ihre Kommentare und Gedanken?
Danke!
Der Code für diesen Beitrag und alle Beiträge dieser Serie finden Sie hier
Haftungsausschluss: Alle bereitgestellten Ressourcen stammen teilweise aus dem Internet. Wenn eine Verletzung Ihres Urheberrechts oder anderer Rechte und Interessen vorliegt, erläutern Sie bitte die detaillierten Gründe und legen Sie einen Nachweis des Urheberrechts oder Ihrer Rechte und Interessen vor und senden Sie ihn dann an die E-Mail-Adresse: [email protected] Wir werden die Angelegenheit so schnell wie möglich für Sie erledigen.
Copyright© 2022 湘ICP备2022001581号-3