Breaking the rules of channels, with more channels

Paul Whalen

The First Rule Of Channels

  • Sending a message on a channel blocks the sending goroutine until the receiving goroutine is ready to receive it
  • This means a producer goroutine will block if the consumer goroutine is slower

The Problem

  • A producer goroutine sends every sale as it happens
  • A consumer goroutine receives sale messages, and redraws the UI with the total sales amount

In Code

type Sale struct { dollars float64, timestamp time.Time }

func producer(salesCh chan<- Sale) {
    for {
        salesCh <- makeSale() // every 500ms
    }
}

func consumer(salesCh <-chan Sale) {
    for s := range salesCh {
        redrawUI(s) // takes 700ms
    }
}

func main() {
    salesCh := make(chan Sale)
    go producer(salesCh)
    go consumer(salesCh)
    select {} // block forever
}

Output - what’s wrong?

  • “Made sale” is the producer
  • “Sale received” is the consumer
  • Price of every sale increases by $1
  • Producer is slow!
  • Consumer is slow!
00.5s | Made sale: $1.00 at 00.5s
01.0s | Made sale: $2.00 at 01.0s
01.2s | Sale received after 0ms: $1.00 at 00.5s. Total sales: $1.00
01.7s | Made sale: $3.00 at 01.5s
01.9s | Sale received after 200ms: $2.00 at 01.0s. Total sales: $3.00

Solution 0: Buffered Channel

func main() {
    salesCh := make(chan Sale, 3)
    go producer(salesCh)
    go consumer(salesCh)
    select {}
}

Grading Solution 0

  • ✅ - The producer doesn’t always block if the consumer isn’t ready
  • ❌ - The consumer has to process every message and will fall behind
  • ❌ - The producer will eventually block when the buffer fills up
...
03.5s | Made sale: $7.00 at 03.5s
04.0s | Sale received after 802ms: $5.00 at 02.5s. Total sales: $15.00
...
15.0s | Made sale: $25.00 at 12.5s
15.2s | Sale received after 4012ms: $21.00 at 10.5s. Total sales: $231.00

Adding a queue never improves performance! We’re just delaying the problem.

Solution 0b: Non-blocking Send

func producer(salesCh chan<- Sale) {
    for {
        select {
        case salesCh <- makeSale():
        default:
        }
    }
}

Grading Solution 0b

  • ✅ - The producer never blocks
  • ❌ - The consumer doesn’t see every message!
00.5s | Made sale: $1.00 at 00.5s
01.0s | Made sale: $2.00 at 01.0s
01.2s | Sale received after 0ms: $1.00 at 00.5s. Total sales: $1.00
01.5s | Made sale: $3.00 at 01.5s
02.0s | Made sale: $4.00 at 02.0s
02.2s | Sale received after 2ms: $3.00 at 01.5s. Total sales: $4.00

What’s up with our total sales number?! Non-blocking sends drop messages!

Solution 1: Goroutine in the middle

func ConflateV1[T any]() (chan<- T, <-chan T) {
    outCh := make(chan T)
    inCh := make(chan T)
    go func() {
        for {
            lastMsg := <-inCh

            select {
            case outCh <- lastMsg:
            default:
            }
        }
    }()
    return inCh, outCh
}
inCh, outCh := solutions.ConflateV1[Sale]()
go producer(inCh)
go consumer(outCh)

Grading Solution 1

  • ✅ - The producer isn’t blocked sending messages (still)
  • ✅ - We’ve decoupled the producer from the consumer (sort of)
  • ❌ - The consumer doesn’t see every message! (still)
00.5s | Made sale: $1.00 at 00.5s
01.0s | Made sale: $2.00 at 01.0s
01.2s | Sale received after 1ms: $1.00 at 00.5s. Total sales: $1.00
01.5s | Made sale: $3.00 at 01.5s
02.0s | Made sale: $4.00 at 02.0s
02.2s | Sale received after 3ms: $3.00 at 01.5s. Total sales: $4.00

We’ve only shifted the problem into the intermediate goroutine.

Solution 2: Retry until success

