CyberSpy

Rantings from a guy with way too much free time

Channel Your Inner Gopher

2017-12-13 programming Rob Baruch

Channeling your Inner Gopher - (Literally) Reflecting upon Channels

Many gophers are likely familiar with the communication paradigm, channels. An elegant solution to communicate (uni or bidirectionally) typed information among go-routines. In it’s simplest form, we declare as type-valued channel variable, make it, and then send and receive data through it. Easy enough!

package main

import (
        "fmt"
)

func main() {
        var simpleChan chan int = make(chan int)

        go func(c chan int) {
                // send important data to the channel
                c <- 42
                close(c)
        }(simpleChan)

        // receive data
        num := <-simpleChan
        fmt.Printf("got %d\n", num)

}

In our trivial example above, we create an chan int typed-channel and initialize it using make.

Next, we define a go-routine that will send data to the channel we pass into our function. In our main program, we block on the channel until we receive the data sent from our go-routine and print out out before exiting.

Channels of Channels

Simple enough. But what happens when we get more complicated and require a more complex concurrent programming paradigm. Say, for example, we want to create multiple channels, or better still channels that communicate channels among go-routines. Wait, what? We can send a channel down a channel?? Woah! Malkovich Malkovich.

Slices of Channels

First, let’s discuss the simple use-case of multiple channels. Envision the use-case where a master process is coordinating messages among a group of go-routines. One approach for implementing this paradigm is to create a channel for each subordinate go-routine and accumulating the channels into an array or a map. Regardless, we need to be able to receive messages for any channel that sends data from our go-routine, and send data to any go-routine that needs a message. For example:

package main
  
import (
        "fmt"
)

const (
        MAX_GO = 5
)

var (
        magic int = 42
)

func main() {

        var list []chan int

        var simpleChan chan int
        for i := 0; i < MAX_GO; i++ {

                simpleChan = make(chan int)
                list = append(list, simpleChan)
                go func(c chan int) {
                        // send important data to the channel
                        c <- magic
                }(simpleChan)
        }

        // receive data
        for ch := range list {
                num := <-list[ch]
                fmt.Printf("got %d\n", num)
        }

}

But there’s a problem in the code above. In this trivial example, we are not taking into account the possibility of one (or more) of our routines blocking the remaining routines from receiving data on their respective channels. Enter keywords select and default. Simply attempting to read from our channel when no data is available will result in our go-routine blocking. By using select and default together, we can avoid blocking and move on to the next execution point in our code when and if there is no data to receive on the current channel.

Also note in this trivial example, we only look to read from each channel one time as we iterate through the slice. A more realistic solution would iterate until some completion condition is reached. Take a look:

package main

import (
        "fmt"
        "time"
)

const (
        MAX_GO = 5
)

var (
        magic int = 42
)

func main() {

        var list []chan int

        var simpleChan chan int
        for i := 0; i < MAX_GO; i++ {

                simpleChan = make(chan int)
                list = append(list, simpleChan)
                go func(c chan int) {
                        // send important data to the channel
                        c <- magic
                }(simpleChan)
        }

        // receive data
        var done bool = false
        start := time.Now()
        for done != true {
                for ch := range list {
                        select {
                        case num := <-list[ch]:
                                fmt.Printf("got %d\n", num)
                        default:

                        }
                }
                t := time.Now()
                if t.Sub(start) > time.Second*5 {
                        done = true
                }

        }

}

In the example above, we set our done flag after five seconds elapses. Any messages sent during that time are read and then we exit our select loop after the timeout.

Okay, a better example, but still somewhat contrived. Let’s move on to a bidirectional example. Imagine a ping-pong game where there is one player on one side of the table, and $n$ players on the other side. We send a ping to the other side, and they respond with a pong. We can keep score too and stop the routine after we exchange $k$ messages.

package main

import (
        "fmt"
)

const (
        MAX_GO = 2
)

var (
        magic int = 42
)

type TablePlayer struct {
        score int
        hit   string
}

