Go Concurrency Patterns

Goroutines

The go keyword is used to start a goroutine.

A goroutine is a lightweight, managed thread used by the Go runtime to run functions concurrently.

Unlike OS threads, which have a fixed stack size (often around 1MB), goroutines start with a very small stack, around 2KB, and can grow or shrink dynamically as needed. This makes it possible to run thousands or even millions of goroutines simultaneously, depending on the available memory.

Goroutines are sometimes compared to “green threads,” which are threads that are scheduled in user space rather than by the OS. The problem with green threads is that they may not leverage multiple CPU cores efficiently since they don’t interact directly with the OS’s scheduler.

Goroutines are similar to green threads in that they are scheduled by the Go runtime rather than the OS. However, they differ in a crucial way: Go uses a model called Mscheduling, where the Go runtime maps multiple goroutines (M) onto multiple OS threads (N). This allows the runtime to distribute goroutines across multiple CPU cores when possible, making Go’s concurrency model more efficient and scalable than traditional green threads.

⚠️ IMPORTANT
The following example uses a time.Sleep to wait for the goroutine to finish.
This is done for simplicity. Do NOT use this approach.
I’ll explain alternative options afterwards.

package main

import (
	"fmt"
	"time"
)

func sayHello() {
	fmt.Println("Hello from a goroutine!")
}

func main() {
	go sayHello() // Launches sayHello in a new goroutine

	fmt.Println("Hello from main!")
	time.Sleep(1 * time.Second) // Wait for the goroutine to complete (only for example)
}

https://play.golang.com/p/SdOdZ90-exI

⚠️ NOTE
In Go, the main function is effectively the “initial” goroutine.
When a Go program starts, the Go runtime creates a goroutine to run main.
This main goroutine can then spawn additional goroutines as needed.

Channels

Channels in Go are a powerful way to communicate between goroutines and to synchronize them. They allow you to send and receive values across goroutines, and they help avoid race conditions by enabling safe data sharing.

⚠️ IMPORTANT
In the following example the <-ch BLOCKS the main thread.
This can cause a deadlock if we don’t have a goroutine running to unblock it.

package main

import (
	"fmt"
)

func sendMessage(ch chan string) {
	ch <- "Hello from a goroutine!" // Send a message to the channel
}

func main() {
	// Create a new channel of type string
	ch := make(chan string)

	// Start a goroutine to send a message
	go sendMessage(ch)

	// Receive the message from the channel
	msg := <-ch
	fmt.Println(msg) // Output: Hello from a goroutine!
}

https://play.golang.com/p/2Qn_NacVw-0

Select Statement

The select statement is used to wait on multiple channel operations. It blocks until one of its cases can proceed, which makes it essential for handling multiple asynchronous tasks.

Use select when you have multiple channels to listen to, and you want to respond to whichever channel receives data first.

⚠️ NOTE
In the following example, the first goroutine uses a time.Sleep.
This is to simulate the operation taking a long time.
It results in the select pulling a value from the second goroutine.

package main

import (
	"fmt"
	"time"
)

func main() {
	ch1 := make(chan string)
	ch2 := make(chan string)

	go func() {
		time.Sleep(1 * time.Second)
		ch1 <- "result from ch1"
	}()

	go func() {
		ch2 <- "result from ch2"
	}()

	select {
	case msg1 := <-ch1:
		fmt.Println("Received:", msg1)
	case msg2 := <-ch2:
		fmt.Println("Received:", msg2)
	}
}

https://play.golang.com/p/HXe-bZ__EEy

A common use case for select is to timeout a potential deadlock:

⚠️ NOTE
In the following example we use time.After to cause a timeout.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Create an unbuffered channel
	ch := make(chan string)

	// Start a goroutine that simulates a delayed send
	go func() {
		time.Sleep(3 * time.Second)   // Simulate a delay
		ch <- "Hello from goroutine!" // Send a message after delay
	}()

	select {
	case msg := <-ch:
		fmt.Println("Received:", msg)
	case <-time.After(2 * time.Second): // Timeout after 2 seconds
		fmt.Println("Timeout! No message received.")
	}
}

https://play.golang.com/p/6BMPeUKdqkg

Wait Groups

A sync.WaitGroup waits for a collection of goroutines to finish. It helps coordinate a group of goroutines and ensures the program waits until all of them have completed before proceeding.

Use a WaitGroup when you need to wait for multiple goroutines to finish before moving on.

