在先前的兩篇文章中,我們分別介紹了 Fanout 和 Fanin。通常情況下,我們將它們一起使用,因為我們有一個資料流,我們希望單獨對專案進行操作,並且可以使用並發安全地執行此操作。因此,我們扇出到多個工作線程,然後扇回到單一流。
例如,假設您有一個很大的日誌檔案。您可以將文件分成多個區塊,允許每個工作人員同時操作文件的不同部分,然後合併結果。
如果您遵循前兩篇文章,這種模式是顯而易見的。如果您不確定,請參閱上面的連結。
// produce is simulating our single input as a channel func produce() chan int { ch := make(chan int) go func() { for i := 0; i有一個 Produce() 函數可以建立模擬的數位輸入流。
有一個工作函數在輸入通道上運行,直到沒有更多資料為止。它對每個值「處理」輸入資料(確定該值是奇數還是偶數),然後將結果結構傳送到輸出通道。
請注意,當每個工作人員完成後,它會關閉其結果通道。這對於防止死鎖是必要的,因為否則 fanin 操作將休眠等待 chan 上的更多資料。
主執行緒從生產中獲取輸入流,然後啟動許多工作人員,為每個工作人員提供自己的通道,將其結果發送到該通道。
這些結果通道隨後被送到 fanin 操作。為了扇入,我們建立一個通道來接收輸出,然後為每個工作通道啟動一個 goroutine。每個 goroutine 只是在通道上進行迭代,直到沒有更多資料為止,然後終止。 請記住,我們關閉了工作線程中的結果通道,這就是允許 for 循環終止的原因
請注意,我們使用 WaitGroup 進行 fanin 過程。這讓我們知道所有結果通道的所有結果何時已組合到輸出通道中。當發生這種情況時,我們關閉輸出通道,以便任何消耗輸出的下游執行緒都可以終止。
有了輸出通道中的所有數據,主執行緒就可以繼續顯示結果了。請注意,我們使用布林通道來防止主執行緒在一切完成之前終止;否則,它將終止進程。
請注意,還有另一種使用 select 語句進行扇入的方法。這裡使用的技術更乾淨一些,因為我們可以增加或減少工人的數量。
另請注意,我們尚未解決有關 SIGTERM 或 SIGINT 等問題的提前終止的任何問題。這增加了一點複雜性。
您將如何實現這一點? 扇出/扇入模式還有其他實作。請在下面留下您的評論和想法?
謝謝!
這篇文章以及本系列所有文章的程式碼可以在這裡找到
免責聲明: 提供的所有資源部分來自互聯網,如果有侵犯您的版權或其他權益,請說明詳細緣由並提供版權或權益證明然後發到郵箱:[email protected] 我們會在第一時間內為您處理。
Copyright© 2022 湘ICP备2022001581号-3