Advanced Go Concurrency Patterns
Go supports concurrency in the language and runtime, not in a library. This changes how you structure your programs.
- Basics Quick Review: Goroutines and Channels
- Ping-Pong Example
- Deadlock detection
- Panic dumps the stacks
- It’s easy to go, but how to stop?
- Feed Reader Example
- Find an RSS client
- Naive Implementation
- Solution
- Select and nil channels
- Improve loop further
- Implemented Subscribe
- Conclusion
- Links
Basics Quick Review: Goroutines and Channels
- Goroutines are independently executing functions in the same address space.
go f()
go g(1, 2)
- Channels are typed values that allow goroutines to synchronize and exchange information.
c := make(chan int)
go func() {
c <- 3
}()
n := <-c
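The two fragments above can be combined into one small program. This is a minimal sketch; the helper name squareAsync is an assumption made for illustration and is not part of the original material.

```go
package main

import "fmt"

// squareAsync (hypothetical helper) starts a goroutine that computes n*n
// and returns the channel on which the result will arrive.
func squareAsync(n int) <-chan int {
	c := make(chan int) // unbuffered: sender and receiver must meet
	go func() {
		c <- n * n // blocks until someone receives
	}()
	return c
}

func main() {
	c := squareAsync(3)
	n := <-c // blocks until the goroutine sends
	fmt.Println(n)
}
```

Because the channel is unbuffered, the receive in main also acts as a synchronization point: main cannot continue until the goroutine has produced its value.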
Ping-Pong Example
type Ball struct{ hits int }
func main() {
table := make(chan *Ball)
go player("ping", table)
go player("pong", table)
table <- new(Ball) // game on; toss the ball
time.Sleep(1 * time.Second)
<-table // game over; grab the ball
}
func player(name string, table chan *Ball) {
for {
ball := <-table
ball.hits++
fmt.Println(name, ball.hits)
time.Sleep(100 * time.Millisecond)
table <- ball
}
}
Deadlock detection
type Ball struct{ hits int }
func main() {
table := make(chan *Ball)
go player("ping", table)
go player("pong", table)
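//table <- new(Ball) // toss disabled: no goroutine ever sends on table
time.Sleep(1 * time.Second)
<-table // game over; grab the ball (blocks forever, so the runtime reports "all goroutines are asleep - deadlock!")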
}
Panic dumps the stacks
type Ball struct{ hits int }
func main() {
table := make(chan *Ball)
go player("ping", table)
go player("pong", table)
table <- new(Ball) // game on; toss the ball
time.Sleep(1 * time.Second)
<-table // game over; grab the ball
panic("show me the stacks")
}
func player(name string, table chan *Ball) {
for {
ball := <-table
ball.hits++
fmt.Println(name, ball.hits)
time.Sleep(100 * time.Millisecond)
table <- ball
}
}
It’s easy to go, but how to stop?
Long-lived programs need to clean up. Let’s look at how to write programs that handle communication, periodic events, and cancellation. The core is Go’s select statement: like a switch, but the decision is made based on the ability to communicate.
select {
case xc <- x:
//sent x on xc
case y := <-yc:
//received y from yc
}
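One common use of select is to wait for a value while also giving up after a deadline. The following is a minimal sketch of that idea; the helper name recvOrTimeout is an assumption for illustration, while time.After is the standard library call.

```go
package main

import (
	"fmt"
	"time"
)

// recvOrTimeout (hypothetical helper) waits for a value on c,
// giving up once d has elapsed. The bool reports whether a value arrived.
func recvOrTimeout(c <-chan string, d time.Duration) (string, bool) {
	select {
	case v := <-c: // chosen if a value can be received
		return v, true
	case <-time.After(d): // chosen if the timer fires first
		return "", false
	}
}

func main() {
	c := make(chan string, 1)
	c <- "hello"

	v, ok := recvOrTimeout(c, time.Second) // a value is already waiting
	fmt.Println(v, ok)

	v, ok = recvOrTimeout(c, 50*time.Millisecond) // nothing left to receive
	fmt.Println(v, ok)
}
```

Neither case is polled: select blocks until one of the communications can proceed, then runs only that case.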
Feed Reader Example
My favorite feed reader disappeared. I need a new one. Why not write one? Where do we start?
Find an RSS client
Searching godoc.org for “rss” turns up several hits, including one that provides:
//Fetch fetches Items for uri and returns the time when the next fetch should be attempted. On failure, Fetch returns an error.
func Fetch(uri string) (items []Item, next time.Time, err error)
type Item struct {
Title, Channel, GUID string // a subset of RSS fields
}
But I want a stream:
<-chan Item
And I want multiple subscriptions.
Naive Implementation
package main
import (
"fmt"
"time"
)
type Item struct {
Title, Channel, GUID string
}
// sub implements the Subscription interface
type sub struct {
fetcher Fetcher // fetches items
updates chan Item // delivers items to the user
closed bool
err error
}
type Fetcher interface {
Fetch() (items []Item, next time.Time, err error)
}
type Subscription interface {
Updates() <-chan Item //stream of Items
Close() error // shuts down the stream
loop()
}
func main() {
//Subscribe to some feeds, and create a merged update stream
merged := Merge(
Subscribe(Fetch("blog.golang.org")),
Subscribe(Fetch("googleblog.blogspot.com")),
Subscribe(Fetch("googledevelopers.blogspot.com")))
// Close the subscriptions after some time
time.AfterFunc(3*time.Second, func() {
fmt.Println("closed:", merged.Close())
})
//Print the stream
for it := range merged.Updates() {
fmt.Println(it.Channel, it.Title)
}
panic("show me the stacks")
}
//Subscribe creates a new Subscription that repeatedly fetches items until Close is called.
func Subscribe(fetcher Fetcher) Subscription {
//converts Fetchers to a stream
s := &sub{
fetcher: fetcher,
updates: make(chan Item), //for Updates
}
go s.loop()
return s
}
func Fetch(domain string) Fetcher {
// fetches Items from domain
}
func Merge(subs ...Subscription) Subscription {
//merges several streams
}
//Implementing Subscription: To implement the Subscription interface, define Updates and Close.
func (s *sub) Updates() <-chan Item {
return s.updates
}
func (s *sub) Close() error {
//make loop exit and find out about any error
s.closed = true
return s.err
}
// loop fetches items using s.fetcher and sends them on s.updates. loop exits when s.Close is called.
func (s *sub) loop() {
for {
//Bug 1: unsynchronized access to s.closed/s.err
if s.closed {
close(s.updates)
return
}
items, next, err := s.fetcher.Fetch()
if err != nil {
s.err = err
//Bug 2: time.Sleep may keep loop running
time.Sleep(10 * time.Second)
continue
}
for _, item := range items {
//Bug 3: loop may block forever on s.updates
s.updates <- item
}
if now := time.Now(); next.After(now) {
time.Sleep(next.Sub(now))
}
}
}
What does loop do?
- periodically call Fetch
- send fetched items on the Updates channel
- exit when Close is called, reporting any error
Race Detector: go run -race naivemain.go
Solution
The cases interact via err, next, and pending.
No locks, no condition variables, no callbacks.
Change the body of loop to a select with three cases:
- Close was called
- It’s time to call Fetch
- Send an item on s.updates
package main
import (
"fmt"
"time"
)
type Item struct {
Title, Channel, GUID string
}
// sub implements the Subscription interface
type sub struct {
fetcher Fetcher // fetches items
updates chan Item // delivers items to the user
closing chan chan error
err error
}
type Fetcher interface {
Fetch() (items []Item, next time.Time, err error)
}
type Subscription interface {
Updates() <-chan Item //stream of Items
Close() error // shuts down the stream
loop()
}
func main() {
//Subscribe to some feeds, and create a merged update stream
merged := Merge(
Subscribe(Fetch("blog.golang.org")),
Subscribe(Fetch("googleblog.blogspot.com")),
Subscribe(Fetch("googledevelopers.blogspot.com")))
// Close the subscriptions after some time
time.AfterFunc(3*time.Second, func() {
fmt.Println("closed:", merged.Close())
})
//Print the stream
for it := range merged.Updates() {
fmt.Println(it.Channel, it.Title)
}
panic("show me the stacks")
}
//Subscribe creates a new Subscription that repeatedly fetches items until Close is called.
func Subscribe(fetcher Fetcher) Subscription {
//converts Fetchers to a stream
s := &sub{
fetcher: fetcher,
updates: make(chan Item), //for Updates
closing: make(chan chan error), //for Close; a nil closing channel would make Close block forever
}
go s.loop()
return s
}
func Fetch(domain string) Fetcher {
// fetches Items from domain
}
func Merge(subs ...Subscription) Subscription {
//merges several streams
}
//Implementing Subscription: To implement the Subscription interface, define Updates and Close.
func (s *sub) Updates() <-chan Item {
return s.updates
}
func (s *sub) Close() error {
//Close asks loop to exit and waits for a response
errc := make(chan error)
s.closing <- errc
return <-errc
}
// loop fetches items using s.fetcher and sends them on s.updates. loop exits when s.Close is called.
func (s *sub) loop() {
var pending []Item // appended by fetch; consumed by send
var next time.Time // initially January 1, year 1 (the zero Time)
var err error
for {
var fetchDelay time.Duration // initially 0 (no delay)
if now := time.Now(); next.After(now) {
fetchDelay = next.Sub(now)
}
startFetch := time.After(fetchDelay)
var first Item
var updates chan Item
if len(pending) > 0 {
first = pending[0]
updates = s.updates //enable send case
}
//Loop runs in its own goroutine. Select lets loop avoid blocking indefinitely in any one state. The cases interact via local state in loop.
select {
case errc := <-s.closing:
//loop handles Close by replying with the Fetch error and exiting.
//The service(loop) listens for requests on its channel(s.closing).
//The client(Close) sends a request on s.closing: exit and reply with the error.
//In this case, the only thing in the request is the reply channel.
errc <- err
close(s.updates) // tell receiver we're done
return
case <-startFetch:
//Schedule the next Fetch after some delay.
var fetched []Item
fetched, next, err = s.fetcher.Fetch()
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
pending = append(pending, fetched...)
case updates <- first:
pending = pending[1:]
}
}
}
Bugs fixed:
- Bug 1: unsynchronized access to s.closed and s.err
- Bug 2: time.Sleep may keep loop running
- Bug 3: loop may block forever sending on s.updates
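The listings leave Merge as a stub. One possible way to fan several Subscriptions into one is sketched below. It is an assumption-laden illustration, not the author's implementation: the types fakeSub and merged and the constructor newFakeSub are hypothetical, and the Subscription interface here omits loop so that the example stays self-contained.

```go
package main

import "fmt"

type Item struct{ Title, Channel, GUID string }

// Subscription mirrors the interface in the listings, without loop.
type Subscription interface {
	Updates() <-chan Item
	Close() error
}

// fakeSub is a hypothetical stand-in for sub: it emits fixed items,
// then keeps the stream open until Close is called.
type fakeSub struct {
	updates chan Item
	quit    chan struct{}
}

func newFakeSub(items ...Item) *fakeSub {
	s := &fakeSub{updates: make(chan Item), quit: make(chan struct{})}
	go func() {
		defer close(s.updates)
		for _, it := range items {
			select {
			case s.updates <- it:
			case <-s.quit:
				return
			}
		}
		<-s.quit // hold the stream open until Close
	}()
	return s
}

func (s *fakeSub) Updates() <-chan Item { return s.updates }
func (s *fakeSub) Close() error         { close(s.quit); return nil }

// merged forwards items from several Subscriptions onto one channel.
type merged struct {
	subs    []Subscription
	updates chan Item
	quit    chan struct{}
	errs    chan error
}

func Merge(subs ...Subscription) Subscription {
	m := &merged{
		subs:    subs,
		updates: make(chan Item),
		quit:    make(chan struct{}),
		errs:    make(chan error),
	}
	for _, sub := range subs {
		go func(s Subscription) {
			for {
				var it Item
				select {
				case it = <-s.Updates(): // wait for the next item
				case <-m.quit:
					m.errs <- s.Close()
					return
				}
				select {
				case m.updates <- it: // forward it to the merged stream
				case <-m.quit:
					m.errs <- s.Close()
					return
				}
			}
		}(sub)
	}
	return m
}

func (m *merged) Updates() <-chan Item { return m.updates }

// Close stops every forwarding goroutine, closes each underlying
// Subscription, and returns the last non-nil error, if any.
func (m *merged) Close() (err error) {
	close(m.quit)
	for range m.subs {
		if e := <-m.errs; e != nil {
			err = e
		}
	}
	close(m.updates)
	return err
}

func main() {
	m := Merge(
		newFakeSub(Item{Title: "a1", Channel: "a"}),
		newFakeSub(Item{Title: "b1", Channel: "b"}),
	)
	for i := 0; i < 2; i++ {
		it := <-m.Updates()
		fmt.Println(it.Channel, it.Title) // arrival order is not deterministic
	}
	fmt.Println("closed:", m.Close())
}
```

Each forwarding goroutine selects on m.quit in both its receive and its send step, so Close never waits behind a consumer that has stopped reading. In this sketch Close may be called only once.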
Select and nil channels
Select never selects a blocking case.
func main() {
//Sends and receives on nil channels block.
a, b := make(chan string), make(chan string)
go func() { a <- "a" }()
go func() { b <- "b" }()
if rand.Intn(2) == 0 {
a = nil
fmt.Println("nil a")
} else {
b = nil
fmt.Println("nil b")
}
select {
case s := <-a:
fmt.Println("got", s)
case s := <-b:
fmt.Println("got", s)
}
}
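Because a nil channel never becomes ready, assigning nil to a channel variable switches its select case off. A minimal sketch of that technique follows; it reads two channels until both are closed, and the helper name drain is an assumption for illustration.

```go
package main

import "fmt"

// drain (hypothetical helper) collects values from a and b until both
// are closed. A closed channel is replaced by nil so that its case is
// disabled instead of firing repeatedly with zero values.
func drain(a, b <-chan int) []int {
	var out []int
	for a != nil || b != nil {
		select {
		case v, ok := <-a:
			if !ok {
				a = nil // disable this case
				continue
			}
			out = append(out, v)
		case v, ok := <-b:
			if !ok {
				b = nil // disable this case
				continue
			}
			out = append(out, v)
		}
	}
	return out
}

func main() {
	a, b := make(chan int), make(chan int)
	go func() { a <- 1; a <- 2; close(a) }()
	go func() { b <- 10; close(b) }()
	fmt.Println(len(drain(a, b)), "values received")
}
```

Without the nil assignments, a receive on a closed channel would succeed immediately every time, and the loop would spin instead of blocking.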
Improve loop further
- Issue 1: Fetch may return duplicates - Fix: Filter items before adding to pending.
- Issue 2: Pending queue grows without bound - Fix: Disable the fetch case when too many items are pending. (Could instead drop older items from the head of pending.)
- Issue 3: Loop blocks on Fetch - Fix: Run Fetch asynchronously.
package main
import (
"fmt"
"time"
)
type Item struct {
Title, Channel, GUID string
}
// sub implements the Subscription interface
type sub struct {
fetcher Fetcher // fetches items
updates chan Item // delivers items to the user
closing chan chan error
err error
}
type fetchResult struct {
fetched []Item
next time.Time
err error
}
type Fetcher interface {
Fetch() (items []Item, next time.Time, err error)
}
type Subscription interface {
Updates() <-chan Item //stream of Items
Close() error // shuts down the stream
loop()
}
func main() {
//Subscribe to some feeds, and create a merged update stream
merged := Merge(
Subscribe(Fetch("blog.golang.org")),
Subscribe(Fetch("googleblog.blogspot.com")),
Subscribe(Fetch("googledevelopers.blogspot.com")))
// Close the subscriptions after some time
time.AfterFunc(3*time.Second, func() {
fmt.Println("closed:", merged.Close())
})
//Print the stream
for it := range merged.Updates() {
fmt.Println(it.Channel, it.Title)
}
panic("show me the stacks")
}
//Subscribe creates a new Subscription that repeatedly fetches items until Close is called.
func Subscribe(fetcher Fetcher) Subscription {
//converts Fetchers to a stream
s := &sub{
fetcher: fetcher,
updates: make(chan Item), //for Updates
closing: make(chan chan error), //for Close; a nil closing channel would make Close block forever
}
go s.loop()
return s
}
func Fetch(domain string) Fetcher {
// fetches Items from domain
}
func Merge(subs ...Subscription) Subscription {
//merges several streams
}
//Implementing Subscription: To implement the Subscription interface, define Updates and Close.
func (s *sub) Updates() <-chan Item {
return s.updates
}
func (s *sub) Close() error {
//Close asks loop to exit and waits for a response
errc := make(chan error)
s.closing <- errc
return <-errc
}
// loop fetches items using s.fetcher and sends them on s.updates. loop exits when s.Close is called.
func (s *sub) loop() {
const maxPending = 10
var pending []Item // appended by fetch; consumed by send
var next time.Time // initially January 1, year 1 (the zero Time)
var err error
var seen = make(map[string]bool) //set of item.GUIDs
var fetchDone chan fetchResult //if non-nil, Fetch is running
for {
var fetchDelay time.Duration // initially 0 (no delay)
if now := time.Now(); next.After(now) {
fetchDelay = next.Sub(now)
}
var startFetch <-chan time.Time
if fetchDone == nil && len(pending) < maxPending {
startFetch = time.After(fetchDelay) // enable fetch case
}
var first Item
var updates chan Item
if len(pending) > 0 {
first = pending[0]
updates = s.updates //enable send case
}
//Loop runs in its own goroutine. Select lets loop avoid blocking indefinitely in any one state. The cases interact via local state in loop.
select {
case errc := <-s.closing:
//loop handles Close by replying with the Fetch error and exiting.
//The service(loop) listens for requests on its channel(s.closing).
//The client(Close) sends a request on s.closing: exit and reply with the error.
//In this case, the only thing in the request is the reply channel.
errc <- err
close(s.updates) // tell receiver we're done
return
case <-startFetch:
//Start Fetch in its own goroutine so loop stays responsive while it runs.
fetchDone = make(chan fetchResult, 1)
go func() {
fetched, next, err := s.fetcher.Fetch()
fetchDone <- fetchResult{fetched, next, err}
}()
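case result := <-fetchDone:
//Fetch finished: copy its results into loop's local state.
fetchDone = nil
var fetched = result.fetched
next, err = result.next, result.err
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
for _, item := range fetched {
if !seen[item.GUID] {
pending = append(pending, item) //append only the unseen item, not the whole batch
seen[item.GUID] = true
}
}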
case updates <- first:
pending = pending[1:]
}
}
}
Implemented Subscribe
Responsive. Cleans up. Easy to read and change.
Three techniques:
- for-select loop
- service channel, reply channels (chan chan error)
- nil channel in select cases
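The service channel and reply channel technique can be shown in isolation. The sketch below is a hypothetical counter, not part of the feed reader: one goroutine owns the state, and Close sends a reply channel as its request, in the same shape as chan chan error above.

```go
package main

import "fmt"

// counter (hypothetical) keeps its total inside a single goroutine.
type counter struct {
	add     chan int      // requests to add an amount
	closing chan chan int // each request carries its own reply channel
}

func newCounter() *counter {
	c := &counter{add: make(chan int), closing: make(chan chan int)}
	go c.loop()
	return c
}

// loop is the service: a for-select loop over its request channels.
func (c *counter) loop() {
	total := 0 // local state, touched by this goroutine only
	for {
		select {
		case d := <-c.add:
			total += d
		case reply := <-c.closing:
			reply <- total // answer the client, then exit
			return
		}
	}
}

// Add sends an amount to the service.
func (c *counter) Add(d int) { c.add <- d }

// Close asks loop to exit and returns the final total.
func (c *counter) Close() int {
	reply := make(chan int)
	c.closing <- reply
	return <-reply
}

func main() {
	c := newCounter()
	c.Add(2)
	c.Add(3)
	fmt.Println(c.Close())
}
```

No mutex is needed because total is never shared: clients only ever see it through a reply channel. As in the feed reader, calling Add or Close after Close has returned would block, since no goroutine is left to serve the request.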
Conclusion
Concurrent programming can be tricky. Go makes it easier:
- Channels convey data, timer events, cancellation signals
- goroutines serialize access to local mutable state
- stack traces & deadlock detector
- race detector