// Sale is a single sale event passed from the producer to the consumer.
type Sale struct {
	// dollars is the monetary amount of the sale.
	dollars float64
	// timestamp records a time associated with the sale (presumably when it
	// was made — confirm against makeSale).
	timestamp time.Time
}
// producer loops forever, pushing each sale returned by makeSale onto salesCh.
// The send blocks until a receiver is ready, so a slow consumer slows the
// producer down as well.
func producer(salesCh chan<- Sale) {
	for {
		sale := makeSale() // every 500ms
		salesCh <- sale
	}
}
// consumer receives sales from salesCh one at a time and redraws the UI for
// each of them. It returns once salesCh has been closed and drained.
func consumer(salesCh <-chan Sale) {
	for {
		sale, open := <-salesCh
		if !open {
			return
		}
		redrawUI(sale) // takes 700ms
	}
}
// main connects the producer and the consumer through a single unbuffered
// channel and then parks the main goroutine so the two workers keep running.
func main() {
	sales := make(chan Sale) // unbuffered: every send waits for a receive

	go producer(sales)
	go consumer(sales)

	// An empty select has no case that can ever proceed: block forever.
	select {}
}
Adding a queue never improves performance! We’re just delaying the problem.
What’s up with our total sales number?! Non-blocking sends drop messages!
We’ve only shifted the problem into the intermediate goroutine.
// ConflateV2 returns an input channel and an output channel joined by a
// goroutine that forwards only the most recently received message.
//
// Every message received on the input replaces the previously held one. The
// goroutine then attempts a non-blocking send on the output; if no receiver is
// ready it arms a timer and tries again after retryInterval. A newer message
// arriving in the meantime cancels the pending retry and is attempted
// immediately, so a slow receiver only ever sees the latest value and the
// sender is never held up by it.
//
// Closing the input channel shuts the conflater down: a message that is still
// awaiting delivery is handed over with a blocking send, after which the
// output channel is closed and the goroutine exits. (That final send waits for
// a receiver; if the output is never read again the goroutine stays parked.)
func ConflateV2[T any](retryInterval time.Duration) (chan<- T, <-chan T) {
	outCh := make(chan T)
	inCh := make(chan T)
	go func() {
		// Closing the output lets a `range` over it terminate.
		defer close(outCh)

		var lastMsg T
		var retryTimer *time.Timer
		// retryCh is non-nil exactly while lastMsg is still undelivered.
		// Receiving from a nil channel blocks forever, which disables the
		// retry case of the select below when nothing is pending.
		var retryCh <-chan time.Time
		for {
			select {
			case msg, ok := <-inCh:
				if retryTimer != nil {
					retryTimer.Stop()
				}
				if !ok {
					// Input closed. Without this check the receive would keep
					// yielding zero values and the loop would spin, pushing
					// them downstream.
					if retryCh != nil {
						outCh <- lastMsg
					}
					return
				}
				lastMsg = msg
				retryCh = nil
			case <-retryCh:
			}
			select {
			case outCh <- lastMsg:
				retryCh = nil
			default:
				// Receiver busy: try again later unless a newer message
				// arrives first.
				retryTimer = time.NewTimer(retryInterval)
				retryCh = retryTimer.C
			}
		}
	}()
	return inCh, outCh
}
00.5s | Made sale: $1.00 at 00.5s
01.0s | Made sale: $2.00 at 01.0s
01.2s | Sale received after 0ms: $1.00 at 00.5s. Total sales: $1.00
01.5s | Made sale: $3.00 at 01.5s
01.9s | Sale received after 201ms: $2.00 at 01.0s. Total sales: $3.00
02.0s | Made sale: $4.00 at 02.0s
02.5s | Made sale: $5.00 at 02.5s
02.6s | Sale received after 402ms: $3.00 at 01.5s. Total sales: $6.00
03.0s | Made sale: $6.00 at 03.0s
03.3s | Sale received after 102ms: $5.00 at 02.5s. Total sales: $11.00
Latest is sometimes good enough, but we care about total sales!
// ConflateV3 returns an input channel and an output channel joined by a
// goroutine that merges messages instead of dropping them.
//
// Every message received on the input is folded into an accumulator with
// ConflateWith. The goroutine then attempts a non-blocking send of the
// accumulator on the output; if no receiver is ready it arms a timer and tries
// again after retryInterval, while newer messages keep being folded in. After
// a successful send the accumulator is reset to the value returned by
// ZeroValue.
//
// The Conflater constraint is declared elsewhere; this function relies only on
// its ConflateWith and ZeroValue methods as used below.
//
// Closing the input channel shuts the conflater down: an accumulator that is
// still awaiting delivery is handed over with a blocking send, after which the
// output channel is closed and the goroutine exits. (That final send waits for
// a receiver; if the output is never read again the goroutine stays parked.)
func ConflateV3[T Conflater[T]](retryInterval time.Duration) (chan<- T, <-chan T) {
	outCh := make(chan T)
	inCh := make(chan T)
	go func() {
		// Closing the output lets a `range` over it terminate.
		defer close(outCh)

		var conflatedMessage T
		var retryTimer *time.Timer
		// retryCh is non-nil exactly while conflatedMessage holds undelivered
		// data. Receiving from a nil channel blocks forever, which disables
		// the retry case of the select below when nothing is pending.
		var retryCh <-chan time.Time
		for {
			select {
			case lastMsg, ok := <-inCh:
				if retryTimer != nil {
					retryTimer.Stop()
				}
				if !ok {
					// Input closed. Without this check the receive would keep
					// yielding zero values and the loop would spin, conflating
					// and sending them.
					if retryCh != nil {
						outCh <- conflatedMessage
					}
					return
				}
				conflatedMessage = conflatedMessage.ConflateWith(lastMsg)
				retryCh = nil
			case <-retryCh:
			}
			select {
			case outCh <- conflatedMessage:
				// Delivered: start accumulating from scratch.
				conflatedMessage = conflatedMessage.ZeroValue()
				retryCh = nil
			default:
				// Receiver busy: keep the accumulator and try again later.
				retryTimer = time.NewTimer(retryInterval)
				retryCh = retryTimer.C
			}
		}
	}()
	return inCh, outCh
}
00.5s | Made sale: $1.00 at 00.5s
01.0s | Made sale: $2.00 at 01.0s
01.2s | Sale received after 1ms: $1.00 at 00.5s. Total sales: $1.00
01.5s | Made sale: $3.00 at 01.5s
01.9s | Sale received after 202ms: $2.00 at 01.0s. Total sales: $3.00
02.0s | Made sale: $4.00 at 02.0s
02.5s | Made sale: $5.00 at 02.5s
02.6s | Sale received after 403ms: $3.00 at 01.5s. Total sales: $6.00
03.0s | Made sale: $6.00 at 03.0s
03.3s | Sale received after 103ms: $9.00 at 02.5s. Total sales: $15.00
Concurrency Nirvana? Or…