Channel Your Inner Gopher
Channeling your Inner Gopher - (Literally) Reflecting upon Channels
Many gophers are likely familiar with channels: an elegant way to communicate typed information (in one direction or both) among go-routines. In its simplest form, we declare a typed channel variable, make it, and then send and receive data through it. Easy enough!
package main
import (
"fmt"
)
func main() {
var simpleChan chan int = make(chan int)
go func(c chan int) {
// send important data to the channel
c <- 42
close(c)
}(simpleChan)
// receive data
num := <-simpleChan
fmt.Printf("got %d\n", num)
}
In our trivial example above, we create a chan int typed channel and initialize it using make.
Next, we define a go-routine that will send data to the channel we pass into our function. In our main program, we block on the channel until we receive the data sent from our go-routine and print it out before exiting.
Channels of Channels
Simple enough. But what happens when we need a more complex concurrent programming paradigm? Say, for example, we want to create multiple channels, or better still, channels that communicate channels among go-routines. Wait, what? We can send a channel down a channel?? Woah! Malkovich Malkovich.
Slices of Channels
First, let’s discuss the simple use-case of multiple channels. Envision a master process coordinating messages among a group of go-routines. One approach for implementing this paradigm is to create a channel for each subordinate go-routine and accumulate the channels in a slice or a map. Regardless, we need to be able to receive messages from any channel that one of our go-routines sends data on, and send data to any go-routine that needs a message. For example:
package main
import (
"fmt"
)
const (
MAX_GO = 5
)
var (
magic int = 42
)
func main() {
var list []chan int
var simpleChan chan int
for i := 0; i < MAX_GO; i++ {
simpleChan = make(chan int)
list = append(list, simpleChan)
go func(c chan int) {
// send important data to the channel
c <- magic
}(simpleChan)
}
// receive data
for ch := range list {
num := <-list[ch]
fmt.Printf("got %d\n", num)
}
}
But there’s a problem in the code above. In this trivial example, we do not account for the possibility that one (or more) of our go-routines is slow to send, which blocks us from receiving data that is already waiting on the remaining channels. Enter the keywords select and default. Simply attempting to read from a channel when no data is available will block our go-routine. By using select and default together, we can avoid blocking and move on to the next execution point in our code when there is no data to receive on the current channel.
Also note in this trivial example, we only look to read from each channel one time as we iterate through the slice. A more realistic solution would iterate until some completion condition is reached. Take a look:
package main
import (
"fmt"
"time"
)
const (
MAX_GO = 5
)
var (
magic int = 42
)
func main() {
var list []chan int
var simpleChan chan int
for i := 0; i < MAX_GO; i++ {
simpleChan = make(chan int)
list = append(list, simpleChan)
go func(c chan int) {
// send important data to the channel
c <- magic
}(simpleChan)
}
// receive data
var done bool = false
start := time.Now()
for done != true {
for ch := range list {
select {
case num := <-list[ch]:
fmt.Printf("got %d\n", num)
default:
}
}
t := time.Now()
if t.Sub(start) > time.Second*5 {
done = true
}
}
}
In the example above, we set our done flag after five seconds elapse. Any messages sent during that time are read, and then we exit our polling loop after the timeout.
Okay, a better example, but still somewhat contrived. Let’s move on to a bidirectional example. Imagine a ping-pong game where there is one player on one side of the table, and $n$ players on the other side. We send a ping to the other side, and they respond with a pong. We can keep score too and stop the routine after we exchange $k$ messages.
package main
import (
"fmt"
)
const (
MAX_GO = 2
)
var (
magic int = 42
)
type TablePlayer struct {
score int
hit string
}
func main() {
var list []chan TablePlayer
var player chan TablePlayer
for i := 0; i < MAX_GO; i++ {
player = make(chan TablePlayer, 1)
list = append(list, player)
go func(c chan TablePlayer, id int) {
var pong, end bool
var hit TablePlayer
fmt.Printf("[%d] start game\n", id)
for !end {
select {
case hit = <-c:
fmt.Printf("\t[%d] recv ping, score=%d!\n", id, hit.score)
if hit.score > 10 {
end = true
}
pong = true
default:
}
if pong {
select {
case c <- TablePlayer{
score: hit.score + 1,
}:
fmt.Printf("\t[%d] sent pong!, score=%d\n", id, hit.score+1)
default:
}
pong = false
}
}
fmt.Printf("[%d] game over\n", id)
}(list[i], i)
}
fmt.Printf("%d start game go-routines\n", len(list))
// initial serve
for ch := range list {
fmt.Printf("[%d] serve!\n", ch)
list[ch] <- TablePlayer{
score: 1,
}
}
var pong, end bool
for !end {
var hit TablePlayer
for ch := range list {
select {
case hit = <-list[ch]:
fmt.Printf("[%d] recv ping, score=%d\n", ch, hit.score)
if hit.score > 10 {
end = true
}
pong = true
default:
}
if pong {
select {
case list[ch] <- TablePlayer{
score: hit.score + 1,
}:
fmt.Printf("[%d] sent pong\n", ch)
default:
}
pong = false
}
}
}
fmt.Printf("game over\n")
}
Buffered versus Un-buffered Channels
CRITICALLY IMPORTANT! Notice how we make the channel:
player = make(chan TablePlayer, 1)
A common source of confusion and error when implementing non-trivial concurrent applications with channels is buffering. Here’s what the documentation says:
By default channels are unbuffered, meaning that they will only accept sends (chan <-) if there is a corresponding receive (<- chan) ready to receive the sent value. Buffered channels accept a limited number of values without a corresponding receiver for those values.
In order for our concurrent go-routines to work asynchronously, we need to buffer our channel by passing a buffer size to make. A capacity of $1$ is enough to make our channel buffered, which allows a single send to complete without a receiver waiting on our channel.
Experiment with the ping-pong example above by making the channel unbuffered: remove the integer from the make call.
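To see the difference in isolation, here is a small sketch (the helper trySend is a made-up name, not part of the ping-pong example) that attempts a non-blocking send on both kinds of channel:

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether it succeeded.
func trySend(c chan int, v int) bool {
	select {
	case c <- v:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan int)
	buffered := make(chan int, 1)

	fmt.Println(trySend(unbuffered, 1)) // false: no receiver is waiting
	fmt.Println(trySend(buffered, 1))   // true: the value sits in the buffer
	fmt.Println(trySend(buffered, 2))   // false: the buffer is already full
}
```

This is the same select and default pattern the ping-pong players use. With an unbuffered channel, a send in that pattern only succeeds if the other side happens to be parked in a receive at that exact moment.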
Concurrent Design
We are quickly seeing that as our concurrent paradigm becomes non-trivial, we are more likely to run into complexity issues. How we think about and design our concurrent algorithm can be challenging. One major problem is deadlocks.
Okay, now that we’ve enumerated several examples, from the downright trivial to the not-so-trivial, let’s look at something a little more complex. Take a look at the variable definition var chs chan chan interface{} = make(chan chan interface{}, MAX_CHAN). What the hell is this?#?$*!? Breaking down the definition, we see we have defined a channel whose element type is chan interface{}. Said another way, chs is a channel variable that sends and receives channels of type chan interface{}. Let that sink in for a few seconds.
Imagine the following use case: We want to dynamically create channels of type chan interface{}, send them down our channel of channels, and then send and receive messages on the channels that we sent to our meta-channel.
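Before bringing reflection into it, the idea of sending a channel down a channel can be sketched with a simple request/reply exchange. This is an illustrative assumption on my part, not the design used later in this post: the names server and ask are hypothetical, and the element type is chan string rather than chan interface{} to keep it short.

```go
package main

import "fmt"

// server receives reply channels from requests and answers on each one.
func server(requests <-chan chan string) {
	for reply := range requests {
		reply <- "pong"
	}
}

// ask creates a private reply channel, hands it to the server, and waits.
func ask(requests chan<- chan string) string {
	reply := make(chan string)
	requests <- reply
	return <-reply
}

func main() {
	requests := make(chan chan string) // a channel whose values are channels
	go server(requests)
	fmt.Println(ask(requests)) // pong
	close(requests)
}
```

Each caller gets its own reply channel, so the server never has to know in advance who it will be talking to.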
Reflecting Once Upon a Channel
In this example, we’re going to reach deeper into the golang bag of tools and employ the reflect package of goodies that operates on channels. The critical element in the reflect package is the type SelectCase.
From the Docs:
A SelectCase describes a single case in a select operation. The kind of case depends on Dir, the communication direction.
If Dir is SelectDefault, the case represents a default case. Chan and Send must be zero Values.
If Dir is SelectSend, the case represents a send operation. Normally Chan’s underlying value must be a channel, and Send’s underlying value must be assignable to the channel’s element type. As a special case, if Chan is a zero Value, then the case is ignored, and the field Send will also be ignored and may be either zero or non-zero.
If Dir is SelectRecv, the case represents a receive operation. Normally Chan’s underlying value must be a channel and Send must be a zero Value. If Chan is a zero Value, then the case is ignored, but Send must still be a zero Value. When a receive operation is selected, the received Value is returned by Select.
Used in conjunction with the reflect.Select() function, we can dynamically implement send, receive, and even default cases for channels.
func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool)
Select executes a select operation described by the list of cases. Like the Go select statement, it blocks until at least one of the cases can proceed, makes a uniform pseudo-random choice, and then executes that case. It returns the index of the chosen case and, if that case was a receive operation, the value received and a boolean indicating whether the value corresponds to a send on the channel (as opposed to a zero value received because the channel is closed).
Mutexes and concurrent processes
One further point that we need to mention. When we are transforming a data-structure in one go-routine and accessing that data-structure in another go-routine, we run the risk of making it inconsistent. One common solution is using a Mutex around a critical section. By restricting access to our critical section with a guard, we can ensure that it stays consistent. Another point of caution: a mutex in a concurrent program may result in a deadlock if we’re not careful. We must consider the order in which resources are acquired, or we can end up deadlocking our go-routines, each waiting on a resource that is locked by another go-routine. A handy golang tool is the -race flag. We can build our code with the race flag, which adds runtime checks that report data races: unsynchronized accesses to the same memory from different go-routines. It does not detect lock-ordering deadlocks, although the Go runtime will abort with an error when every go-routine is blocked.
Take a look at the example below. In this pathological, albeit non-trivial, example, we bring it all together. We use the reflect package along with mutexes to send channels to our channels and then send messages down the channels that we receive. Notice how we structured the Lock() and Unlock() calls in order to prevent a deadlock.
package main
import (
"fmt"
"math/rand"
"reflect"
"sync"
"time"
)
type Controller struct {
sync.Mutex
send []reflect.SelectCase
recv []reflect.SelectCase
}
const (
MAX_CHAN = 100
)
var (
dataVals = []string{"doggies", "kitties", "lizzards", "elephants"}
)
func main() {
var counter int
var ctrl Controller
var chs chan chan interface{} = make(chan chan interface{}, MAX_CHAN)
var val chan interface{} = make(chan interface{})
// receive chans on chans and add it to the ctrl
go func() {
var count int
for {
select {
case c := <-chs:
count++
ctrl.Lock()
fmt.Printf("adding chan %d\n", count)
ctrl.recv = append(ctrl.recv, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(c)})
ctrl.send = append(ctrl.send, reflect.SelectCase{
Dir: reflect.SelectSend,
Chan: reflect.ValueOf(c),
Send: reflect.ValueOf(counter),
})
ctrl.Unlock()
default:
}
ctrl.Lock()
if len(ctrl.recv) > MAX_CHAN/2 {
fmt.Printf("\t\tPruning...\n")
ctrl.recv = ctrl.recv[:MAX_CHAN/4]
ctrl.send = ctrl.send[:MAX_CHAN/4]
}
ctrl.Unlock()
}
}()
// send counter to all ctrl send channels
go func(val chan interface{}) {
for {
var gotData bool
gotData = false
var data interface{}
select {
case data = <-val:
gotData = true
default:
}
counter++
for i := 0; i < len(ctrl.send); i++ {
if gotData {
ctrl.send[i].Send = reflect.ValueOf(data)
} else {
ctrl.send[i].Send = reflect.ValueOf("No Data")
}
chosen, _, _ := reflect.Select(ctrl.send)
fmt.Printf("sending %d\n", chosen)
}
}
}(val)
// send values
go func(val chan interface{}) {
for {
val <- dataVals[rand.Intn(len(dataVals))]
}
}(val)
// add channels and receive values on ctrl receive channels
for {
ctrl.Lock()
if len(ctrl.recv) > 0 {
fmt.Printf("selecting from %d chans\n", len(ctrl.recv))
chosen, recv, recvOk := reflect.Select(ctrl.recv)
if recvOk {
fmt.Printf("Chosen=%d, val=%v\n", chosen, recv.Interface())
} else {
fmt.Println("channels closed")
}
}
ctrl.Unlock()
time.Sleep(time.Microsecond * 10)
ctrl.Lock()
if len(ctrl.recv) < MAX_CHAN {
chs <- make(chan interface{})
}
ctrl.Unlock()
}
}
Let’s deconstruct the example above and see what’s going on:
In order to simplify the sending and receiving operations, I created a type called Controller, which has two slices of reflect.SelectCase: one for sending and one for receiving channel operations.
In the main go-routine, I define and create two distinct channels:
var chs chan chan interface{} = make(chan chan interface{}, MAX_CHAN)
var val chan interface{} = make(chan interface{})
The first channel is our channel that will receive channels of type chan interface{}. And the second channel is of type chan interface{}. We’ll use this second channel to send a value that we want to fan out to all of the channels that we received from our first channel of channels variable, chs.
Next, we create three go-routines:
- The first go-routine receives a channel from our channel of channels and appends it to our reflect.SelectCase slices. Additionally, it prunes channels from our lists when the length of the SelectCase slices exceeds MAX_CHAN/2. This go-routine runs in an infinite loop.
- The second go-routine takes the value channel we created above, reads data off of it, and then updates the sending SelectCase slice with that value to multiplex it to all of the channels in our send SelectCase slice. Once updated, we use the reflect.Select function to send our values to our channels.
- The third go-routine acts as a data pump: it randomly selects a value from our data source, dataVals, and sends it to the val chan interface{} channel passed into this go-routine.
In our main routine, we create an infinite loop that checks whether our Controller receive slice is non-empty, and if so calls reflect.Select on it to receive values. Additionally, within the loop we check whether the size of the slice is less than MAX_CHAN (for instance, after it was pruned in our first go-routine). If so, we create a new channel (of type chan interface{}) and send it to our chs channel (of type chan chan interface{}).
Pay particularly close attention to the Mutexes wrapping the critical sections. This pathological, but non-trivial example demonstrates how quickly concurrent programming can become complex and unwieldy – have fun!