In this example, a sync.WaitGroup is used to wait for three goroutines to complete. Each goroutine represents a worker, and each one calls wg.Done() to signal that it’s finished. The main function calls wg.Wait() to block until all workers are done.

package main

import (
	"fmt"
	"sync"
)

func worker(id int, wg *sync.WaitGroup) {
	defer wg.Done() // Decrement counter when goroutine completes
	fmt.Printf("Worker %d starting\n", id)
	fmt.Printf("Worker %d done\n", id)
}

func main() {
	var wg sync.WaitGroup

	for i := range 3 {
		wg.Add(1) // Track each goroutine started
		go worker(i, &wg)
	}

	// Wait for all goroutines to finish
	wg.Wait()

	fmt.Println("All workers done.")
}

https://play.golang.com/p/LhEdSQIPp1R

Mutex

Go’s sync.Mutex provides mutual exclusion, allowing only one goroutine at a time to access a critical section of code. While sync.RWMutex is a variant that allows multiple readers or a single writer but not both.

Use sync.Mutex or sync.RWMutex when you need fine-grained control over data access and want to protect shared data from race conditions.

In the below example sync.Mutex ensures that only one goroutine modifies counter.value at a time, preventing race conditions:

package main

import (
	"fmt"
	"sync"
)

type Counter struct {
	mu    sync.Mutex
	value int
}

func (c *Counter) Increment() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.value++
}

func (c *Counter) Value() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.value
}

func main() {
	counter := &Counter{}
	var wg sync.WaitGroup

	for range 10 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			counter.Increment()
		}()
	}

	wg.Wait()
	
	fmt.Println("Final Counter:", counter.Value())
}

https://play.golang.com/p/VIbNkQaPfZI

⚠️ NOTE
The use of defer in the Increment() method is redundant.
It’s more useful for long complex functions where errors can occur.
Here I should have just placed the Unlock() call after the c.value++.

Conditions

Go’s sync.Cond is used for signaling between goroutines. It lets goroutines wait until they are notified to continue, which is useful when one goroutine needs to wait for a certain condition to be met by another goroutine.

Use sync.Cond when you need goroutines to wait for certain conditions, such as producer-consumer scenarios.

In the following example, cond.Wait() blocks until cond.Signal() is called. It’s useful for waiting on complex conditions where other primitives like chan may not be ideal:

⚠️ IMPORTANT
The call to cond.L.Lock() in the main goroutine just before for !ready is required, otherwise you’ll get the error fatal error: sync: unlock of unlocked mutex. This is because cond.Wait() expects the caller to hold the lock before calling Wait() (see this video for details). Once Wait() returns, it reacquires the lock, ensuring the main goroutine can safely check ready and exit the loop.

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var mu sync.Mutex
	cond := sync.NewCond(&mu)
	ready := false

	go func() {
		time.Sleep(1 * time.Second)
		cond.L.Lock()
		ready = true
		cond.L.Unlock()
		cond.Signal() // Notify one waiting goroutine
	}()

	cond.L.Lock()
	for !ready {
		cond.Wait() // Wait until condition is met
	}
	fmt.Println("Ready is true, proceeding.")
	cond.L.Unlock()
}

https://play.golang.com/p/n_txZaH7lPA

In the following example we have multiple worker goroutines waiting on a shared condition to be “notified.” We’ll see how both .Signal() and .Broadcast() work when notifying waiting goroutines:

package main

import (
	"fmt"
	"sync"
	"time"
)

func worker(id int, cond *sync.Cond) {
	cond.L.Lock() // Lock the condition
	defer cond.L.Unlock()

	fmt.Printf("Worker %d is waiting\n", id)
	cond.Wait() // Wait for a signal or broadcast
	fmt.Printf("Worker %d is proceeding\n", id)
}

func main() {
	lock := &sync.Mutex{}
	cond := sync.NewCond(lock)

	// Start multiple worker goroutines that will wait on the condition
	for i := range 3 {
		go worker(i, cond)
	}

	// Allow time for all workers to start and wait
	time.Sleep(1 * time.Second)

	// Use Signal to wake up one goroutine
	fmt.Println("Notifying one worker")
	cond.Signal() // Notifies one waiting worker
	time.Sleep(1 * time.Second)

	// Use Broadcast to wake up all remaining goroutines
	fmt.Println("Broadcasting to all remaining workers")
	cond.Broadcast() // Notifies all remaining waiting workers

	// Allow time for all goroutines to complete
	time.Sleep(2 * time.Second)
	fmt.Println("Main function exiting.")
}

