- Worker pool pattern
Date: May 29, 2024
Worker pool pattern is one of the golang concurrency patterns |
- The worker pool pattern involves creating a group of worker goroutines to process tasks concurrently,
- limiting the number of simultaneous operations. This pattern is valuable when you have a large number of tasks to execute.
- Some examples of using the Worker Pool Pattern in Real-world Applications
- Handling incoming HTTP requests in a web server.
- Processing images concurrently.
package main
import (
"fmt"
"sync"
)
func worker(id int, jobs <-chan int, results chan<- int) {
// Process jobs from the jobs channel until it is closed
for job := range jobs {
fmt.Printf("Worker %d processing job-%d\n", id, job)
results <- job // Send the result of the job to the results channel
}
}
func main() {
numWorkers := 3
numJobs := 10
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
// Start workers
var wg sync.WaitGroup
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go func(workerId int) {
defer wg.Done()
worker(workerId, jobs, results)
}(i)
}
// Add jobs to the jobs channel
for i := 1; i <= numJobs; i++ {
jobs <- i
}
// close channel since all values sent to jobs channel
close(jobs)
// NOTE:
// Non-blocking Main Goroutine:
// By placing wg.Wait() and close(results) in a `separate` goroutine,
// you ensure that the main goroutine can continue to collect results
// while waiting for all workers to finish.
// Blocking Main Goroutine:
// The main goroutine blocks on wg.Wait() and waits for all workers
// to finish before proceeding to collect results.
// Possible Deadlock: If you put wg.Wait() and close(results) in the main goroutine,
// it will block on wg.Wait(), potentially causing a deadlock if the buffer in the results channel fills up.
// This is because workers will be trying to send results to a full results channel
// while the main goroutine is waiting on wg.Wait().
// Potential Deadlock Scenario
// If the main goroutine blocks on wg.Wait() and the results channel buffer fills up,
// the workers will be stuck trying to send results. The main goroutine, in turn,
// is stuck waiting for the workers to finish, creating a deadlock.
// Wait for all workers to finish and close the results channel
// when the counter reaches zero, wg.Wait() unblocks
// the counter reaches zero when `worker` finishes putting values from `jobs` to `results`
// since all is sent to `results` channel, let's close results channle here
go func() {
wg.Wait()
// close channel since all values sent to results channel
// because the fact that wg.Wait() is called when all wg.Done() is executed implies
// that `worker(workerID, jobs, results)` function's `results <- job * 2` operation is complete
// since all values sent to the `results` channel we can close `results` channel here!
close(results)
}()
// Process results from the results channel
for result := range results {
fmt.Printf("Reading result: job-%d\n", result)
}
}
The reason for creating a separate goroutine for wg.Wait()
and close(results)
is to ensure that the main goroutine does not block while waiting for all the worker goroutines to finish. Let's break down the process and explain the differences:
Understanding the Flow
-
Creating Channels and Starting Workers:
- Channels
jobs
andresults
are created with a buffer size ofnumJobs
. - Worker goroutines are started and each worker processes jobs from the
jobs
channel and sends the results to theresults
channel.
- Channels
-
Enqueuing Jobs:
- Jobs are enqueued into the
jobs
channel. - Once all jobs are enqueued, the
jobs
channel is closed to signal to the workers that there are no more jobs to process.
- Jobs are enqueued into the
-
Waiting for Workers to Finish:
- A separate goroutine is started to wait for all worker goroutines to finish (
wg.Wait()
). - After all workers are done, this goroutine closes the
results
channel.
- A separate goroutine is started to wait for all worker goroutines to finish (
-
Collecting Results:
- The main goroutine collects and processes results from the
results
channel until it is closed.
- The main goroutine collects and processes results from the
Why a Separate Goroutine for wg.Wait()
and close(results)
?
By placing wg.Wait()
and close(results)
in a separate goroutine, you ensure that the main goroutine can continue to collect results while waiting for all workers to finish. Here's what happens in both cases:
Case 1: Separate Goroutine for wg.Wait()
and close(results)
go func() {
wg.Wait()
close(results)
}()
- Non-blocking Main Goroutine: The main goroutine does not block and can continue to collect results from the
results
channel. - Graceful Completion: Once all workers are done, the
results
channel is closed, allowing the main goroutine to exit thefor
loop and terminate gracefully.
Case 2: wg.Wait()
and close(results)
in Main Goroutine
wg.Wait()
close(results)
- Blocking Main Goroutine: The main goroutine blocks on
wg.Wait()
and waits for all workers to finish before proceeding to collect results. - Possible Deadlock: If you put
wg.Wait()
andclose(results)
in the main goroutine, it will block onwg.Wait()
, potentially causing a deadlock if the buffer in theresults
channel fills up. This is because workers will be trying to send results to a fullresults
channel while the main goroutine is waiting onwg.Wait()
.
Potential Deadlock Scenario
If the main goroutine blocks on wg.Wait()
and the results
channel buffer fills up, the workers will be stuck trying to send results. The main goroutine, in turn, is stuck waiting for the workers to finish, creating a deadlock.
Conclusion
Using a separate goroutine for wg.Wait()
and close(results)
allows the main goroutine to continue processing results without blocking, preventing the risk of a deadlock and ensuring smooth program execution. This design pattern is commonly used in concurrent programming to decouple the coordination of goroutines from the main execution flow.