package main
import (
"context"
"fmt"
"sync"
"time"
)
func main() {
const levels = 3
const sourceDepth = 5
sources := make([]chan int, levels)
for i := 0; i < levels; i++ {
sources[i] = make(chan int, sourceDepth)
}
out := make(chan int)
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
pc := New(ctx, sources, 10, out)
wg.Add(1)
go func() {
defer wg.Done()
defer close(out)
pc.Prioritize()
}()
wg.Add(1)
go func() {
defer wg.Done()
for i := range out {
fmt.Println("i: ", i)
time.Sleep(time.Second / 4)
}
}()
for _, i := range []int{0, 2, 1, 0, 2, 1, 0, 2, 1} {
fmt.Println("submitting ", i)
pc.Submit(i, i)
}
time.Sleep(time.Second * 3)
cancel()
wg.Wait()
}
type PriorityChannel struct {
notify chan struct{}
sources []chan int
out chan int
ctx context.Context
}
func New(ctx context.Context, sources []chan int, cap int, out chan int) PriorityChannel {
pc := PriorityChannel{
notify: make(chan struct{}, cap),
sources: sources,
out: out,
ctx: ctx,
}
for i := 0; i < cap; i++ {
pc.notify <- struct{}{}
}
return pc
}
func (pc PriorityChannel) Prioritize() {
for {
// block until there's a value
select {
case pc.notify <- struct{}{}:
// proceed
case <-pc.ctx.Done():
return
}
SOURCES:
for _, rcv := range pc.sources {
select {
case i := <-rcv:
pc.out <- i
break SOURCES
default:
// keep looping
}
}
}
}
func (pc PriorityChannel) Submit(i, priority int) {
if priority < 0 || priority >= len(pc.sources) {
panic("invalid priority")
}
pc.sources[priority] <- i
<-pc.notify
}
Jason Mansfield is a software engineer, security enthusiast, and crazy thinker living in San Diego.
Thursday, February 27, 2020
Priority Channel in Go
I'm kind of impressed with this ugly monster:
Subscribe to:
Posts (Atom)