https://play.golang.com/p/41ibtmUmKaN

Each worker goroutine locks the condition, calls cond.Wait(), and then waits. This releases the lock (as we now understand from the earlier IMPORTANT note, see above if you missed it), allowing other goroutines to call Wait() as well.

The cond.Signal() call in the main function wakes up one of the waiting goroutines, allowing it to proceed.

After a short delay, cond.Broadcast() wakes up all remaining waiting goroutines, allowing them to proceed simultaneously.

This is useful for scenarios where multiple tasks need to wait for a common event or state change to proceed.

⚠️ IMPORTANT
Notifications are not ordered.
Any one of the waiting goroutines can be chosen to proceed first.
Broadcast ensures that all waiting goroutines eventually proceed.

Now you might be thinking “hmm, it looks like I could use channels instead and they’re more idiomatic”.

Well, here are some reasons for why you might need to choose sync.Cond over channels:

  1. Fine-Grained Control: sync.Cond allows precise control over waiting and signaling, suitable for cases where specific conditions must be checked or managed.

  2. Broadcast Capability: Broadcasting to multiple goroutines is straightforward with sync.Cond, whereas channels require individual signaling, which can be inefficient.

  3. Reduced Complexity for State-Based Waiting: sync.Cond is ideal for situations where goroutines need to wait for specific conditions to be true, rather than for individual values or events passed through a channel.

  4. Avoiding Channel Overhead: Channels introduce buffering and management overhead, especially with many goroutines, whereas sync.Cond relies on a shared mutex with a direct wait/notify mechanism, which is often faster.

In summary, sync.Cond is best suited for use cases that involve waiting for and signaling conditions, especially when you need more control over synchronization and when goroutines are reacting to shared state changes rather than discrete message passing.

Atomic Operations

The sync/atomic package provides low-level atomic operations on simple types like integers and pointers, ensuring operations are performed atomically.

Use atomic operations when you need lock-free synchronization for counters or flags, but only for basic integer or pointer manipulations.

In the following example, atomic.AddInt32 safely increments counter without a lock, making it ideal for high-performance counters or flags:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var counter int32
	var wg sync.WaitGroup

	for range 10 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			atomic.AddInt32(&counter, 1)
		}()
	}

	wg.Wait()
	
	fmt.Println("Final Counter:", atomic.LoadInt32(&counter))
}

https://play.golang.com/p/qsRPoC4GPNv

Once

Go’s sync.Once ensures that a function only executes once, even if multiple goroutines attempt to run it.

Use sync.Once when you need to perform a one-time initialization, such as setting up a shared resource.

In the following example, even though multiple goroutines call once.Do(initialize), initialize only runs once. This is especially useful for lazy initialization of global resources:

package main

import (
	"fmt"
	"sync"
)

var once sync.Once

func initialize() {
	fmt.Println("Initializing...")
}

func main() {
	var wg sync.WaitGroup
	for range 3 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			once.Do(initialize)
		}()
	}
	wg.Wait()
}

https://play.golang.com/p/5J1ApCPc1iU

Context

Go’s context.Context is not a strict concurrency primitive but is widely used to manage timeouts, cancellations, and deadlines across goroutines.

Use context.Context to signal cancellation or control the lifespan of goroutines, particularly in networked or long-running tasks.

In the following example, context.WithTimeout creates a context that automatically cancels after 1 second, which is useful for controlling tasks that may hang or take too long:

package main

import (
	"context"
	"fmt"
	"time"
)