func main() {

        var list []chan TablePlayer

        var player chan TablePlayer

        for i := 0; i < MAX_GO; i++ {

                player = make(chan TablePlayer, 1)
                list = append(list, simpleChan)
                go func(c chan TablePlayer, id int) {
                        var pong, end bool
                        var hit TablePlayer
                        fmt.Printf("[%d] start game\n", id)
                        for !end {
                                select {
                                case hit = <-c:
                                        fmt.Printf("\t[%d] recv ping, score=%d!\n", id, hit.score)
                                        if hit.score > 10 {
                                                end = true
                                        }

                                        pong = true
                                default:

                                }

                                if pong {
                                        select {
                                        case c <- TablePlayer{
                                                score: hit.score + 1,
                                        }:
                                                fmt.Printf("\t[%d] sent pong!, score=%d\n", id, hit.score+1)
                                        default:
                                        }
                                        pong = false
                                }
                        }
                        fmt.Printf("[%d] game over\n", id)
                }(list[i], i)
        }

        fmt.Printf("%d start game go-routines\n", len(list))
        // initial serve
        for ch := range list {
                fmt.Printf("[%d] serve!\n", ch)
                list[ch] <- TablePlayer{
                        score: 1,
                }
        }

        var pong, end bool
        for !end {
                var hit TablePlayer
                for ch := range list {

                        select {
                        case hit = <-list[ch]:
                                fmt.Printf("[%d] recv ping, score=%d\n", ch, hit.score)
                                if hit.score > 10 {
                                        end = true
                                }
                                pong = true
                        default:

                        }

                        if pong {
                                select {
                                case list[ch] <- TablePlayer{
                                        score: hit.score + 1,
                                }:
                                        fmt.Printf("[%d] sent pong\n", ch)
                                default:
                                }
                                pong = false
                        }
                }
        }
        fmt.Printf("game over\n")
}

Buffered verses Un-buffered Channels

CRITICALLY IMPORTANT! Notice how we make the channel:

player = make(chan TablePlayer, 1)

A common source of confusion and error when implementing non-trivial concurrent applications utilizing channels is buffering. Here’s what the documents say:

By default channels are unbuffered, meaning that they will only accept sends (chan <-) if there is a corresponding receive (<- chan) ready to receive the sent value. Buffered channels accept a limited number of values without a corresponding receiver for those values.

In order for our concurrent go-routines to work asynchronously, we need to buffer our channel by adding the buffer size. Simply adding $1$ to the size makes our channel a buffered channel and thereby will allow us to send without a receive waiting on our channel.

Experiment with the ping-ping example above by making the channel unbuffered by remove the integer from the make call.

Concurrent Design

We are quickly seeing that as our concurrent paradigm becomes non-trivial, we are more likely to run into complexity issues. How we think about and design our concurrent algorithm can be challenging. One major problem is deadlocks.

Okay, now that we’ve enumerated several examples from the down-right trivial to the not-so-trivial. Let’s look at something a little more complex. Take a look at the variable definition - var chs chan chan interface{} = make(chan chan interface{}, MAX_CHAN). What the hell is this?#?$*!?. Breaking down the definition, we see we have defined a channel of type chan interface{}. Said another way, chs is a channel variable that sends and receives channels of type chan interface{}. Let that sink in for a few seconds.

Imagine the following use case: We want to dynamically create channels, send them to our channel of type chan interface{} and then send and receive messages on the channels that we sent to our meta-channel.

Reflecting Once Upon a Channel

In this example, we’re going to reach deeper into the golang bag of tools, and employ the reflect package of goodies that operates on channels. Check it out:

The critical element in the reflect package is the type SelectCase.

From the Docs:

A SelectCase describes a single case in a select operation. The kind of case depends on Dir, the communication direction.

If Dir is SelectDefault, the case represents a default case. Chan and Send must be zero Values.

If Dir is SelectSend, the case represents a send operation. Normally Chan’s underlying value must be a channel, and Send’s underlying value must be assignable to the channel’s element type. As a special case, if Chan is a zero Value, then the case is ignored, and the field Send will also be ignored and may be either zero or non-zero.

If Dir is SelectRecv, the case represents a receive operation. Normally Chan’s underlying value must be a channel and Send must be a zero Value. If Chan is a zero Value, then the case is ignored, but Send must still be a zero Value. When a receive operation is selected, the received Value is returned by Select.

Used in conjunction with the reflect.Select() function we can dynamically implement send, receive, and even default cases for channels.

func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool)

Select executes a select operation described by the list of cases. Like the Go select statement, it blocks until at least one of the cases can proceed, makes a uniform pseudo-random choice, and then executes that case. It returns the index of the chosen case and, if that case was a receive operation, the value received and a boolean indicating whether the value corresponds to a send on the channel (as opposed to a zero value received because the channel is closed).

Mutexes and concurrent processes

One further point that we need to mention. When we are transforming a data-structure in one go-routine, and accessing that data-structure in another go-routine, we run the risk of making it inconsistent. One common solution is using a Mutex around a critial-section. By restricting access to our critical section with a guard, we can ensure that it stays consistent. Another point of caution. A mutex in a concurrent program may result in a deadlock if we’re not careful. We must consider the order in which resources are aquired or we can end up deadlocking our go-routines waiting on a resource that is locked by another go-routine. A handy golang tool is the -race flag. We can build our code with the race flag which adds runtime checks to see if we’ve inadvertently implemented a deadlock scenario.

Take a look at the example below. In this pathological, albeit non-trivial example, we bring it all toghether. We use the reflect package along with mutexes to send channels to our channels and then send messages down the channels that we receive. Notice how we structured the the Lock() and Unlock() calls in order to prevent a deadlock.

