In the previous two posts, we looked at fan-out and fan-in separately. They are often used together: when we have a single data stream whose items can be processed individually, and safely in parallel, we fan out to multiple worker goroutines and then fan in their results back into a single stream.
For example, suppose you have a large log file. You could break the file into chunks, allowing each worker to operate on a different part of the file concurrently, then combine the results.
If you followed the previous two posts, this pattern should look familiar. See the links above if you're not sure.
```go
// produce is simulating our single input as a channel
func produce() chan int {
	ch := make(chan int)
	go func() {
		for i := 0; i
```
There is a produce() function that creates a simulated input stream of numbers.
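Here is a minimal sketch of a produce() function in that spirit. It assumes a hypothetical bound, numItems = 10, because the original loop bound is not recoverable, and it closes the channel when it is done so that consumers can simply range over it.

```go
package main

import "fmt"

// numItems is a hypothetical bound; the original loop bound was lost.
const numItems = 10

// produce simulates our single input stream as a channel.
// It sends 0..numItems-1 and then closes the channel so that
// anything ranging over it can terminate.
func produce() chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for i := 0; i < numItems; i++ {
			ch <- i
		}
	}()
	return ch
}

func main() {
	for v := range produce() {
		fmt.Println(v)
	}
}
```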
There is a worker function that reads from an input channel until there is no more data. For each value it 'processes' the input (determines whether the value is odd or even), then sends a result struct to an output channel.
Note that when each worker is done, it closes its result channel. This is necessary to prevent deadlock, since the fan-in operation would otherwise block forever waiting for more data on that channel.
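A worker along those lines might look like the sketch below. The result struct's fields (value, even) and the worker(in, out) signature are assumptions for illustration; the point is the range loop over the input and the deferred close of the result channel.

```go
package main

import "fmt"

// result is a hypothetical shape for a worker's output.
type result struct {
	value int
	even  bool
}

// worker reads from in until it is closed, classifies each value as
// odd or even, and sends a result on out. It closes out when it is done
// so that whoever ranges over out (the fan-in step) can finish.
func worker(in <-chan int, out chan<- result) {
	defer close(out)
	for v := range in {
		out <- result{value: v, even: v%2 == 0}
	}
}

func main() {
	in := make(chan int)
	out := make(chan result)
	go worker(in, out)
	go func() {
		defer close(in)
		for i := 0; i < 5; i++ {
			in <- i
		}
	}()
	for r := range out {
		fmt.Printf("%d even=%t\n", r.value, r.even)
	}
}
```

Without the close(out), the final range loop in main would never end, which is the deadlock described above.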
The main goroutine gets the input stream from produce(), then launches a number of workers, giving each worker its own channel on which to send its results.
These result channels are then passed to the fan-in operation. To fan in, we create a channel to receive the output, then launch a goroutine for each of the worker channels. Each goroutine simply iterates over its channel until there is no more data, then terminates. Remember that we closed the result channel in the worker; that is what allows the for loop to terminate.
Note that we use a WaitGroup for the fan-in process. This lets us know when all the results from all the result channels have been combined into the output channel. When this happens, we close the output channel so that whichever downstream goroutine is consuming the output can terminate.
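The fan-in step described above can be sketched with sync.WaitGroup as follows. The names fanIn and gen are hypothetical; gen is only a helper to feed the example with a few closed channels.

```go
package main

import (
	"fmt"
	"sync"
)

// fanIn merges several input channels into one output channel.
// One goroutine per input forwards values until that input is closed;
// the WaitGroup tracks them, and once all have finished the output is
// closed so downstream consumers can stop ranging over it.
func fanIn(inputs ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, in := range inputs {
		go func(c <-chan int) {
			defer wg.Done()
			for v := range c {
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// gen is a hypothetical helper that emits the given values, then closes.
func gen(vals ...int) <-chan int {
	c := make(chan int)
	go func() {
		defer close(c)
		for _, v := range vals {
			c <- v
		}
	}()
	return c
}

func main() {
	sum := 0
	for v := range fanIn(gen(1, 2), gen(3, 4), gen(5)) {
		sum += v
	}
	fmt.Println(sum) // 15
}
```

The wg.Wait() and close(out) run in their own goroutine so that fanIn can return the output channel immediately; waiting inline would block before anyone could read from out.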
As the results flow into the output channel, the main goroutine can go ahead and display them. Note that we use a boolean channel to prevent main from returning until everything is done; in Go, returning from main ends the whole process, including any goroutines that are still running.
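Putting the pieces together, a complete program might look like the sketch below. The sizes numItems and numWorkers, the result fields, and the function signatures are assumptions, not the post's original listing; the structure (per-worker result channels, WaitGroup-based fan-in, a bool done channel in main) follows the description above.

```go
package main

import (
	"fmt"
	"sync"
)

// Hypothetical sizes; the post does not state them.
const (
	numItems   = 10
	numWorkers = 3
)

type result struct {
	value int
	even  bool
}

// produce emits 0..numItems-1 and closes the channel.
func produce() chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for i := 0; i < numItems; i++ {
			ch <- i
		}
	}()
	return ch
}

// worker classifies each input and closes its own result channel when done.
func worker(in <-chan int, out chan<- result) {
	defer close(out)
	for v := range in {
		out <- result{value: v, even: v%2 == 0}
	}
}

// fanIn merges all result channels and closes the output once all are drained.
func fanIn(inputs []chan result) <-chan result {
	out := make(chan result)
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, in := range inputs {
		go func(c <-chan result) {
			defer wg.Done()
			for r := range c {
				out <- r
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	in := produce()

	// Fan out: every worker reads the shared input but has its own result channel.
	results := make([]chan result, numWorkers)
	for i := range results {
		results[i] = make(chan result)
		go worker(in, results[i])
	}

	// Fan in, display on another goroutine, and block main on done.
	merged := fanIn(results)
	done := make(chan bool)
	go func() {
		for r := range merged {
			fmt.Printf("%d even=%t\n", r.value, r.even)
		}
		done <- true
	}()
	<-done
}
```

Because the workers run concurrently, the order of the printed lines is not guaranteed.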
Note that there is another way to do fan-in, using a select statement. The technique used here is a little cleaner, since we can increase or decrease the number of workers without changing the fan-in code.
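For comparison, a select-based fan-in for exactly two channels might look like this sketch (fanIn2 and gen are hypothetical names). A closed input is set to nil; receiving from a nil channel blocks forever, so select stops choosing that case, and the loop ends once both inputs are nil.

```go
package main

import "fmt"

// fanIn2 merges exactly two channels using a select loop.
func fanIn2(a, b <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for a != nil || b != nil {
			select {
			case v, ok := <-a:
				if !ok {
					a = nil // a is closed; disable this case
					continue
				}
				out <- v
			case v, ok := <-b:
				if !ok {
					b = nil // b is closed; disable this case
					continue
				}
				out <- v
			}
		}
	}()
	return out
}

// gen is a hypothetical helper that emits the given values, then closes.
func gen(vals ...int) <-chan int {
	c := make(chan int)
	go func() {
		defer close(c)
		for _, v := range vals {
			c <- v
		}
	}()
	return c
}

func main() {
	for v := range fanIn2(gen(1, 2), gen(3)) {
		fmt.Println(v)
	}
}
```

Each additional input needs another case in the select, which is why the WaitGroup version scales more easily as the number of workers changes.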
Note also that we have not addressed early termination, for example on SIGTERM or SIGINT. Handling that adds a little more complexity.
How would you implement this? There are other implementations of the fan-out/fan-in pattern. Please leave your comments and thoughts below.
Thanks!
The code for this post and all posts in this series can be found here