func process(ctx context.Context) {
	select {
	case <-time.After(2 * time.Second): // use time.After to simulate slow operation
		fmt.Println("Completed work")
	case <-ctx.Done():
		fmt.Println("Work cancelled")
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

	go process(ctx)

	time.Sleep(2 * time.Second)
}

https://play.golang.com/p/diSmAp0SJkg

Map

The sync package has a Map type which you will likely not need to use.

The Go authors even document it as such…

The Map type is specialized. Most code should use a plain Go map instead, with separate locking or coordination, for better type safety and to make it easier to maintain other invariants along with the map content.

The Map type is optimized for two common use cases: (1) when the entry for a given key is only ever written once but read many times, as in caches that only grow, or (2) when multiple goroutines read, write, and overwrite entries for disjoint sets of keys. In these two cases, use of a Map may significantly reduce lock contention compared to a Go map paired with a separate Mutex or RWMutex.

Real Examples

Below is a ‘real world’ example where we need to delete a bunch of keys from a data store.

The API that is provided, does not support bulk deleting of keys.

The API does provide an endpoint that lets us paginate the available keys, and we then need to stream that information as quickly as possible using a pool of goroutines coordinated with both channels and wait groups.

It’s a nice example because it brings together several different concurrency primitives (goroutines, channels, select, wait groups, atomic operations).

💡 TIP
Keep reading after the code snippet for a brief breakdown of what the code does.

const (
	// PoolSize is the goroutine/thread-pool size.
	// Each pool will take a 'key' from a channel and issue a DELETE request.
	const PoolSize int = 100

	// MaxErrors is the maximum number of errors we'll allow before
	// stopping the goroutines from executing.
	const MaxErrors int = 100
)

func DeleteAllKeys(storeID string, out io.Writer) error {
	// Create a 'spinner' which helps visually update the user on the progress.
	spinnerMessage := "Deleting keys"
	var spinner text.Spinner

	var err error
	spinner, err = text.NewSpinner(out)
	if err != nil {
		return err
	}
	err = spinner.Start()
	if err != nil {
		return err
	}
	spinner.Message(spinnerMessage + "...")

	// Create a keys paginator.
	p := fastly.NewListKVStoreKeysPaginator(&fastly.ListKVStoreKeysInput{
		StoreID: storeID,
	})

	// Channel for tracking errors when deleting keys.
	errorsCh := make(chan string, MaxErrors)
	
	// Channel for tracking keys to delete.
	keysCh := make(chan string, 1000) // this number correlates to the pagination 
	                                  // page size defined by the API

	var (
		// Track the number of keys deleted.
		deleteCount atomic.Uint64
		
		// Track which keys failed to be deleted.
		failedKeys []string
		
		// This will help us wait for all goroutines to complete.
		wg sync.WaitGroup
	)

	// We have two separate execution flows happening at once:
	//
	// 1. Pushing keys from pagination data into a key channel.
	// 2. Pulling keys from key channel and issuing API DELETE call.
	//
	// We have a limit on the number of errors. Once that limit is reached we'll
	// stop the second set of goroutines processing the delete operation.

	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(keysCh)
		for p.Next() {
			for _, key := range p.Keys() {
				keysCh <- key
			}
		}
	}()

	// Limit the number of goroutines spun up to the specified pool size.
	for range PoolSize {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for key := range keysCh {
				err := fastly.DeleteKVStoreKey(
					&fastly.DeleteKVStoreKeyInput{StoreID: c.StoreID, Key: key},
				)
				if err != nil {
					select {
					case errorsCh <- key:
					default:
						return // channel is full (i.e. MaxErrors limit reached)
					}
				}
				// Update the TUI (Terminal UI) to reflect the current number of 
				// deleted keys.
				f := strconv.FormatUint(deleteCount.Add(1), 10)
				spinner.Message(spinnerMessage + "..." + f)
			}
		}()
	}

	wg.Wait()

	close(errorsCh)
	for err := range errorsCh {
		failedKeys = append(failedKeys, err)
	}

	spinnerMessage = "Deleted keys: " + strconv.FormatUint(deleteCount.Load(), 10)

	if len(failedKeys) > 0 {
		spinner.StopFailMessage(spinnerMessage)
		err := spinner.StopFail()
		if err != nil {
			return fmt.Errorf("failed to stop spinner: %w", err)
		}
		return fmt.Errorf("failed to delete %d keys", len(failedKeys))
	}

	spinner.StopMessage(spinnerMessage)
	if err := spinner.Stop(); err != nil {
		return fmt.Errorf("failed to stop spinner: %w", err)
	}

	text.Success(out, "\nDeleted all keys from KV Store '%s'", c.StoreID)
	return nil
}

So you can see we have multiple goroutines spun up (and we wait for them using sync.WaitGroup):

  • The first goroutine is iterating over the pagination data and pushing data into a channel.
  • The other goroutines (we have a limit of PoolSize) are pulling data from the channel and issuing key deletion API calls.

We also use the select statement to control whether we stop the goroutines processing the deletion operations. The way we do this is to define another channel (errorsCh) with a buffer size of MaxErrors, and then every time we get an error we push the error into that channel. If the channel becomes full (which it will do eventually because there’s nothing pulling messages from the errorsCh channel), then the select statement will fallthrough to its default block and we’ll return the goroutine (causing it to stop running)

