type Sale struct { dollars float64, timestamp time.Time } func producer(salesCh chan<- Sale) { for { salesCh <- makeSale() // every 500ms } } func consumer(salesCh <-chan Sale) { for s := range salesCh { redrawUI(s) // takes 700ms } } func main() { salesCh := make(chan Sale) go producer(salesCh) go consumer(salesCh) select {} // block forever }type Sale struct { dollars float64, timestamp time.Time } func producer(salesCh chan<- Sale) { for { salesCh <- makeSale() // every 500ms } } func consumer(salesCh <-chan Sale) { for s := range salesCh { redrawUI(s) // takes 700ms } } func main() { salesCh := make(chan Sale) go producer(salesCh) go consumer(salesCh) select {} // block forever }type Sale struct { dollars float64, timestamp time.Time } func producer(salesCh chan<- Sale) { for { salesCh <- makeSale() // every 500ms } } func consumer(salesCh <-chan Sale) { for s := range salesCh { redrawUI(s) // takes 700ms } } func main() { salesCh := make(chan Sale) go producer(salesCh) go consumer(salesCh) select {} // block forever }type Sale struct { dollars float64, timestamp time.Time } func producer(salesCh chan<- Sale) { for { salesCh <- makeSale() // every 500ms } } func consumer(salesCh <-chan Sale) { for s := range salesCh { redrawUI(s) // takes 700ms } } func main() { salesCh := make(chan Sale) go producer(salesCh) go consumer(salesCh) select {} // block forever }type Sale struct { dollars float64, timestamp time.Time } func producer(salesCh chan<- Sale) { for { salesCh <- makeSale() // every 500ms } } func consumer(salesCh <-chan Sale) { for s := range salesCh { redrawUI(s) // takes 700ms } } func main() { salesCh := make(chan Sale) go producer(salesCh) go consumer(salesCh) select {} // block forever }
00.5s | Made sale: $1.00 at 00.5s
01.0s | Made sale: $2.00 at 01.0s
01.2s | Sale received after 0ms: $1.00 at 00.5s. Total sales: $1.00
01.7s | Made sale: $3.00 at 01.5s
01.9s | Sale received after 200ms: $2.00 at 01.0s. Total sales: $3.00
...
03.5s | Made sale: $7.00 at 03.5s
04.0s | Sale received after 802ms: $5.00 at 02.5s. Total sales: $15.00
...
15.0s | Made sale: $25.00 at 12.5s
15.2s | Sale received after 4012ms: $21.00 at 10.5s. Total sales: $231.00
Adding a queue never improves performance! We’re just delaying the problem.
00.5s | Made sale: $1.00 at 00.5s
01.0s | Made sale: $2.00 at 01.0s
01.2s | Sale received after 0ms: $1.00 at 00.5s. Total sales: $1.00
01.5s | Made sale: $3.00 at 01.5s
02.0s | Made sale: $4.00 at 02.0s
02.2s | Sale received after 2ms: $3.00 at 01.5s. Total sales: $4.00
What’s up with our total sales number?! Non-blocking sends drop messages!
// ConflateV1 returns a send-only input channel and a receive-only output
// channel joined by a forwarding goroutine.
//
// Each value received on the input is offered to the output with a
// non-blocking send: if no receiver is waiting on the output at that
// moment, the value is dropped and the goroutine goes back to reading the
// input. Senders therefore never wait on a slow consumer.
//
// Closing the input channel stops the goroutine and closes the output
// channel, so a consumer ranging over the output terminates.
func ConflateV1[T any]() (chan<- T, <-chan T) {
	outCh := make(chan T)
	inCh := make(chan T)
	go func() {
		// Without this, a closed inCh would yield zero values forever and
		// the loop would spin, pushing phantom messages at the consumer.
		defer close(outCh)
		for lastMsg := range inCh {
			select {
			case outCh <- lastMsg:
			default:
				// Nobody is receiving right now: drop the message.
			}
		}
	}()
	return inCh, outCh
}
We’ve only shifted the problem into the intermediate goroutine.
// ConflateV2 returns a send-only input channel and a receive-only output
// channel joined by a goroutine that keeps only the most recent message.
//
// Each value received on the input replaces the remembered message and is
// offered to the output with a non-blocking send. If no receiver is ready,
// a timer is armed and the same message is offered again after
// retryInterval. A newer message arriving in the meantime cancels the
// pending retry and takes its place, so older undelivered messages are
// discarded in favour of the latest one.
//
// Closing the input channel stops the goroutine: a message that is still
// waiting for delivery is sent with a blocking send, and then the output
// channel is closed.
func ConflateV2[T any](retryInterval time.Duration) (chan<- T, <-chan T) {
	outCh := make(chan T)
	inCh := make(chan T)
	go func() {
		defer close(outCh)

		var lastMsg T
		var retryTimer *time.Timer
		// retryCh is non-nil exactly while lastMsg is still undelivered.
		// A nil channel never becomes ready, which disables the retry case.
		var retryCh <-chan time.Time

		for {
			select {
			case msg, ok := <-inCh:
				if retryTimer != nil {
					retryTimer.Stop()
				}
				if !ok {
					// Input closed: flush what is still pending instead of
					// spinning on zero values read from the closed channel.
					if retryCh != nil {
						outCh <- lastMsg
					}
					return
				}
				lastMsg = msg
				retryCh = nil
			case <-retryCh:
				// Retry interval elapsed: offer lastMsg again below.
			}

			select {
			case outCh <- lastMsg:
				retryCh = nil
			default:
				// Receiver is busy: try again after retryInterval.
				retryTimer = time.NewTimer(retryInterval)
				retryCh = retryTimer.C
			}
		}
	}()
	return inCh, outCh
}
00.5s | Made sale: $1.00 at 00.5s
01.0s | Made sale: $2.00 at 01.0s
01.2s | Sale received after 0ms: $1.00 at 00.5s. Total sales: $1.00
01.5s | Made sale: $3.00 at 01.5s
01.9s | Sale received after 201ms: $2.00 at 01.0s. Total sales: $3.00
02.0s | Made sale: $4.00 at 02.0s
02.5s | Made sale: $5.00 at 02.5s
02.6s | Sale received after 402ms: $3.00 at 01.5s. Total sales: $6.00
03.0s | Made sale: $6.00 at 03.0s
03.3s | Sale received after 102ms: $5.00 at 02.5s. Total sales: $11.00
Latest is sometimes good enough, but we care about total sales!
func ConflateV3[T Conflater[T]](retryInterval time.Duration) (chan<- T, <-chan T) { outCh := make(chan T) inCh := make(chan T) go func() { var conflatedMessage T var retryTimer *time.Timer var retryCh <-chan time.Time for { select { case lastMsg := <-inCh: conflatedMessage = conflatedMessage.ConflateWith(lastMsg) if retryTimer != nil { retryTimer.Stop() retryCh = nil } case <-retryCh: } select { case outCh <- conflatedMessage: conflatedMessage = T.ZeroValue(conflatedMessage) default: retryTimer = time.NewTimer(retryInterval) retryCh = retryTimer.C } } }() return inCh, outCh }func ConflateV3[T Conflater[T]](retryInterval time.Duration) (chan<- T, <-chan T) { outCh := make(chan T) inCh := make(chan T) go func() { var conflatedMessage T var retryTimer *time.Timer var retryCh <-chan time.Time for { select { case lastMsg := <-inCh: conflatedMessage = conflatedMessage.ConflateWith(lastMsg) if retryTimer != nil { retryTimer.Stop() retryCh = nil } case <-retryCh: } select { case outCh <- conflatedMessage: conflatedMessage = T.ZeroValue(conflatedMessage) default: retryTimer = time.NewTimer(retryInterval) retryCh = retryTimer.C } } }() return inCh, outCh }
00.5s | Made sale: $1.00 at 00.5s
01.0s | Made sale: $2.00 at 01.0s
01.2s | Sale received after 1ms: $1.00 at 00.5s. Total sales: $1.00
01.5s | Made sale: $3.00 at 01.5s
01.9s | Sale received after 202ms: $2.00 at 01.0s. Total sales: $3.00
02.0s | Made sale: $4.00 at 02.0s
02.5s | Made sale: $5.00 at 02.5s
02.6s | Sale received after 403ms: $3.00 at 01.5s. Total sales: $6.00
03.0s | Made sale: $6.00 at 03.0s
03.3s | Sale received after 103ms: $9.00 at 02.5s. Total sales: $15.00
Concurrency Nirvana? Or…