Go Concurrency Patterns
Goroutines
The `go` keyword is used to start a goroutine.
A goroutine is a lightweight, managed thread used by the Go runtime to run functions concurrently.
Unlike OS threads, which have a fixed stack size (often around 1MB), goroutines start with a very small stack, around 2KB, and can grow or shrink dynamically as needed. This makes it possible to run thousands or even millions of goroutines simultaneously, depending on the available memory.
Goroutines are sometimes compared to “green threads,” which are threads that are scheduled in user space rather than by the OS. The problem with green threads is that they may not leverage multiple CPU cores efficiently since they don’t interact directly with the OS’s scheduler.
Goroutines are similar to green threads in that they are scheduled by the Go runtime rather than the OS. However, they differ in a crucial way: Go uses a model called M:N scheduling, where the Go runtime multiplexes multiple goroutines (M) onto multiple OS threads (N). This allows the runtime to distribute goroutines across multiple CPU cores when possible, making Go’s concurrency model more efficient and scalable than traditional green threads.
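To make the M:N mapping a little more concrete, here’s a tiny sketch (my own illustration) that uses the `runtime` package to inspect the “N” side of the model:

```go
package main

import (
	"fmt"
	"runtime"
)

func main() {
	// NumCPU reports the number of logical CPUs available.
	// GOMAXPROCS(0) reports (without changing) the maximum number of OS
	// threads that can execute Go code simultaneously -- the "N" in M:N.
	fmt.Println("Logical CPUs:", runtime.NumCPU())
	fmt.Println("GOMAXPROCS:", runtime.GOMAXPROCS(0))
}
```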
⚠️ IMPORTANT
The following example uses a `time.Sleep` to wait for the goroutine to finish. This is done for simplicity. Do NOT use this approach. I’ll explain alternative options afterwards.
```go
package main

import (
	"fmt"
	"time"
)

func sayHello() {
	fmt.Println("Hello from a goroutine!")
}

func main() {
	go sayHello() // Launches sayHello in a new goroutine
	fmt.Println("Hello from main!")
	time.Sleep(1 * time.Second) // Wait for the goroutine to complete (only for example)
}
```
https://play.golang.com/p/SdOdZ90-exI
⚠️ NOTE
In Go, the `main` function is effectively the “initial” goroutine. When a Go program starts, the Go runtime creates a goroutine to run `main`. This main goroutine can then spawn additional goroutines as needed.
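It’s worth seeing what happens when the main goroutine finishes before the others do: the program exits immediately, without waiting. This is exactly why the earlier example needed the `time.Sleep`. A minimal sketch (my own illustration):

```go
package main

import "fmt"

func main() {
	go fmt.Println("Hello from a goroutine!")
	// No waiting: when main returns, the runtime tears the program down,
	// so the goroutine above will most likely never get to print.
}
```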
Channels
Channels in Go are a powerful way to communicate between goroutines and to synchronize them. They allow you to send and receive values across goroutines, and they help avoid race conditions by enabling safe data sharing.
⚠️ IMPORTANT
In the following example the `<-ch` receive operation BLOCKS the main goroutine. This can cause a deadlock if we don’t have another goroutine running to unblock it.
```go
package main

import (
	"fmt"
)

func sendMessage(ch chan string) {
	ch <- "Hello from a goroutine!" // Send a message to the channel
}

func main() {
	// Create a new channel of type string
	ch := make(chan string)

	// Start a goroutine to send a message
	go sendMessage(ch)

	// Receive the message from the channel
	msg := <-ch
	fmt.Println(msg) // Output: Hello from a goroutine!
}
```
https://play.golang.com/p/2Qn_NacVw-0
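The channel above is unbuffered, which is why the receive blocks until the goroutine sends. Channels can also be buffered, in which case sends only block once the buffer is full. A quick sketch of the difference (my own illustration, not from the original example):

```go
package main

import "fmt"

func main() {
	// A buffered channel with capacity 2: the first two sends complete
	// immediately, without a receiver being ready.
	ch := make(chan string, 2)
	ch <- "first"
	ch <- "second"
	// A third send here (ch <- "third") would block until a receive
	// freed up space in the buffer.

	fmt.Println(<-ch) // Output: first
	fmt.Println(<-ch) // Output: second
}
```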
Select Statement
The `select` statement is used to wait on multiple channel operations. It blocks until one of its cases can proceed, which makes it essential for handling multiple asynchronous tasks.

Use `select` when you have multiple channels to listen to and you want to respond to whichever channel receives data first.
⚠️ NOTE
In the following example, the first goroutine uses a `time.Sleep`. This is to simulate the operation taking a long time. It results in the `select` pulling a value from the second goroutine.
```go
package main

import (
	"fmt"
	"time"
)

func main() {
	ch1 := make(chan string)
	ch2 := make(chan string)

	go func() {
		time.Sleep(1 * time.Second)
		ch1 <- "result from ch1"
	}()

	go func() {
		ch2 <- "result from ch2"
	}()

	select {
	case msg1 := <-ch1:
		fmt.Println("Received:", msg1)
	case msg2 := <-ch2:
		fmt.Println("Received:", msg2)
	}
}
```
https://play.golang.com/p/HXe-bZ__EEy
A common use case for `select` is to add a timeout, so that a blocked channel operation (a potential deadlock) can’t stall the program indefinitely:
⚠️ NOTE
In the following example we use `time.After` to cause a timeout.
```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Create an unbuffered channel
	ch := make(chan string)

	// Start a goroutine that simulates a delayed send
	go func() {
		time.Sleep(3 * time.Second)   // Simulate a delay
		ch <- "Hello from goroutine!" // Send a message after delay
	}()

	select {
	case msg := <-ch:
		fmt.Println("Received:", msg)
	case <-time.After(2 * time.Second): // Timeout after 2 seconds
		fmt.Println("Timeout! No message received.")
	}
}
```
https://play.golang.com/p/6BMPeUKdqkg
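`select` also supports a `default` case, which runs immediately if no other case is ready, turning a blocking channel operation into a non-blocking one. We’ll lean on this trick later in the ‘Real Examples’ section. A short sketch (my own illustration):

```go
package main

import "fmt"

func main() {
	ch := make(chan string)

	select {
	case msg := <-ch:
		fmt.Println("Received:", msg)
	default:
		// No sender is ready, so rather than blocking forever we fall
		// through to the default case immediately.
		fmt.Println("No message available, moving on.")
	}
}
```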
Wait Groups
A `sync.WaitGroup` waits for a collection of goroutines to finish. It helps coordinate a group of goroutines and ensures the program waits until all of them have completed before proceeding.

Use a `WaitGroup` when you need to wait for multiple goroutines to finish before moving on.

In this example, a `sync.WaitGroup` is used to wait for three goroutines to complete. Each goroutine represents a worker, and each one calls `wg.Done()` to signal that it’s finished. The `main` function calls `wg.Wait()` to block until all workers are done.
```go
package main

import (
	"fmt"
	"sync"
)

func worker(id int, wg *sync.WaitGroup) {
	defer wg.Done() // Decrement counter when goroutine completes
	fmt.Printf("Worker %d starting\n", id)
	fmt.Printf("Worker %d done\n", id)
}

func main() {
	var wg sync.WaitGroup

	for i := range 3 {
		wg.Add(1) // Track each goroutine started
		go worker(i, &wg)
	}

	// Wait for all goroutines to finish
	wg.Wait()
	fmt.Println("All workers done.")
}
```
https://play.golang.com/p/LhEdSQIPp1R
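A common extension of this pattern (my own sketch, not part of the original example) is to pair the `WaitGroup` with a results channel: a separate goroutine closes the channel once every worker has called `Done()`, which lets `main` simply `range` over the results:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	results := make(chan int)

	for i := range 3 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			results <- i * 2 // send a (hypothetical) result
		}()
	}

	// Close the results channel once all workers are done, so the
	// range loop below knows when to terminate.
	go func() {
		wg.Wait()
		close(results)
	}()

	for r := range results {
		fmt.Println("Got:", r)
	}
}
```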
Mutex
Go’s `sync.Mutex` provides mutual exclusion, allowing only one goroutine at a time to access a critical section of code. `sync.RWMutex` is a variant that allows multiple concurrent readers or a single writer, but not both.

Use `sync.Mutex` or `sync.RWMutex` when you need fine-grained control over data access and want to protect shared data from race conditions.

In the below example `sync.Mutex` ensures that only one goroutine modifies `counter.value` at a time, preventing race conditions:
```go
package main

import (
	"fmt"
	"sync"
)

type Counter struct {
	mu    sync.Mutex
	value int
}

func (c *Counter) Increment() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.value++
}

func (c *Counter) Value() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.value
}

func main() {
	counter := &Counter{}
	var wg sync.WaitGroup

	for range 10 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			counter.Increment()
		}()
	}

	wg.Wait()
	fmt.Println("Final Counter:", counter.Value())
}
```
https://play.golang.com/p/VIbNkQaPfZI
⚠️ NOTE
The use of `defer` in the `Increment()` method is redundant. It’s more useful for long, complex functions where errors can occur. Here I should have just placed the `Unlock()` call after the `c.value++`.
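`sync.RWMutex`, mentioned above, never got its own example, so here’s a hedged sketch of the same `Counter` using it (assuming reads vastly outnumber writes): reads take the shared lock via `RLock()` and can run concurrently, while writes still take the exclusive lock:

```go
package main

import (
	"fmt"
	"sync"
)

type Counter struct {
	mu    sync.RWMutex
	value int
}

func (c *Counter) Increment() {
	c.mu.Lock() // exclusive lock: excludes all readers and writers
	c.value++
	c.mu.Unlock()
}

func (c *Counter) Value() int {
	c.mu.RLock() // shared lock: other readers may proceed concurrently
	defer c.mu.RUnlock()
	return c.value
}

func main() {
	c := &Counter{}
	var wg sync.WaitGroup

	for range 5 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Increment()
			fmt.Println("Read:", c.Value()) // concurrent reads are cheap
		}()
	}

	wg.Wait()
	fmt.Println("Final:", c.Value())
}
```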
Conditions
Go’s `sync.Cond` is used for signaling between goroutines. It lets goroutines wait until they are notified to continue, which is useful when one goroutine needs to wait for a certain condition to be met by another goroutine.

Use `sync.Cond` when you need goroutines to wait for certain conditions, such as producer-consumer scenarios.

In the following example, `cond.Wait()` blocks until `cond.Signal()` is called. It’s useful for waiting on complex conditions where other primitives like `chan` may not be ideal:
⚠️ IMPORTANT
The call to `cond.L.Lock()` in the main goroutine just before `for !ready` is required, otherwise you’ll get the error `fatal error: sync: unlock of unlocked mutex`. This is because `cond.Wait()` expects the caller to hold the lock before calling `Wait()` (see this video for details). Once `Wait()` returns, it reacquires the lock, ensuring the main goroutine can safely check `ready` and exit the loop.
```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var mu sync.Mutex
	cond := sync.NewCond(&mu)
	ready := false

	go func() {
		time.Sleep(1 * time.Second)
		cond.L.Lock()
		ready = true
		cond.L.Unlock()
		cond.Signal() // Notify one waiting goroutine
	}()

	cond.L.Lock()
	for !ready {
		cond.Wait() // Wait until condition is met
	}
	fmt.Println("Ready is true, proceeding.")
	cond.L.Unlock()
}
```
https://play.golang.com/p/n_txZaH7lPA
In the following example we have multiple worker goroutines waiting on a shared condition to be “notified.” We’ll see how both `.Signal()` and `.Broadcast()` work when notifying waiting goroutines:
```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func worker(id int, cond *sync.Cond) {
	cond.L.Lock() // Lock the condition
	defer cond.L.Unlock()
	fmt.Printf("Worker %d is waiting\n", id)
	cond.Wait() // Wait for a signal or broadcast
	fmt.Printf("Worker %d is proceeding\n", id)
}

func main() {
	lock := &sync.Mutex{}
	cond := sync.NewCond(lock)

	// Start multiple worker goroutines that will wait on the condition
	for i := range 3 {
		go worker(i, cond)
	}

	// Allow time for all workers to start and wait
	time.Sleep(1 * time.Second)

	// Use Signal to wake up one goroutine
	fmt.Println("Notifying one worker")
	cond.Signal() // Notifies one waiting worker
	time.Sleep(1 * time.Second)

	// Use Broadcast to wake up all remaining goroutines
	fmt.Println("Broadcasting to all remaining workers")
	cond.Broadcast() // Notifies all remaining waiting workers

	// Allow time for all goroutines to complete
	time.Sleep(2 * time.Second)
	fmt.Println("Main function exiting.")
}
```
https://play.golang.com/p/41ibtmUmKaN
Each worker goroutine locks the condition, calls `cond.Wait()`, and then waits. This releases the lock (as we now understand from the earlier IMPORTANT note, see above if you missed it), allowing other goroutines to call `Wait()` as well.

The `cond.Signal()` call in the `main` function wakes up one of the waiting goroutines, allowing it to proceed.

After a short delay, `cond.Broadcast()` wakes up all remaining waiting goroutines, allowing them to proceed simultaneously.
This is useful for scenarios where multiple tasks need to wait for a common event or state change to proceed.
⚠️ IMPORTANT
Notifications are not ordered.
Any one of the waiting goroutines can be chosen to proceed first.
Broadcast ensures that all waiting goroutines eventually proceed.
Now you might be thinking “hmm, it looks like I could use channels instead and they’re more idiomatic”.
Well, here are some reasons why you might choose `sync.Cond` over channels:

- **Fine-Grained Control**: `sync.Cond` allows precise control over waiting and signaling, suitable for cases where specific conditions must be checked or managed.
- **Broadcast Capability**: Broadcasting to multiple goroutines is straightforward with `sync.Cond`, whereas channels require individual signaling, which can be inefficient.
- **Reduced Complexity for State-Based Waiting**: `sync.Cond` is ideal for situations where goroutines need to wait for specific conditions to be true, rather than for individual values or events passed through a channel.
- **Avoiding Channel Overhead**: Channels introduce buffering and management overhead, especially with many goroutines, whereas `sync.Cond` relies on a shared mutex with a direct wait/notify mechanism, which is often faster.
In summary, `sync.Cond` is best suited for use cases that involve waiting for and signaling conditions, especially when you need more control over synchronization and when goroutines are reacting to shared state changes rather than discrete message passing.
Atomic Operations
The `sync/atomic` package provides low-level atomic operations on simple types like integers and pointers, ensuring operations are performed atomically.
Use atomic operations when you need lock-free synchronization for counters or flags, but only for basic integer or pointer manipulations.
In the following example, `atomic.AddInt32` safely increments `counter` without a lock, making it ideal for high-performance counters or flags:
```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var counter int32
	var wg sync.WaitGroup

	for range 10 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			atomic.AddInt32(&counter, 1)
		}()
	}

	wg.Wait()
	fmt.Println("Final Counter:", atomic.LoadInt32(&counter))
}
```
https://play.golang.com/p/qsRPoC4GPNv
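Since Go 1.19 the `sync/atomic` package also ships typed wrappers such as `atomic.Int32` and `atomic.Bool`, which are harder to misuse than the function-based API. Here’s a hedged sketch using `atomic.Bool` and `CompareAndSwap` so that exactly one goroutine “claims” a flag:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var claimed atomic.Bool
	var wg sync.WaitGroup

	for i := range 5 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// CompareAndSwap atomically flips false -> true for exactly
			// one goroutine; every other goroutine sees it return false.
			if claimed.CompareAndSwap(false, true) {
				fmt.Printf("Goroutine %d claimed the flag\n", i)
			}
		}()
	}

	wg.Wait()
}
```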
Once
Go’s `sync.Once` ensures that a function only executes once, even if multiple goroutines attempt to run it.

Use `sync.Once` when you need to perform a one-time initialization, such as setting up a shared resource.

In the following example, even though multiple goroutines call `once.Do(initialize)`, `initialize` only runs once. This is especially useful for lazy initialization of global resources:
```go
package main

import (
	"fmt"
	"sync"
)

var once sync.Once

func initialize() {
	fmt.Println("Initializing...")
}

func main() {
	var wg sync.WaitGroup

	for range 3 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			once.Do(initialize)
		}()
	}

	wg.Wait()
}
```
https://play.golang.com/p/5J1ApCPc1iU
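Go 1.21 added the wrappers `sync.OnceFunc`, `sync.OnceValue`, and `sync.OnceValues`, which bundle a `sync.Once` and the function into a single callable. A hedged sketch using `sync.OnceValue` for lazy initialization (the `loadConfig` initializer is hypothetical):

```go
package main

import (
	"fmt"
	"sync"
)

// loadConfig is a hypothetical expensive initializer. OnceValue returns
// a function that runs it exactly once and caches the result; every
// later call returns the cached value.
var loadConfig = sync.OnceValue(func() string {
	fmt.Println("Loading config...")
	return "config-data"
})

func main() {
	var wg sync.WaitGroup

	for range 3 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println(loadConfig()) // "Loading config..." prints only once
		}()
	}

	wg.Wait()
}
```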
Context
Go’s `context.Context` is not a strict concurrency primitive but is widely used to manage timeouts, cancellations, and deadlines across goroutines.

Use `context.Context` to signal cancellation or control the lifespan of goroutines, particularly in networked or long-running tasks.

In the following example, `context.WithTimeout` creates a context that automatically cancels after 1 second, which is useful for controlling tasks that may hang or take too long:
```go
package main

import (
	"context"
	"fmt"
	"time"
)

func process(ctx context.Context) {
	select {
	case <-time.After(2 * time.Second): // use time.After to simulate slow operation
		fmt.Println("Completed work")
	case <-ctx.Done():
		fmt.Println("Work cancelled")
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

	go process(ctx)
	time.Sleep(2 * time.Second)
}
```
https://play.golang.com/p/diSmAp0SJkg
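Timeouts aren’t the only way to cancel: `context.WithCancel` hands you a `cancel` function to call explicitly, which suits cases where the decision to stop comes from program logic rather than a clock. A hedged sketch (my own illustration; the sleeps are only to make the demo observable):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	go func(ctx context.Context) {
		for {
			select {
			case <-ctx.Done():
				fmt.Println("Worker stopping:", ctx.Err())
				return
			default:
				fmt.Println("Working...")
				time.Sleep(200 * time.Millisecond)
			}
		}
	}(ctx)

	time.Sleep(500 * time.Millisecond)
	cancel() // explicitly signal the worker to stop

	time.Sleep(200 * time.Millisecond) // give it time to print (demo only)
}
```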
Map
The `sync` package has a `Map` type which you will likely not need to use. The Go authors even document it as such…
> The Map type is specialized. Most code should use a plain Go map instead, with separate locking or coordination, for better type safety and to make it easier to maintain other invariants along with the map content.
>
> The Map type is optimized for two common use cases: (1) when the entry for a given key is only ever written once but read many times, as in caches that only grow, or (2) when multiple goroutines read, write, and overwrite entries for disjoint sets of keys. In these two cases, use of a Map may significantly reduce lock contention compared to a Go map paired with a separate Mutex or RWMutex.
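For completeness, here’s a hedged sketch of the `sync.Map` API (`Store`, `Load`, `Range`), which is safe for concurrent use without any extra locking:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var m sync.Map
	var wg sync.WaitGroup

	// Multiple goroutines writing disjoint keys: one of the two use
	// cases the Go docs say sync.Map is optimized for.
	for i := range 3 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			m.Store(i, i*10)
		}()
	}
	wg.Wait()

	if v, ok := m.Load(1); ok {
		fmt.Println("key 1 =", v)
	}

	// Range iterates over all entries; values come back as `any`, so
	// type assertions are needed (one reason plain maps are preferred).
	m.Range(func(key, value any) bool {
		fmt.Println(key, "->", value)
		return true // continue iteration
	})
}
```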
Real Examples
Below is a ‘real world’ example where we need to delete a bunch of keys from a data store. The API that is provided does not support bulk deletion of keys, but it does provide an endpoint that lets us paginate the available keys. We then need to stream that information as quickly as possible using a pool of goroutines coordinated with both channels and wait groups.

It’s a nice example because it brings together several different concurrency primitives (goroutines, channels, select, wait groups, atomic operations).
💡 TIP
Keep reading after the code snippet for a brief breakdown of what the code does.
```go
const (
	// PoolSize is the goroutine/thread-pool size.
	// Each pool will take a 'key' from a channel and issue a DELETE request.
	PoolSize int = 100

	// MaxErrors is the maximum number of errors we'll allow before
	// stopping the goroutines from executing.
	MaxErrors int = 100
)

func DeleteAllKeys(storeID string, out io.Writer) error {
	// Create a 'spinner' which helps visually update the user on the progress.
	spinnerMessage := "Deleting keys"
	var spinner text.Spinner
	var err error
	spinner, err = text.NewSpinner(out)
	if err != nil {
		return err
	}
	err = spinner.Start()
	if err != nil {
		return err
	}
	spinner.Message(spinnerMessage + "...")

	// Create a keys paginator.
	p := fastly.NewListKVStoreKeysPaginator(&fastly.ListKVStoreKeysInput{
		StoreID: storeID,
	})

	// Channel for tracking errors when deleting keys.
	errorsCh := make(chan string, MaxErrors)

	// Channel for tracking keys to delete. The buffer size correlates to the
	// pagination page size defined by the API.
	keysCh := make(chan string, 1000)

	var (
		// Track the number of keys deleted.
		deleteCount atomic.Uint64
		// Track which keys failed to be deleted.
		failedKeys []string
		// This will help us wait for all goroutines to complete.
		wg sync.WaitGroup
	)

	// We have two separate execution flows happening at once:
	//
	// 1. Pushing keys from pagination data into a key channel.
	// 2. Pulling keys from key channel and issuing API DELETE call.
	//
	// We have a limit on the number of errors. Once that limit is reached we'll
	// stop the second set of goroutines processing the delete operation.

	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(keysCh)
		for p.Next() {
			for _, key := range p.Keys() {
				keysCh <- key
			}
		}
	}()

	// Limit the number of goroutines spun up to the specified pool size.
	for range PoolSize {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for key := range keysCh {
				err := fastly.DeleteKVStoreKey(
					&fastly.DeleteKVStoreKeyInput{StoreID: storeID, Key: key},
				)
				if err != nil {
					select {
					case errorsCh <- key:
					default:
						return // channel is full (i.e. MaxErrors limit reached)
					}
				}
				// Update the TUI (Terminal UI) to reflect the current number of
				// deleted keys.
				f := strconv.FormatUint(deleteCount.Add(1), 10)
				spinner.Message(spinnerMessage + "..." + f)
			}
		}()
	}

	wg.Wait()
	close(errorsCh)

	for err := range errorsCh {
		failedKeys = append(failedKeys, err)
	}

	spinnerMessage = "Deleted keys: " + strconv.FormatUint(deleteCount.Load(), 10)

	if len(failedKeys) > 0 {
		spinner.StopFailMessage(spinnerMessage)
		err := spinner.StopFail()
		if err != nil {
			return fmt.Errorf("failed to stop spinner: %w", err)
		}
		return fmt.Errorf("failed to delete %d keys", len(failedKeys))
	}

	spinner.StopMessage(spinnerMessage)
	if err := spinner.Stop(); err != nil {
		return fmt.Errorf("failed to stop spinner: %w", err)
	}

	text.Success(out, "\nDeleted all keys from KV Store '%s'", storeID)
	return nil
}
```
So you can see we have multiple goroutines spun up (and we wait for them using `sync.WaitGroup`):

- The first goroutine iterates over the pagination data and pushes it into a channel.
- The other goroutines (we have a limit of `PoolSize`) pull data from the channel and issue key deletion API calls.
We also use the `select` statement to control whether we stop the goroutines processing the deletion operations. The way we do this is to define another channel (`errorsCh`) with a buffer size of `MaxErrors`, and then every time we get an error we push it into that channel. If the channel becomes full (which it will do eventually, because there’s nothing pulling messages from the `errorsCh` channel), then the `select` statement will fall through to its `default` case and we return from the goroutine (causing it to stop running).
The last interesting concurrency primitive we use is `atomic.Uint64` for accurately tracking the number of deleted keys. We use its `Add()` method within the goroutine to safely increment the counter, and then at the end of the function we use its `Load()` method to safely extract the final value.
UPDATE 2024.11.01
Below is a modification to the ‘real world’ example code.
The difference is in how errors are handled. In the original code we would stop processing the deletion of keys once an error threshold was reached. In the following version we don’t want that to happen: we want to keep processing keys even when errors occur. That introduces a new challenge related to channel buffer size (we can’t give a channel an infinite amount of memory), so at some point the errors channel is going to fill up and we need to decide what to do.
In this case we can realistically only drop errors. But we can alleviate that a little by pulling messages out of the errors channel, and appending them to the `failedKeys` slice, concurrently. This makes space in the channel’s buffer so we can push more errors into it as they occur.

The problem with pulling errors out of the errors channel concurrently is that we need to `range` over the channel, but that will cause a deadlock if we don’t handle it correctly (because ranging over a channel only terminates the loop when the channel is closed). So we need to introduce not one `sync.WaitGroup` but two! See the code comments below for more details…
```go
// deleteAllKVStoreKeys deletes all keys within the specified KV Store.
func deleteAllKVStoreKeys(
	conn *fastly.Client,
	storeID string,
	maxErrors, poolSize int,
) error {
	p := conn.NewListKVStoreKeysPaginator(&fastly.ListKVStoreKeysInput{
		StoreID: storeID,
	})

	errorsCh := make(chan string, maxErrors)
	keysCh := make(chan string, 1000) // number correlates to pagination page size

	var (
		failedKeys   []string
		mu           sync.Mutex
		wgProcessing sync.WaitGroup
		wgErrorCh    sync.WaitGroup
	)

	// We have three separate execution flows happening at once:
	//
	// 1. Pushing keys from pagination data into a key channel.
	// 2. Pulling keys from error channel and appending to failedKeys slice.
	// 3. Pulling keys from key channel and issuing API DELETE call.
	//
	// The second item is problematic, in that ranging over a channel only
	// terminates when the channel is closed. So we need to ensure we close the
	// errorsCh once we've finished processing the deletion of all the keys.
	//
	// To do that we need two sets of wait groups.
	//
	// The first is wgProcessing which keeps track of all goroutines related to
	// processing the pagination data (e.g. the goroutine ranging over the
	// paginator keys, and the goroutines ranging over the keysCh as part of
	// the poolSize loop).
	//
	// The second wait group is wgErrorCh which tracks when the first
	// (wgProcessing) has completed and then closes errorsCh.

	// The following goroutine finishes once all pagination keys have been
	// processed.
	wgProcessing.Add(1)
	go func() {
		defer wgProcessing.Done()
		defer close(keysCh)
		for p.Next() {
			for _, key := range p.Keys() {
				keysCh <- key
			}
		}
	}()

	// The following goroutine finishes once the errorsCh is closed.
	wgErrorCh.Add(1)
	go func() {
		defer wgErrorCh.Done()
		for err := range errorsCh {
			mu.Lock()
			failedKeys = append(failedKeys, err)
			mu.Unlock()
		}
	}()

	// The following goroutines finish once they've pulled all data from keysCh.
	for range poolSize {
		wgProcessing.Add(1)
		go func() {
			defer wgProcessing.Done()
			for key := range keysCh {
				err := conn.DeleteKVStoreKey(
					&fastly.DeleteKVStoreKeyInput{StoreID: storeID, Key: key},
				)
				if err != nil {
					select {
					case errorsCh <- key:
					default:
						// The larger we make maxErrors, the less likely we'll
						// drop errors (obviously there's a memory trade-off).
						continue
					}
				}
			}
		}()
	}

	// The following goroutine finishes once the 'processing' goroutines are
	// done.
	wgErrorCh.Add(1)
	go func() {
		defer wgErrorCh.Done()
		wgProcessing.Wait() // Wait for all deletion and pagination tasks.
		close(errorsCh)
	}()

	// Wait for the error-handling goroutines to finish processing.
	wgErrorCh.Wait()

	if len(failedKeys) > 0 {
		return fmt.Errorf("failed to delete %d keys", len(failedKeys))
	}
	return nil
}
```