前の 2 つの投稿では、Fanout と Fanin を別々に見てきました。アイテムを個別に操作したい単一のデータ ストリームがあり、並行性を使用して安全に操作できる場合、それらを一緒に使用することがよくあります。したがって、複数のワーカー スレッドにファンアウトしてから、単一のストリームにファンインして戻します。
たとえば、大きなログ ファイルがあるとします。ファイルをいくつかのチャンクに分割し、各ワーカーがファイルの異なる部分を同時に操作して、その結果を結合できるようにすることができます。
前の 2 つの投稿に従った場合、このパターンは明らかです。よくわからない場合は、上のリンクを参照してください。
// produce is simulating our single input as a channel func produce() chan int { ch := make(chan int) go func() { for i := 0; iシミュレートされた数値の入力ストリームを作成するProduce()関数があります。
データがなくなるまで入力チャネル上で動作するワーカー関数があります。各値に対して入力データを「処理」し (値が奇数か偶数かを判断し)、結果の構造体を出力チャネルに送信します。
各ワーカーが完了すると、結果チャネルが閉じられることに注意してください。これは、デッドロックを防ぐために必要です。そうしないと、fanin 操作が chan 上のさらなるデータを待ってスリープ状態になってしまうからです。
メインスレッドは、プロデュースから入力ストリームを取得し、多数のワーカーを起動し、各ワーカーに結果を送信する独自のチャネルを与えます。
これらの結果チャネルは、fanin 操作に送信されます。 fanin するには、出力を受信するチャネルを作成し、各ワーカー チャネルに対して goroutine を起動します。各ゴルーチンは、データがなくなるまでチャネルを反復処理し、終了します。 ワーカー スレッドで結果チャネルを閉じたことを思い出してください。これにより、for ループが終了できるようになります
fanin プロセスには WaitGroup を使用することに注意してください。これにより、すべての結果チャネルからのすべての結果がいつ出力チャネルに結合されたかを知ることができます。これが発生した場合、出力を消費するダウンストリーム スレッドが終了できるように、出力チャネルを閉じます。
出力チャネルにすべてのデータがあれば、メインスレッドは続行して結果を表示できます。すべてが完了するまでメインスレッドが終了しないように、ブール値チャネルを使用していることに注意してください。それ以外の場合は、プロセスが終了します。
select ステートメントを使用してファンインを行う別の方法があることに注意してください。ここで使用されている手法は、ワーカーの数を増減できるため、少しすっきりしています。
SIGTERM や SIGINT などによる早期終了に関しては何も対処していないことにも注意してください。これにより、もう少し複雑になります。
これをどのように実装しますか? ファンアウト/ファンイン パターンの実装は他にもあります。コメントやご意見を以下に残してください?
ありがとう!
この投稿とこのシリーズのすべての投稿のコードはここにあります
免責事項: 提供されるすべてのリソースの一部はインターネットからのものです。お客様の著作権またはその他の権利および利益の侵害がある場合は、詳細な理由を説明し、著作権または権利および利益の証拠を提出して、電子メール [email protected] に送信してください。 できるだけ早く対応させていただきます。
Copyright© 2022 湘ICP备2022001581号-3