func ConflateV2[T any](retryInterval time.Duration) (chan<- T, <-chan T) {
    outCh := make(chan T)
    inCh := make(chan T)
    go func() {
        var lastMsg T
        var retryTimer *time.Timer
        var retryCh <-chan time.Time
        for {
            select {
            case lastMsg = <-inCh:
                if retryTimer != nil {
                    retryTimer.Stop()
                    retryCh = nil
                }
            case <-retryCh:
            }

            select {
            case outCh <- lastMsg:
            default:
                retryTimer = time.NewTimer(retryInterval)
                retryCh = retryTimer.C
            }
        }
    }()
    return inCh, outCh
}

Grading Solution 2

  • ✅ - We’re always eventually sending the latest message
  • ✅ - The consumer sees the latest message sooner
  • ❌ - The consumer doesn’t see every message (still)
00.5s | Made sale: $1.00 at 00.5s
01.0s | Made sale: $2.00 at 01.0s
01.2s | Sale received after 0ms: $1.00 at 00.5s. Total sales: $1.00
01.5s | Made sale: $3.00 at 01.5s
01.9s | Sale received after 201ms: $2.00 at 01.0s. Total sales: $3.00
02.0s | Made sale: $4.00 at 02.0s
02.5s | Made sale: $5.00 at 02.5s
02.6s | Sale received after 402ms: $3.00 at 01.5s. Total sales: $6.00
03.0s | Made sale: $6.00 at 03.0s
03.3s | Sale received after 102ms: $5.00 at 02.5s. Total sales: $11.00

Latest is sometimes good enough, but we care about total sales!

Solution 3: Conflater Interface

  • What if we could conflate messages, so two messages sent by the producer become one message sent to the consumer, while still carrying all the information the consumer cares about?
type Conflater[C any] interface {
    ConflateWith(latest C) C
    ZeroValue() C
}
func (s Sale) ConflateWith(s2 Sale) Sale {
    return Sale{
        dollars:   s.dollars + s2.dollars,
        timestamp: maxTime(s.timestamp, s2.timestamp),
    }
}

func (s Sale) ZeroValue() Sale {
    return Sale{}
}

Solution 3: Conflater

func ConflateV3[T Conflater[T]](retryInterval time.Duration) (chan<- T, <-chan T) {
    outCh := make(chan T)
    inCh := make(chan T)
    go func() {
        var conflatedMessage T
        var retryTimer *time.Timer
        var retryCh <-chan time.Time
        for {
            select {
            case lastMsg := <-inCh:
                conflatedMessage = conflatedMessage.ConflateWith(lastMsg)
                if retryTimer != nil {
                    retryTimer.Stop()
                    retryCh = nil
                }
            case <-retryCh:
            }

            select {
            case outCh <- conflatedMessage:
                conflatedMessage = T.ZeroValue(conflatedMessage)
            default:
                retryTimer = time.NewTimer(retryInterval)
                retryCh = retryTimer.C
            }
        }
    }()
    return inCh, outCh
}

Grading Solution 3

  • ✅ - Every message received by the consumer
    • Has the latest data
    • Reflects all sent data
  • ❌ …
00.5s | Made sale: $1.00 at 00.5s
01.0s | Made sale: $2.00 at 01.0s
01.2s | Sale received after 1ms: $1.00 at 00.5s. Total sales: $1.00
01.5s | Made sale: $3.00 at 01.5s
01.9s | Sale received after 202ms: $2.00 at 01.0s. Total sales: $3.00
02.0s | Made sale: $4.00 at 02.0s
02.5s | Made sale: $5.00 at 02.5s
02.6s | Sale received after 403ms: $3.00 at 01.5s. Total sales: $6.00
03.0s | Made sale: $6.00 at 03.0s
03.3s | Sale received after 103ms: $9.00 at 02.5s. Total sales: $15.00

Concurrency Nirvana? Or…

Solutions 4, 5, …

  • What about shutdown?
    • Propagating the closed channel?
    • Stopping the running goroutine?
  • These are solvable problems… left as an exercise for the reader

Reflecting on the Conflater pattern

  • It’s nice decoupling in theory, especially with generics
  • It’s a leaky abstraction
    • “Retry interval? What’s that?”
    • “Why do I need to cancel my channel?”
  • I wouldn’t recommend it highly in production code
  • I found a bug in my production code while writing this talk 🙈
  • But… goroutines and channels are fun!
  • If you’re interested in more of these sorts of shenningans, I recommend Concurrency in Go by Katherine Cox-Buday

Thanks For Listening!

  • github.com/pgwhalen/conflate-talk/
  • pgwhalen.com/conflate-talk