package main

import (
        "fmt"
        "math/rand"
        "reflect"
        "sync"
        "time"
)

type Controller struct {
        sync.Mutex
        send []reflect.SelectCase
        recv []reflect.SelectCase
}

const (
        MAX_CHAN = 100
)

var (
        dataVals = []string{"doggies", "kitties", "lizzards", "elephants"}
)

func main() {
        var counter int
        var ctrl Controller

        var chs chan chan interface{} = make(chan chan interface{}, MAX_CHAN)
        var val chan interface{} = make(chan interface{})

        // receive chans on chans and add it to the ctrl
        go func() {

                var count int
                for {
                        select {
                        case c := <-chs:
                                count++
                                ctrl.Lock()
                                fmt.Printf("adding chan %d\n", count)
                                ctrl.recv = append(ctrl.recv, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(c)})
                                ctrl.send = append(ctrl.send, reflect.SelectCase{
                                        Dir:  reflect.SelectSend,
                                        Chan: reflect.ValueOf(c),
                                        Send: reflect.ValueOf(counter),
                                })
                                ctrl.Unlock()
                        default:

                        }

                        ctrl.Lock()
                        if len(ctrl.recv) > MAX_CHAN/2 {
                                fmt.Printf("\t\tPruning...\n")
                                ctrl.recv = ctrl.recv[:MAX_CHAN/4]
                                ctrl.send = ctrl.send[:MAX_CHAN/4]
                        }
                        ctrl.Unlock()
                }

        }()

        // send counter to all ctrl send channels
        go func(val chan interface{}) {

                for {

                        var gotData bool

                        gotData = false
                        var data interface{}
                        select {
                        case data = <-val:
                                gotData = true
                        default:
                        }

                        counter++
                        for i := 0; i < len(ctrl.send); i++ {
                                if gotData {
                                        ctrl.send[i].Send = reflect.ValueOf(data)
                                } else {
                                        ctrl.send[i].Send = reflect.ValueOf("No Data")
                                }
                                chosen, _, _ := reflect.Select(ctrl.send)
                                fmt.Printf("sending %d\n", chosen)
                        }

                }
        }(val)

        // send valhes
        go func(val chan interface{}) {
                for {
                        val <- dataVals[rand.Intn(len(dataVals))]
                }
        }(val)

        // add channels  and receive values on ctrl receive channels
        for {

                ctrl.Lock()
                if len(ctrl.recv) > 0 {
                        fmt.Printf("selecting from %d chans\n", len(ctrl.recv))
                        chosen, recv, recvOk := reflect.Select(ctrl.recv)
                        if recvOk {
                                fmt.Printf("Chosen=%d, val=%v\n", chosen, recv.Interface())

                        } else {
                                fmt.Println("channels closed")
                        }
                }
                ctrl.Unlock()

                time.Sleep(time.Microsecond * 10)

                ctrl.Lock()
                if len(ctrl.recv) < MAX_CHAN {
                        chs <- make(chan interface{})
                }
                ctrl.Unlock()
        }
}

Let’s deconstruct the example above and see what’s going on:

In order to simplify the sending and receiving operations, I created a type called Controller which has two arrays of reflect.SelectCase: one for sending and one for receiving channel operations.

In the main go-routine, I define and create two distinct channels:

  • var chs chan chan interface{} = make(chan chan interface{}, MAX_CHAN)
  • var val chan interface{} = make(chan interface{})

The first channel is our channel that will receive channels of type chan interface{}. And the second channel is of type chan interface{}. We’ll use this second channel to send a value that we want to fan out to all of the channels that we received from our first channel of channels variable, chs.

Next, we create three go-routines

  • The first go-routine receives a channel from our channel of channels channel, and appends it to our reflect.SelectCase arrays. Additionally we will dynamically delete channels from our list when the length of the SelectCase arrays exceeds MAX_CHANS/2. This go-routine runs in an infinite loop.
  • The second go-routine takes our value channel we created above, and reads data off of the channel and then updates the SelectCase sending array with that value to multiplex it to all of the channels that are in our send SelectCase array. Once updated, we then use the reflect.Select function to send our values to our channels.
  • The third go-routine acts as a data-pump which randomly selects an arbitrary value from our data source, dataVals and sends it to the val chan interface{} channel passed into this go-routine.

In our main routine, we create an infinite loop that looks to see if our Controller receive array is non-empty, and if so calls reflect.Select on our receive SelectCase array to receive values. Additionally within the loop we look to see if the size of our array is less than MAX_CHAN after we pruned it in our first go-routine. If so, we create a new channel (of type chan interface{}) and send it to our chs channel (of type chan chan iterface{}).

Pay particularly close attention to the Mutexes wrapping the critical sections. This pathological, but non-trivial example demonstrates how quickly concurrent programming can become complex and unwieldy – have fun!

comments powered by Disqus