Advanced Go Concurrency Patterns

Go supports concurrency in the language and runtime, not in a library. This changes how you structure your programs.

  • Basics Quick Review: Goroutines and Channels
  • Ping-Pong Example
    • Deadlock detection
    • Panic dumps the stacks
    • It’s easy to go, but how to stop?
  • Feed Reader Example
    • Find an RSS client
    • Naive Implementation
    • Solution
      • Select and nil channels
    • Improve loop further
    • Implemented Subscribe
    • Conclusion
    • Links

Basics Quick Review: Goroutines and Channels

  • Goroutines are independently executing functions in the same address space.
go f()
go g(1, 2)
  • Channels are typed values that allow goroutines to synchronize and exchange information.
c := make(chan int)
go func() {
    c <- 3
}()
n := <-c
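
As a slightly larger sketch of the same two primitives, the program below sums a slice in two goroutines and combines the partial results over a single channel. The names sum and parallelSum are illustrative and do not come from the talk.

```go
package main

import "fmt"

// sum adds the numbers and sends the total on out.
func sum(nums []int, out chan<- int) {
	total := 0
	for _, n := range nums {
		total += n
	}
	out <- total // blocks until the receiver is ready
}

// parallelSum splits nums in half, sums each half in its own goroutine,
// and combines the two partial results received from the channel.
func parallelSum(nums []int) int {
	c := make(chan int)
	mid := len(nums) / 2
	go sum(nums[:mid], c)
	go sum(nums[mid:], c)
	return <-c + <-c
}

func main() {
	fmt.Println(parallelSum([]int{1, 2, 3, 4, 5, 6})) // 21
}
```

The unbuffered channel does double duty here: it carries each partial sum and it makes parallelSum wait until both goroutines have finished.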

Ping-Pong Example

package main

import (
    "fmt"
    "time"
)

type Ball struct{ hits int }

func main() {
    table := make(chan *Ball)
    go player("ping", table)
    go player("pong", table)

    table <- new(Ball) // game on; toss the ball
    time.Sleep(1 * time.Second)
    <-table // game over; grab the ball
}

func player(name string, table chan *Ball) {
    for {
        ball := <-table
        ball.hits++
        fmt.Println(name, ball.hits)
        time.Sleep(100 * time.Millisecond)
        table <- ball
    }
}

Deadlock detection

type Ball struct{ hits int }

func main() {
    table := make(chan *Ball)
    go player("ping", table)
    go player("pong", table)

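    // With the toss commented out, no goroutine can ever proceed. The runtime
    // detects this and exits with "fatal error: all goroutines are asleep - deadlock!".
    //table <- new(Ball) // game on; toss the ball
    time.Sleep(1 * time.Second)
    <-table // game over; grab the ball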
}

Panic dumps the stacks

type Ball struct{ hits int }

func main() {
    table := make(chan *Ball)
    go player("ping", table)
    go player("pong", table)

    table <- new(Ball) // game on; toss the ball 
    time.Sleep(1 * time.Second)
    <-table // game over; grab the ball

    panic("show me the stacks")
}

func player(name string, table chan *Ball) {
    for {
        ball := <-table
        ball.hits++
        fmt.Println(name, ball.hits)
        time.Sleep(100 * time.Millisecond)
        table <- ball
    }
}

It’s easy to go, but how to stop?

Long-lived programs need to clean up. Let’s look at how to write programs that handle communication, periodic events, and cancellation. The core is Go’s select statement: like a switch, but the decision is made based on the ability to communicate.

select {
case xc <- x:
    //sent x on xc
case y := <-yc:
    //received y from yc
}
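
To make the "how to stop" question concrete, here is a minimal sketch of a goroutine that keeps sending until it is told to quit. The names counter and firstN and the quit channel are assumptions made for this illustration, not part of the talk's code.

```go
package main

import "fmt"

// counter sends 0, 1, 2, ... on out until quit is closed,
// then closes out so that a receiver ranging over it finishes.
func counter(out chan<- int, quit <-chan struct{}) {
	defer close(out)
	for i := 0; ; i++ {
		select {
		case out <- i: // a receiver was ready
		case <-quit: // stop was requested
			return
		}
	}
}

// firstN takes n values from a counter goroutine and then stops it.
func firstN(n int) []int {
	out := make(chan int)
	quit := make(chan struct{})
	go counter(out, quit)

	var got []int
	for len(got) < n {
		got = append(got, <-out)
	}
	close(quit)
	// Drain until counter notices quit and closes out.
	for range out {
	}
	return got
}

func main() {
	fmt.Println(firstN(3)) // [0 1 2]
}
```

Because the send and the quit check sit in the same select, counter never gets stuck in a send that nobody will receive.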

Feed Reader Example

My favorite feed reader disappeared. I need a new one. Why not write one? Where do we start?

Find an RSS client

Searching godoc.org for “rss” turns up several hits, including one that provides:

// Fetch fetches Items for uri and returns the time when the next fetch
// should be attempted. On failure, Fetch returns an error.
func Fetch(uri string) (items []Item, next time.Time, err error)

type Item struct {
    Title, Channel, GUID string // a subset of RSS fields
}

But I want a stream:

<-chan Item

And I want multiple subscriptions.

Naive Implementation

package main

import (
    "fmt"
    "time"
)

type Item struct {
    Title, Channel, GUID string
}

// sub implements the Subscription interface
type sub struct {
    fetcher Fetcher   // fetches items
    updates chan Item // delivers items to the user
    closed  bool
    err     error
}

type Fetcher interface {
    Fetch() (items []Item, next time.Time, err error)
}

type Subscription interface {
    Updates() <-chan Item //stream of Items
    Close() error         // shuts down the stream
    loop()
}

func main() {
    //Subscribe to some feeds, and create a merged update stream
    merged := Merge(
        Subscribe(Fetch("blog.golang.org")),
        Subscribe(Fetch("googleblog.blogspot.com")),
        Subscribe(Fetch("googledevelopers.blogspot.com")))

    // Close the subscriptions after some time
    time.AfterFunc(3*time.Second, func() {
        fmt.Println("closed:", merged.Close())
    })

    //Print the stream
    for it := range merged.Updates() {
        fmt.Println(it.Channel, it.Title)
    }
    panic("show me the stacks")
}

//Subscribe creates a new Subscription that repeatedly fetches items until Close is called.
func Subscribe(fetcher Fetcher) Subscription {
    //converts Fetchers to a stream
    s := &sub{
        fetcher: fetcher,
        updates: make(chan Item), //for Updates
    }
    go s.loop()
    return s
}

func Fetch(domain string) Fetcher {
    // fetches Items from domain
}

func Merge(subs ...Subscription) Subscription {
    //merges several streams
}

//Implementing Subscription: To implement the Subscription interface, define Updates and Close.
func (s *sub) Updates() <-chan Item {
    return s.updates
}

func (s *sub) Close() error {
    //make loop exit and find out about any error
    s.closed = true
    return s.err
}

// loop fetches items using s.fetcher and sends them on s.updates. loop exits when s.Close is called.
func (s *sub) loop() {
    for {
        //Bug 1: unsynchronized access to s.closed/s.err
        if s.closed {
            close(s.updates)
            return
        }
        items, next, err := s.fetcher.Fetch()
        if err != nil {
            s.err = err
            //Bug 2: time.Sleep may keep loop running
            time.Sleep(10 * time.Second)
            continue
        }
        for _, item := range items {
            //Bug 3: loop may block forever on s.updates
            s.updates <- item
        }

        if now := time.Now(); next.After(now) {
            time.Sleep(next.Sub(now))
        }
    }
}

What does loop do?

  • periodically call Fetch
  • send fetched items on the Updates channel
  • exit when Close is called, reporting any error

Race Detector: go run -race naivemain.go
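
The listing leaves Merge as a stub. One possible way to write it is sketched below: one forwarding goroutine per subscription, all stopped through a shared quit channel. This is an illustration under stated assumptions, not the code from the listing: the Subscription interface here drops the unexported loop method, and chanSub, newChanSub, merged, and titles are hypothetical helpers added so the block runs on its own.

```go
package main

import (
	"fmt"
	"sort"
)

type Item struct{ Title, Channel, GUID string }

// Subscription omits the loop method; that is an assumption of this sketch.
type Subscription interface {
	Updates() <-chan Item
	Close() error
}

// chanSub is a hypothetical stand-in for a real subscription: its items
// are preloaded into a buffered channel and Close does nothing.
type chanSub struct{ c chan Item }

func newChanSub(items ...Item) Subscription {
	c := make(chan Item, len(items))
	for _, it := range items {
		c <- it
	}
	return chanSub{c}
}

func (s chanSub) Updates() <-chan Item { return s.c }
func (s chanSub) Close() error         { return nil }

type merged struct {
	subs    []Subscription
	updates chan Item
	quit    chan struct{}
	errs    chan error
}

// Merge forwards items from every sub onto one channel until Close is called.
func Merge(subs ...Subscription) Subscription {
	m := &merged{subs: subs, updates: make(chan Item),
		quit: make(chan struct{}), errs: make(chan error)}
	for _, sub := range subs {
		go func(s Subscription) {
			for {
				var it Item
				select { // receive from the sub, unless closing
				case it = <-s.Updates():
				case <-m.quit:
					m.errs <- s.Close()
					return
				}
				select { // forward the item, unless closing
				case m.updates <- it:
				case <-m.quit:
					m.errs <- s.Close()
					return
				}
			}
		}(sub)
	}
	return m
}

func (m *merged) Updates() <-chan Item { return m.updates }

// Close stops every forwarding goroutine and returns the last non-nil error.
func (m *merged) Close() (err error) {
	close(m.quit)
	for range m.subs {
		if e := <-m.errs; e != nil {
			err = e
		}
	}
	close(m.updates)
	return err
}

// titles reads n items from s, closes it, and returns the sorted titles.
func titles(s Subscription, n int) []string {
	var out []string
	for i := 0; i < n; i++ {
		out = append(out, (<-s.Updates()).Title)
	}
	s.Close()
	sort.Strings(out)
	return out
}

func main() {
	m := Merge(
		newChanSub(Item{Title: "a1"}, Item{Title: "a2"}),
		newChanSub(Item{Title: "b1"}))
	fmt.Println(titles(m, 3)) // [a1 a2 b1]
}
```

Both the receive and the forward are wrapped in a select with the quit case, so a forwarding goroutine can always be stopped, whichever side it is waiting on.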

Solution

Change the body of loop to a select with three cases:

  • Close was called
  • It’s time to call Fetch
  • Send an item on s.updates

The cases interact via the local variables err, next, and pending.
No locks, no condition variables, no callbacks.

package main

import (
    "fmt"
    "time"
)

type Item struct {
    Title, Channel, GUID string
}

// sub implements the Subscription interface
type sub struct {
    fetcher Fetcher   // fetches items
    updates chan Item // delivers items to the user
    closing chan chan error
    err     error
}

type Fetcher interface {
    Fetch() (items []Item, next time.Time, err error)
}

type Subscription interface {
    Updates() <-chan Item //stream of Items
    Close() error         // shuts down the stream
    loop()
}

func main() {
    //Subscribe to some feeds, and create a merged update stream
    merged := Merge(
        Subscribe(Fetch("blog.golang.org")),
        Subscribe(Fetch("googleblog.blogspot.com")),
        Subscribe(Fetch("googledevelopers.blogspot.com")))

    // Close the subscriptions after some time
    time.AfterFunc(3*time.Second, func() {
        fmt.Println("closed:", merged.Close())
    })

    //Print the stream
    for it := range merged.Updates() {
        fmt.Println(it.Channel, it.Title)
    }
    panic("show me the stacks")
}

//Subscribe creates a new Subscription that repeatedly fetches items until Close is called.
func Subscribe(fetcher Fetcher) Subscription {
    //converts Fetchers to a stream
    s := &sub{
        fetcher: fetcher,
        updates: make(chan Item),       // for Updates
        closing: make(chan chan error), // for Close
    }
    go s.loop()
    return s
}

func Fetch(domain string) Fetcher {
    // fetches Items from domain

}

func Merge(subs ...Subscription) Subscription {
    //merges several streams
}

//Implementing Subscription: To implement the Subscription interface, define Updates and Close.
func (s *sub) Updates() <-chan Item {
    return s.updates
}

func (s *sub) Close() error {
    //Close asks loop to exit and waits for a response
    errc := make(chan error)
    s.closing <- errc
    return <-errc
}

// loop fetches items using s.fetcher and sends them on s.updates. loop exits when s.Close is called.
func (s *sub) loop() {
    var pending []Item // appended by fetch; consumed by send
    var next time.Time // initially the zero time: January 1, year 1
    var err error
    for {
        var fetchDelay time.Duration // initially 0 (no delay)
        if now := time.Now(); next.After(now) {
            fetchDelay = next.Sub(now)
        }
        startFetch := time.After(fetchDelay)

        var first Item
        var updates chan Item
        if len(pending) > 0 {
            first = pending[0]
            updates = s.updates //enable send case
        }
        //Loop runs in its own goroutine. Select lets loop avoid blocking indefinitely in any one state. The cases interact via local state in loop.
        select {
        case errc := <-s.closing:
            //loop handles Close by replying with the Fetch error and exiting.
            //The service(loop) listens for requests on its channel(s.closing).
            //The client(Close) sends a request on s.closing: exit and reply with the error.
            //In this case, the only thing in the request is the reply channel.
            errc <- err
            close(s.updates) // tell receiver we're done
            return

        case <-startFetch:
            //Schedule the next Fetch after some delay.
            var fetched []Item
            fetched, next, err = s.fetcher.Fetch()
            if err != nil {
                next = time.Now().Add(10 * time.Second)
                break
            }
            pending = append(pending, fetched...)
        case updates <- first:
            pending = pending[1:]
        }
    }
}
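
The request/reply idea behind s.closing can be tried in isolation. The sketch below uses a hypothetical tally service (none of these names appear in the talk): one goroutine owns the running total, and Close sends a reply channel as its request, then waits for the answer.

```go
package main

import "fmt"

// tally is a hypothetical service whose state lives in a single goroutine.
type tally struct {
	add     chan int
	closing chan chan int // each request is just a reply channel
}

func newTally() *tally {
	tl := &tally{add: make(chan int), closing: make(chan chan int)}
	go tl.loop()
	return tl
}

// loop owns total; no other goroutine reads or writes it.
func (tl *tally) loop() {
	total := 0
	for {
		select {
		case n := <-tl.add:
			total += n
		case reply := <-tl.closing:
			reply <- total // answer the request, then exit
			return
		}
	}
}

// Add hands n to the service goroutine.
func (tl *tally) Add(n int) { tl.add <- n }

// Close asks loop to exit and returns the final total.
func (tl *tally) Close() int {
	reply := make(chan int)
	tl.closing <- reply
	return <-reply
}

func main() {
	tl := newTally()
	for i := 1; i <= 4; i++ {
		tl.Add(i)
	}
	fmt.Println(tl.Close()) // 10
}
```

Since only loop touches total, no lock is needed; the channels order every access.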

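Bugs fixed:

  • Bug 1: unsynchronized access to s.closed and s.err. Fixed: Close and loop communicate over s.closing, and err is local to loop.
  • Bug 2: time.Sleep may keep loop running. Fixed: the delay is a select case (time.After), so a Close request is handled while waiting.
  • Bug 3: loop may block forever sending on s.updates. Fixed: the send is a select case, so a Close request is handled while no receiver is ready.
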
Select and nil channels

Select never selects a blocking case.

package main

import (
    "fmt"
    "math/rand"
)

func main() {
    // Sends and receives on nil channels block forever,
    // so select never chooses a case whose channel is nil.
    a, b := make(chan string), make(chan string)
    go func() { a <- "a" }()
    go func() { b <- "b" }()
    if rand.Intn(2) == 0 {
        a = nil
        fmt.Println("nil a")
    } else {
        b = nil
        fmt.Println("nil b")
    }
    select {
    case s := <-a:
        fmt.Println("got", s)
    case s := <-b:
        fmt.Println("got", s)
    }
}
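
A common use of this property is switching a case off once its channel is closed. The sketch below, with the illustrative name drainBoth, reads from two channels until both are closed; it is an example of the technique, not code from the talk.

```go
package main

import "fmt"

// drainBoth receives from a and b until both are closed
// and returns the sum of everything received.
func drainBoth(a, b <-chan int) (sum int) {
	for a != nil || b != nil {
		select {
		case v, ok := <-a:
			if !ok {
				a = nil // a closed channel is always ready; nil disables the case
				continue
			}
			sum += v
		case v, ok := <-b:
			if !ok {
				b = nil
				continue
			}
			sum += v
		}
	}
	return sum
}

func main() {
	a, b := make(chan int), make(chan int)
	go func() { defer close(a); a <- 1; a <- 2 }()
	go func() { defer close(b); b <- 10 }()
	fmt.Println(drainBoth(a, b)) // 13
}
```

Without the nil assignment, the closed channel's case would be ready on every iteration and the loop would spin on zero values.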

Improve loop further

  • Issue 1: Fetch may return duplicates. Fix: filter items before adding them to pending.
  • Issue 2: The pending queue grows without bound. Fix: disable the fetch case when too many items are pending. (Could instead drop older items from the head of pending.)
  • Issue 3: loop blocks on Fetch. Fix: run Fetch asynchronously.

package main

import (
    "fmt"
    "time"
)

type Item struct {
    Title, Channel, GUID string
}

// sub implements the Subscription interface
type sub struct {
    fetcher Fetcher   // fetches items
    updates chan Item // delivers items to the user
    closing chan chan error
    err     error
}

type fetchResult struct {
    fetched []Item
    next    time.Time
    err     error
}

type Fetcher interface {
    Fetch() (items []Item, next time.Time, err error)
}

type Subscription interface {
    Updates() <-chan Item //stream of Items
    Close() error         // shuts down the stream
    loop()
}

func main() {
    //Subscribe to some feeds, and create a merged update stream
    merged := Merge(
        Subscribe(Fetch("blog.golang.org")),
        Subscribe(Fetch("googleblog.blogspot.com")),
        Subscribe(Fetch("googledevelopers.blogspot.com")))

    // Close the subscriptions after some time
    time.AfterFunc(3*time.Second, func() {
        fmt.Println("closed:", merged.Close())
    })

    //Print the stream
    for it := range merged.Updates() {
        fmt.Println(it.Channel, it.Title)
    }
    panic("show me the stacks")
}

//Subscribe creates a new Subscription that repeatedly fetches items until Close is called.
func Subscribe(fetcher Fetcher) Subscription {
    //converts Fetchers to a stream
    s := &sub{
        fetcher: fetcher,
        updates: make(chan Item),       // for Updates
        closing: make(chan chan error), // for Close
    }
    go s.loop()
    return s
}

func Fetch(domain string) Fetcher {
    // fetches Items from domain

}

func Merge(subs ...Subscription) Subscription {
    //merges several streams
}

//Implementing Subscription: To implement the Subscription interface, define Updates and Close.
func (s *sub) Updates() <-chan Item {
    return s.updates
}

func (s *sub) Close() error {
    //Close asks loop to exit and waits for a response
    errc := make(chan error)
    s.closing <- errc
    return <-errc
}

// loop fetches items using s.fetcher and sends them on s.updates. loop exits when s.Close is called.
func (s *sub) loop() {
    const maxPending = 10
    var pending []Item // appended by fetch; consumed by send
    var next time.Time // initially the zero time: January 1, year 1
    var err error
    var seen = make(map[string]bool) //set of items.GUIDs
    var fetchDone chan fetchResult   //if non-nil, Fetch is running
    for {
        var fetchDelay time.Duration // initially 0 (no delay)
        if now := time.Now(); next.After(now) {
            fetchDelay = next.Sub(now)
        }
        var startFetch <-chan time.Time
        if fetchDone == nil && len(pending) < maxPending {
            startFetch = time.After(fetchDelay) // enable fetch case
        }
        var first Item
        var updates chan Item
        if len(pending) > 0 {
            first = pending[0]
            updates = s.updates //enable send case
        }

        //Loop runs in its own goroutine. Select lets loop avoid blocking indefinitely in any one state. The cases interact via local state in loop.
        select {
        case errc := <-s.closing:
            //loop handles Close by replying with the Fetch error and exiting.
            //The service(loop) listens for requests on its channel(s.closing).
            //The client(Close) sends a request on s.closing: exit and reply with the error.
            //In this case, the only thing in the request is the reply channel.
            errc <- err
            close(s.updates) // tell receiver we're done
            return
        case <-startFetch:
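            // The delay has elapsed: start Fetch in its own goroutine so loop stays responsive.
            // fetchDone is buffered so that goroutine can finish even if loop has already exited.
            fetchDone = make(chan fetchResult, 1)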
            go func() {
                fetched, next, err := s.fetcher.Fetch()
                fetchDone <- fetchResult{fetched, next, err}
            }()
        case result := <-fetchDone:
            fetchDone = nil
            // Copy the result into loop's local state.
            fetched := result.fetched
            next, err = result.next, result.err
            if err != nil {
                next = time.Now().Add(10 * time.Second)
                break
            }
            for _, item := range fetched {
                if !seen[item.GUID] {
                    pending = append(pending, item) // append only the unseen item
                    seen[item.GUID] = true
                }
            }
        case updates <- first:
            pending = pending[1:]
        }
    }
}
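
The duplicate filter can be exercised without a network. The sketch below pairs it with a fake Fetcher; fakeFetcher and appendNew are hypothetical names introduced here, and the 100 ms refetch interval is an arbitrary assumption.

```go
package main

import (
	"fmt"
	"time"
)

type Item struct{ Title, Channel, GUID string }

// fakeFetcher is a hypothetical Fetcher for experiments: each call returns
// the newest item of the previous call again plus one new item, so
// consecutive fetches always overlap.
type fakeFetcher struct {
	channel string
	n       int
}

func (f *fakeFetcher) Fetch() (items []Item, next time.Time, err error) {
	for i := f.n; i <= f.n+1; i++ {
		items = append(items, Item{
			Title:   fmt.Sprintf("Item %d", i),
			Channel: f.channel,
			GUID:    fmt.Sprintf("%s/%d", f.channel, i),
		})
	}
	f.n++
	return items, time.Now().Add(100 * time.Millisecond), nil
}

// appendNew appends to pending only those fetched items whose GUID is not
// yet in seen, and records each appended GUID in seen.
func appendNew(pending, fetched []Item, seen map[string]bool) []Item {
	for _, item := range fetched {
		if !seen[item.GUID] {
			pending = append(pending, item)
			seen[item.GUID] = true
		}
	}
	return pending
}

func main() {
	f := &fakeFetcher{channel: "example"}
	seen := make(map[string]bool)
	var pending []Item
	for i := 0; i < 3; i++ {
		fetched, _, _ := f.Fetch() // this fake never returns an error
		pending = appendNew(pending, fetched, seen)
	}
	for _, it := range pending {
		fmt.Println(it.GUID) // example/0 through example/3, one per line
	}
}
```

Three fetches return six items in total, but only four distinct GUIDs reach pending.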

Implemented Subscribe

Responsive. Cleans up. Easy to read and change.
Three techniques:

  • for-select loop
  • service channel, reply channels (chan chan error)
  • nil channel in select cases

Conclusion

Concurrent programming can be tricky. Go makes it easier:

  • Channels convey data, timer events, cancellation signals
  • Goroutines serialize access to local mutable state
  • Stack traces & deadlock detector
  • Race detector

Share memory by communicating