Concurrency in Go - A Guided Tour (DevFest DC, Fall 2016)

Post on 15-Jan-2017


Concurrency in Go: A Guided Tour
23 September 2016

Jon Bodner
Technical Fellow, Capital One

Go is Different

Other languages use libraries for concurrency (C, C++, Python, Ruby)

Some languages have global locks that block parallel execution (Python, Ruby)

Java takes a hybrid approach: some concurrency support is built in, but most of it lives in libraries

Go has concurrency based on Communicating Sequential Processes (CSP) baked into the language

Goroutines

Goroutines are lightweight processes managed by the Go runtime.

Goroutines are not Threads

Threads are managed by the operating system while goroutines are managed by the Go runtime scheduler.

Goroutine creation is faster than thread creation.

Goroutine stack sizes are smaller than thread stack sizes and can grow as needed.

Switching between goroutines is faster than switching between threads.

Go programs can spawn hundreds of thousands of goroutines.

Read more at www.slideshare.net/matthewrdale/demystifying-the-go-scheduler (http://www.slideshare.net/matthewrdale/demystifying-the-go-scheduler)
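As a rough illustration of how cheap goroutines are, the sketch below starts 100,000 of them and waits for all of them to finish. The function name spawnAndCount is made up for this example, and it leans on sync.WaitGroup, which the talk introduces a few slides later.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// spawnAndCount starts n goroutines, each of which increments a shared
// counter once, and returns the final count after all of them finish.
func spawnAndCount(n int) int64 {
	var wg sync.WaitGroup
	var count int64
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			atomic.AddInt64(&count, 1) // atomic, so no data race on count
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&count)
}

func main() {
	fmt.Println(spawnAndCount(100000)) // 100000
}
```

Trying the same thing with one operating system thread per task would typically be far more expensive, which is the point of the comparison above.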

Using Goroutines

    package main

    import "fmt"

    func printIt() {
        fmt.Println("Hello from printIt")
    }

    func main() {
        printIt()
    }

    package main

    import "fmt"

    func printIt() {
        fmt.Println("Hello from printIt")
    }

    func main() {
        // main returns right away, so the program usually exits
        // before printIt gets a chance to run.
        go printIt()
    }

How To Pause (the wrong way)

    package main

    import "fmt"

    func printIt() {
        fmt.Println("Hello from printIt")
    }

    func main() {
        go printIt()
        // Busy-waiting burns a CPU core and the program never exits.
        for {
        }
    }

How To Pause (the wrong way)

    package main

    import (
        "fmt"
        "time"
    )

    func printIt() {
        fmt.Println("Hello from printIt")
    }

    func main() {
        go printIt()
        // Sleeping only guesses how long the goroutine needs.
        time.Sleep(1 * time.Second)
    }

How to Pause (a better way)

Use a WaitGroup to pause one goroutine until other goroutines complete

    package main

    import (
        "fmt"
        "sync"
    )

    var wg sync.WaitGroup

    func printIt() {
        fmt.Println("Hello from printIt")
        wg.Done()
    }

    func main() {
        wg.Add(1)
        go printIt()
        wg.Wait()
    }

Talking to My Goroutines

You can break communication down into two parts:

Initialization

Data going in and out of a goroutine

Initialization

Initialization - Local and global variables

Pass parameters into the goroutine function

Goroutines can refer to global variables

    var wg sync.WaitGroup
    var mood = "happy"

    func printIt(name string) {
        fmt.Printf("Hello from printIt to %s. I feel %s.", name, mood)
        wg.Done()
    }

    func main() {
        wg.Add(1)
        go printIt("Bob")
        wg.Wait()
    }

Initialization - Closure variable capture

Inline goroutine functions are closures

    var mood = "happy"

    func main() {
        // I'm a local variable now!
        var wg sync.WaitGroup
        name := "Bob"
        wg.Add(1)
        go func() {
            fmt.Printf("Hello from closure to %s. I feel %s.", name, mood)
            wg.Done()
        }()
        wg.Wait()
    }

Initialization - Separate concurrency from logic

    var mood = "happy"

    func printIt(name string) {
        fmt.Printf("Hello from printIt to %s. I feel %s.", name, mood)
    }

    func main() {
        var wg sync.WaitGroup
        wg.Add(1)
        go func() {
            defer wg.Done()
            printIt("Bob")
        }()
        wg.Wait()
    }

Initialization - Be careful with loop variables

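    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        var wg sync.WaitGroup
        for i := 0; i < 10; i++ {
            wg.Add(1)
            go func() {
                // Before Go 1.22 every closure shares the same i, so this
                // usually prints 10 ten times. Since Go 1.22 each iteration
                // gets its own i.
                fmt.Println(i)
                wg.Done()
            }()
        }
        wg.Wait()
    }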

Initialization - Pass in a loop variable

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        var wg sync.WaitGroup
        for i := 0; i < 10; i++ {
            wg.Add(1)
            go func(myI int) {
                fmt.Println(myI)
                wg.Done()
            }(i)
        }
        wg.Wait()
    }

Initialization - Refer to a copy of a loop variable

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        var wg sync.WaitGroup
        for i := 0; i < 10; i++ {
            wg.Add(1)
            myI := i
            go func() {
                fmt.Println(myI)
                wg.Done()
            }()
        }
        wg.Wait()
    }

Channels

Channels - The heart of CSP

Channels are synchronized FIFO queues.

One goroutine writes data into a channel and another goroutine reads the data out in the order in which it was put on the channel.

Multiple goroutines can read and write to a single channel.

By default, reads and writes to a channel are synchronous: a write blocks until another goroutine reads, and a read blocks until another goroutine writes.

Channels - The heart of CSP

Rules for values placed on channels:

Values put in a channel are treated just like values in a method call.

A reader of the channel gets a copy of the value that the writer put in the channel.

A copy of a pointer refers back to the same value as the original pointer.

Maps, slices, and channels behave like pointers: a copy still refers to the same underlying data.
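The copy rules above can be sketched as follows. The person type and the function names are invented for this illustration, and a channel with a buffer of one (covered later in the talk) is used only so that the example can run in a single goroutine.

```go
package main

import "fmt"

// person is a hypothetical type used only for this illustration.
type person struct {
	name string
}

// sendCopy sends a struct value; the reader gets its own copy.
func sendCopy() string {
	ch := make(chan person, 1)
	p := person{name: "Bob"}
	ch <- p
	p.name = "Fred" // changes the writer's copy only
	received := <-ch
	return received.name
}

// sendPointer sends a pointer; both sides refer to the same struct.
func sendPointer() string {
	ch := make(chan *person, 1)
	p := &person{name: "Bob"}
	ch <- p
	p.name = "Fred" // visible through the received pointer too
	received := <-ch
	return received.name
}

func main() {
	fmt.Println(sendCopy())    // Bob
	fmt.Println(sendPointer()) // Fred
}
```

Sharing a pointer this way is exactly the situation where the race checker discussed later becomes useful.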

Channels

Create a new channel using make

in := make(chan string)

Write to a channel using <-

in <- "Bob"

Read from a channel using <-

name := <-in

When you are done writing to a channel, you should close the channel using the built-in function close() to inform readers that no more data is coming.

close(in)

Channels

Demo:

    package main

    import (
        "fmt"
    )

    func main() {
        in := make(chan string)
        out := make(chan string)
        go func() {
            name := <-in
            // Plain concatenation; no format verbs are needed here.
            out <- "Hello, " + name
        }()
        in <- "Bob"
        close(in)
        message := <-out
        fmt.Println(message)
    }

Channel Directions

When declaring a channel variable in a struct, function parameter, or function return value, you can specify the direction for the channel. This doesn't affect the underlying channel.

To make a read-only channel variable, use

func printName(readCh <-chan string) {

And to make a write-only channel variable, use

func sendName(writeCh chan<- string) {

The one thing to look out for is that you cannot close a read-only channel.

Channel Directions - Demo

    func printName(readCh <-chan string) {
        name := <-readCh
        fmt.Println("Hello, " + name)
    }

    func sendName(writeCh chan<- string) {
        writeCh <- "Bob"
        close(writeCh)
    }

    func main() {
        var wg sync.WaitGroup
        ch := make(chan string)
        wg.Add(2)
        go func() {
            printName(ch)
            wg.Done()
        }()
        go func() {
            sendName(ch)
            wg.Done()
        }()
        wg.Wait()
    }

Buffered Channels

Buffered channels allow writes to continue until the buffer is filled.

Any subsequent writes will pause until there is room in the buffer.

Reads are identical for buffered and unbuffered channels.

Create a buffered channel by specifying the buffer size in make()

ch := make(chan string, 2)

Once the buffer size of a channel is set, it can't be changed.

It's not possible to create a channel with a limitless buffer.
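A minimal sketch of the buffering behavior described above. The helper fillAndDrain is a name invented for this example; it sizes the buffer to fit every write, so no reader has to be running while the writes happen.

```go
package main

import "fmt"

// fillAndDrain writes every name into a buffered channel with no reader
// running, then reads the names back in FIFO order.
func fillAndDrain(names []string) []string {
	ch := make(chan string, len(names))
	for _, n := range names {
		ch <- n // never blocks: the buffer has room for every name
	}
	close(ch)
	var out []string
	for n := range ch {
		out = append(out, n)
	}
	return out
}

func main() {
	ch := make(chan string, 2)
	ch <- "Bob"  // returns immediately: the buffer has room
	ch <- "Fred" // returns immediately: the buffer is now full
	fmt.Println(len(ch), cap(ch)) // 2 2
	// A third write here would block until a reader made room.

	fmt.Println(fillAndDrain([]string{"Bob", "Fred", "Alice"})) // [Bob Fred Alice]
}
```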

Ranging Over a Channel

As with maps, slices, and arrays, Go lets you use a channel in a for-range loop.

A for-range loop over a channel yields a single value per iteration; there is no index or key.

The loop continues until the channel is closed.

    package main

    import "fmt"

    func main() {
        ch := make(chan int)
        go func() {
            for i := 0; i < 10; i++ {
                ch <- i
            }
            close(ch)
        }()
        for v := range ch {
            fmt.Println(v)
        }
    }

Using Channels as Generators or Iterators

Ranging over channels looks like a Python Generator or a Java Iterator, but it's not.

You can leak a goroutine if you exit the for-range loop before the writing goroutine exits.

    func main() {
        ch := make(chan int)
        go func() {
            for i := 0; i < 10; i++ {
                ch <- i
            }
            close(ch)
        }()
        for v := range ch {
            fmt.Println(v)
            if v > 5 {
                // We're about to leak a goroutine
                break
            }
        }
    }

Select

Select

The select keyword allows you to read or write to the first available channel.

If more than one channel is available, the winner is selected at random.

    func writeDelay(ch chan string, message string, duration time.Duration) {
        time.Sleep(duration)
        ch <- message
    }

    func main() {
        ch1 := make(chan string)
        ch2 := make(chan string)
        go writeDelay(ch1, "Bob", 2*time.Second)
        go writeDelay(ch2, "Fred", 1*time.Second)
        select {
        case v := <-ch1:
            fmt.Println("Channel 1 returned", v)
        case v := <-ch2:
            fmt.Println("Channel 2 returned", v)
        }
    }
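The random choice among ready cases is easier to see when both channels are ready on every pass. The sketch below, with the made-up helper countWins, fills two buffered channels before each select and counts which case wins; the exact split varies from run to run.

```go
package main

import "fmt"

// countWins makes both channels ready before every select and counts how
// often each case is chosen.
func countWins(rounds int) (first, second int) {
	ch1 := make(chan string, 1)
	ch2 := make(chan string, 1)
	for i := 0; i < rounds; i++ {
		ch1 <- "Bob"
		ch2 <- "Fred"
		select {
		case <-ch1:
			first++
			<-ch2 // drain the other channel for the next round
		case <-ch2:
			second++
			<-ch1 // drain the other channel for the next round
		}
	}
	return first, second
}

func main() {
	first, second := countWins(1000)
	fmt.Println("ch1 won", first, "times; ch2 won", second, "times")
}
```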

Non-blocking Channel Reads and Writes

Adding a default clause to select allows you to have a non-blocking read or write.

    func writeDelay(ch chan string, message string, duration time.Duration) {
        time.Sleep(duration)
        ch <- message
    }

    func main() {
        ch1 := make(chan string)
        ch2 := make(chan string)
        go writeDelay(ch1, "Bob", 2*time.Second)
        go writeDelay(ch2, "Fred", 3*time.Second)
        time.Sleep(1 * time.Second)
        select {
        case v := <-ch1:
            fmt.Println("Channel 1 returned", v)
        case v := <-ch2:
            fmt.Println("Channel 2 returned", v)
        default:
            fmt.Println("No one is ready, I'm leaving")
        }
        fmt.Println("Left")
    }

The For-Select Pattern

Put a select statement in a for loop to keep on selecting from multiple channels.

Make sure there is some way to exit the for loop.

Be careful using a default clause with a for-select loop.

    func main() {
        ch1 := make(chan string)
        ch2 := make(chan string)
        go writeDelay(ch1, "Bob", 2*time.Second)
        go writeDelay(ch2, "Fred", 1*time.Second)
        for count := 0; count < 2; count++ {
            select {
            case v := <-ch1:
                fmt.Println("Channel 1 returned", v)
            case v := <-ch2:
                fmt.Println("Channel 2 returned", v)
            }
        }
    }

Timeouts

Use the time.After() function to get a channel that returns a value after a specified duration.

    func writeDelay(ch chan string, message string, duration time.Duration) {
        time.Sleep(duration)
        ch <- message
    }

    func main() {
        ch1 := make(chan string)
        ch2 := make(chan string)
        go writeDelay(ch1, "Bob", 2*time.Second)
        go writeDelay(ch2, "Fred", 1*time.Second)
        select {
        case v := <-ch1:
            fmt.Println("Channel 1 returned", v)
        case v := <-ch2:
            fmt.Println("Channel 2 returned", v)
        case <-time.After(1 * time.Second):
            fmt.Println("I'm tired of waiting.")
        }
    }

Gotchas

Writing to a nil Channel

What will this do?

A) Panic B) Return immediately C) Hang Forever

    func main() {
        // This goroutine is running in the background
        go func() {
            for {
            }
        }()
        var ch chan string
        ch <- "Bob"
        fmt.Println("Done")
    }

Writing to a nil Channel

Answer: C) Hang Forever

Reading from a nil Channel

What will this do?

A) Panic B) Return immediately C) Hang Forever

    func main() {
        // This goroutine is running in the background
        go func() {
            for {
            }
        }()
        var ch chan string
        v := <-ch
        fmt.Println("Done:", v)
    }

Reading from a nil Channel

Answer: C) Hang Forever

Nil Channels

This blog post by Dave Cheney, dave.cheney.net/2013/04/30/curious-channels (http://dave.cheney.net/2013/04/30/curious-channels), shows you how to "turn off" cases in a select clause by setting a channel variable to nil.

    func main() {
        a, b := make(chan struct{}), make(chan struct{})
        go DoStuff(a, 1)
        go DoStuff(b, 2)
        for a != nil || b != nil {
            select {
            case <-a:
                fmt.Println("a is done")
                a = nil
            case <-b:
                fmt.Println("b is done")
                b = nil
            }
        }
        fmt.Println("main is done")
    }
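The slide's DoStuff function is not shown, so here is a self-contained sketch of the same nil-channel trick under different assumptions: two hypothetical producers (produce) close their channels when finished, and a hypothetical merge function sets each channel variable to nil once it sees the close. It uses the comma-ok read, which is covered in the next few slides.

```go
package main

import "fmt"

// merge reads from a and b until both are closed and returns everything read.
func merge(a, b <-chan int) []int {
	var out []int
	for a != nil || b != nil {
		select {
		case v, ok := <-a:
			if !ok {
				// A closed channel is always ready to read; setting the
				// variable to nil turns this case off.
				a = nil
				continue
			}
			out = append(out, v)
		case v, ok := <-b:
			if !ok {
				b = nil
				continue
			}
			out = append(out, v)
		}
	}
	return out
}

// produce sends vals on a new channel from a goroutine, then closes it.
func produce(vals ...int) <-chan int {
	ch := make(chan int)
	go func() {
		for _, v := range vals {
			ch <- v
		}
		close(ch)
	}()
	return ch
}

func main() {
	got := merge(produce(1, 2, 3), produce(10, 20))
	fmt.Println(len(got)) // 5
}
```

Without the nil assignment, the closed channel's case would keep firing and the loop would spin on zero values.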

Reading from a Closed Channel

What will this do?

A) Panic B) Return immediately C) Hang Forever

    func main() {
        ch := make(chan int)
        close(ch)
        v := <-ch
        fmt.Println("Done:", v)
    }

Reading from a Closed Channel

Answer: B) Return immediately

Reading from a closed channel returns the zero value for the channel's type.

Use the ", ok" idiom to tell if a channel is closed.

Closed channels returning immediately is extremely useful!

    func main() {
        ch := make(chan int)
        go func() {
            ch <- 10
            close(ch)
        }()
        x, ok := <-ch
        fmt.Println(x, ok)
        x2, ok2 := <-ch
        fmt.Println(x2, ok2)
    }

The Done Pattern

The Done Pattern uses a closed channel of type struct{} to indicate to a goroutine that it should shut down.

Multiple goroutines can read from the channel at the same time, allowing them all to shut down.

The Done Pattern

    func doStuff(id int, ch chan int, done chan struct{}) {
        for {
            select {
            case v := <-ch:
                fmt.Println(id, v)
                time.Sleep(1 * time.Second)
            case <-done:
                fmt.Println(id, "shutting down")
                return
            }
        }
    }

    func main() {
        ch := make(chan int)
        done := make(chan struct{})
        go doStuff(1, ch, done)
        go doStuff(2, ch, done)
        for i := 0; i < 10; i++ {
            ch <- i
        }
        close(done)
        time.Sleep(2 * time.Second) // don't do this for real!
    }

Leaking Goroutines

It's important to allow goroutines to be cleaned up.

Go cannot detect that a running goroutine is no longer relevant.

Leaked goroutines can waste considerable amounts of memory and cause the scheduler to do extra work assigning unused goroutines to threads.

You need to make sure that any goroutines exit when they are no longer in use.

If you have a goroutine that has a for loop in it, you need to make sure that it has some way to break out of it.

The done channel idiom is a good way to do this.
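One way to apply the done channel idiom to the leaking generator shown earlier is sketched below. The names countTo and firstOver are invented for this example; the writer selects between sending a value and noticing that done has been closed, so it can exit even when the reader stops early.

```go
package main

import "fmt"

// countTo sends 0..n-1 on the returned channel, but stops early if done
// is closed.
func countTo(n int, done <-chan struct{}) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for i := 0; i < n; i++ {
			select {
			case ch <- i:
			case <-done:
				return // the reader has gone away; exit instead of blocking forever
			}
		}
	}()
	return ch
}

// firstOver returns the first generated value greater than limit, or -1 if
// there is none.
func firstOver(n, limit int) int {
	done := make(chan struct{})
	defer close(done) // lets the writer exit when we return early
	for v := range countTo(n, done) {
		if v > limit {
			return v
		}
	}
	return -1
}

func main() {
	fmt.Println(firstOver(10, 5)) // 6
}
```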

Causing a panic

Write to a closed channel

Close a nil channel

Close a closed channel

    func main() {
        ch := make(chan string)
        close(ch)
        ch <- "Bob"
        fmt.Println("Done")
    }

    func main() {
        var ch chan string
        close(ch)
    }

    func main() {
        ch := make(chan string)
        close(ch)
        close(ch)
    }

Mutexes

Mutexes

Go supports mutexes through the standard library's sync package

sync.Mutex for serialized access

sync.RWMutex for multiple readers, single writer

Channels orchestrate; mutexes serialize

Mutexes

    var counter int
    var m sync.Mutex

    func update(i int) {
        for {
            m.Lock()
            counter++
            fmt.Println(i, "set counter to", counter)
            m.Unlock()
            time.Sleep(1 * time.Millisecond)
        }
    }

    func main() {
        for i := 0; i < 2; i++ {
            go update(i)
        }
        time.Sleep(100 * time.Millisecond)
    }
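The slide above demonstrates sync.Mutex only. As a complement, here is a minimal sketch of sync.RWMutex guarding a map; the scores type and its methods are hypothetical names chosen for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// scores is a map guarded by a sync.RWMutex.
type scores struct {
	mu sync.RWMutex
	m  map[string]int
}

// add takes the write lock, which excludes all readers and other writers.
func (s *scores) add(name string, n int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.m[name] += n
}

// get takes the read lock, which many readers may hold at the same time.
func (s *scores) get(name string) int {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.m[name]
}

// run starts the given number of goroutines, each adding one point and
// reading the score back, and returns the final score.
func run(workers int) int {
	s := &scores{m: map[string]int{}}
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.add("Bob", 1)
			_ = s.get("Bob")
		}()
	}
	wg.Wait()
	return s.get("Bob")
}

func main() {
	fmt.Println(run(50)) // 50
}
```

A read-write lock tends to pay off when reads greatly outnumber writes; for a simple counter like the one on the slide, a plain sync.Mutex is usually enough.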

Deadlock and Race Conditions

Deadlock

All goroutines are waiting on a channel or a mutex

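The Go runtime detects when every goroutine is blocked and stops the program with a fatal error ("all goroutines are asleep - deadlock!")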

    func readIt(ch chan int) {
        x := <-ch
        fmt.Println("read", x)
    }

    func main() {
        ch := make(chan int)
        go readIt(ch)
        ch2 := make(chan struct{})
        <-ch2
    }

Race Checker

Need to be careful when accessing shared data.

Could be a pointer shared over a channel or a global variable.

Go provides a -race compile option that adds a race checker to your application.

The race checker prints out the lines where there is unprotected access to the same data from multiple goroutines.

The race checker makes programs slower, so don't use it in production.

Race Checker

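    func main() {
        c1 := make(chan *int)
        go func() {
            x := <-c1
            for i := 0; i < 5; i++ {
                *x = *x * 2 // unprotected write through the shared pointer
                fmt.Println(*x)
            }
        }()
        s := 2
        c1 <- &s
        for i := 0; i < 5; i++ {
            s = s + 1 // unprotected write to the same variable
        }
        time.Sleep(500) // a bare 500 is a time.Duration of 500 nanoseconds
        fmt.Println("done")
    }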

Race Checker

    f45c89a52f1f:code29 mbh475$ ./race
    4
    8
    16
    ==================
    WARNING: DATA RACE
    32
    Write at 0x00c42000c2b8 by main goroutine:
    64
      main.main()
          /Users/mbh475/go_projects/src/github.kdc.capitalone.com/jonbodner/concurrency/code29/race.go:21 +0xf8

    Previous write at 0x00c42000c2b8 by goroutine 6:
      main.main.func1()
          /Users/mbh475/go_projects/src/github.kdc.capitalone.com/jonbodner/concurrency/code29/race.go:14 +0xa6

    Goroutine 6 (running) created at:
      main.main()
          /Users/mbh475/go_projects/src/github.kdc.capitalone.com/jonbodner/concurrency/code29/race.go:17 +0x7f
    ==================
    done
    Found 1 data race(s)
    f45c89a52f1f:code29 mbh475$

Putting it all together

Putting it all together - Flow

Putting it all together - No Concurrency

    func process(urls []string) {
        for len(urls) > 0 {
            curUrl := urls[0]
            urls = urls[1:]
            text := pullDownData(curUrl)
            content, foundUrls := extractFromText(text)
            indexValue(content)
            urls = append(urls, foundUrls...)
        }
    }

    func main() {
        urls := []string{"url0"}
        bench(func() {
            process(urls)
        })
    }

Putting it all together - Recursive Goroutine Launching

    var indexLock sync.Mutex

    func process(urls []string) {
        var wg sync.WaitGroup
        for len(urls) > 0 {
            curUrl := urls[0]
            urls = urls[1:]
            wg.Add(1)
            go func() {
                text := pullDownData(curUrl)
                content, foundUrls := extractFromText(text)
                indexLock.Lock()
                indexValue(content)
                indexLock.Unlock()
                process(foundUrls)
                wg.Done()
            }()
        }
        wg.Wait()
    }

Putting it all together - Replace Mutex with Channel

    var indexChan = make(chan string)

    func process(urls []string) {
        var wg sync.WaitGroup
        for len(urls) > 0 {
            curUrl := urls[0]
            urls = urls[1:]
            wg.Add(1)
            go func() {
                text := pullDownData(curUrl)
                content, foundUrls := extractFromText(text)
                indexChan <- content
                process(foundUrls)
                wg.Done()
            }()
        }
        wg.Wait()
    }

Putting it all together - Replace Mutex with Channel

    func main() {
        var wg sync.WaitGroup
        wg.Add(1)
        go func() {
            for {
                c, ok := <-indexChan
                if !ok {
                    wg.Done()
                    return
                }
                indexValue(c)
            }
        }()
        urls := []string{"url0"}
        bench(func() {
            process(urls)
            close(indexChan)
            wg.Wait()
        })
    }

Putting it all together - Bounded Number of Goroutines

    type result struct {
        source string
        found  []string
    }

    func process(urls []string) {
        urlChan := make(chan string)
        results := make(chan result)
        for i := 0; i < 4; i++ {
            go func() {
                for {
                    url, ok := <-urlChan
                    if !ok {
                        return
                    }
                    text := pullDownData(url)
                    content, foundUrls := extractFromText(text)
                    indexChan <- content
                    results <- result{url, foundUrls}
                }
            }()
        }
        runPool(urls, urlChan, results)
        close(urlChan)
    }

Putting it all together - Bounded Number of Goroutines

    func runPool(urls []string, urlChan chan<- string, results <-chan result) {
        processing := map[string]bool{}
        for _, v := range urls {
            processing[v] = true
            urlChan <- v
        }
        for len(processing) > 0 {
            result := <-results
            delete(processing, result.source)
            for _, v := range result.found {
                processing[v] = true
                urlChan <- v
            }
        }
    }

More Resources

Go Concurrency Patterns: talks.golang.org/2012/concurrency.slide (https://talks.golang.org/2012/concurrency.slide)

Advanced Go Concurrency Patterns: blog.golang.org/advanced-go-concurrency-patterns (https://blog.golang.org/advanced-go-concurrency-patterns)

Visualizing Concurrency in Go: divan.github.io/posts/go_concurrency_visualize/ (http://divan.github.io/posts/go_concurrency_visualize/)

Go Concurrency Patterns: Pipelines and cancellation: blog.golang.org/pipelines (http://blog.golang.org/pipelines)

Questions?

Thank you

Jon Bodner
Technical Fellow, Capital One
jonathan.bodner@capitalone.com

@jonbodner (http://twitter.com/jonbodner)