The last interesting concurrency primitive we use is atomic.Uint64 for accurately tracking the number of deleted keys. We use its Add() method within the goroutine to safely increment the counter, and then at the end of the function we use its Load() method to safely extract the final value.

UPDATE 2024.11.01

Below is a modification to the ‘real world’ example code.

The difference is in how errors are handled. In the original code we would stop processing the deletion of keys when an error threshold was reached. But in the following version we don’t want that to happen. Instead we want to keep processing errors, but that introduces a new challenge related to channel buffer size (e.g. we can’t set an infinite amount of memory for a channel) and so at some point the errors channel is going to get filled up and we need to decide what to do.

In this case we can realistically only drop errors. But we can alleviate that a little bit by trying to pull messages out from the errors channel and appending them to the failedKeys slice concurrently. This will make space in the channel’s buffer and so we can push more errors into it if they occur.

The problem with pulling errors out of the errors channel concurrently is we need to range over the channel, but that will cause a deadlock if we don’t handle it correctly (because ranging over a channel only terminates the loop when the channel is closed). So we need to introduce not one sync.WaitGroup but two! See the code comments below for more details…

// deleteAllKVStoreKeys deletes all keys within the specified KV Store.
func deleteAllKVStoreKeys(
	conn *gofastly.Client, 
	storeID string, 
	maxErrors, poolSize int
) error {
	p := conn.NewListKVStoreKeysPaginator(&fastly.ListKVStoreKeysInput{
		StoreID: storeID,
	})

	errorsCh := make(chan string, maxErrors)
	keysCh := make(chan string, 1000) // number correlates to pagination page size

	var (
		failedKeys   []string
		mu           sync.Mutex
		wgProcessing sync.WaitGroup
		wgErrorCh    sync.WaitGroup
	)

	// We have three separate execution flows happening at once:
	//
	// 1. Pushing keys from pagination data into a key channel.
	// 2. Pulling keys from error channel and appending to failedKeys slice.
	// 3. Pulling keys from key channel and issuing API DELETE call.
	//
	// The second item is problematic, in that ranging over a channel only
	// terminates when the channel is closed. So we need to ensure we close the
	// errorsCh once we've finished processing the deletion of all the keys.
	//
	// To do that we need two sets of wait groups.
	//
	// The first is wgProcessing which keeps track of all goroutines related to
	// processing the pagination data (e.g. the goroutine ranging over the
	// paginator keys, and the goroutine ranging over the keysCh as part of the
	// poolSize loop).
	//
	// The second wait group is wgErrorCh which tracks when the first
	// (wgProcessing) has completed and then closes errorsCh.

	// The following goroutine finishes once all pagination keys have been
	// processed.
	wgProcessing.Add(1)
	go func() {
		defer wgProcessing.Done()
		defer close(keysCh)
		for p.Next() {
			for _, key := range p.Keys() {
				keysCh <- key
			}
		}
	}()

	// The following goroutine finishes once the errorsCh is closed.
	wgErrorCh.Add(1)
	go func() {
		defer wgErrorCh.Done()
		for err := range errorsCh {
			mu.Lock()
			failedKeys = append(failedKeys, err)
			mu.Unlock()
		}
	}()

	// The following goroutines close once they've pulled all data from keysCh.
	for i := 1; i <= poolSize; i++ {
		wgProcessing.Add(1)
		go func() {
			defer wgProcessing.Done()
			for key := range keysCh {
				err := conn.DeleteKVStoreKey(
					&fastly.DeleteKVStoreKeyInput{StoreID: storeID, Key: key},
				)
				if err != nil {
					select {
					case errorsCh <- key:
					default:
						continue // the larger we make maxErrors 
						         // the less likely we'll drop errors 
						         // (obviously there's a memory trade-off to be made)
					}
				}
			}
		}()
	}

	// The following goroutine is closed once the 'processing' goroutines are
	// finished.
	wgErrorCh.Add(1)
	go func() {
		defer wgErrorCh.Done()
		wgProcessing.Wait() // Wait for all deletion and pagination tasks.
		close(errorsCh)
	}()

	// Wait for the error-handling goroutines to finish processing.
	wgErrorCh.Wait()

	if len(failedKeys) > 0 {
		return fmt.Errorf("failed to delete %d keys", len(failedKeys))
	}

	return nil
}

Reference Material