Thursday, February 27, 2020

Priority Channel in Go

I'm kind of impressed with this ugly monster:

package main

import (
 "context"
 "fmt"
 "sync"
 "time"
)

// main is a small demonstration of PriorityChannel. It creates one buffered
// source channel per priority level, starts a goroutine that runs Prioritize
// (closing out when it returns) and a slow consumer that prints whatever
// arrives on out, submits a handful of values whose priority equals their
// value, and finally cancels the context and waits for both goroutines.
func main() {
	const (
		levels      = 3
		sourceDepth = 5
	)

	// One buffered channel per priority level.
	sources := make([]chan int, levels)
	for lvl := range sources {
		sources[lvl] = make(chan int, sourceDepth)
	}
	out := make(chan int)

	ctx, cancel := context.WithCancel(context.Background())
	pc := New(ctx, sources, 10, out)

	var wg sync.WaitGroup
	wg.Add(2)

	// Prioritizer: runs until ctx is cancelled, then closes out so the
	// consumer's range loop below terminates.
	go func() {
		defer wg.Done()
		defer close(out)
		pc.Prioritize()
	}()

	// Consumer: deliberately slow so that submitted values pile up in the
	// sources and the priority ordering becomes visible in the output.
	go func() {
		defer wg.Done()
		for v := range out {
			fmt.Println("i: ", v)
			time.Sleep(250 * time.Millisecond)
		}
	}()

	for _, p := range []int{0, 2, 1, 0, 2, 1, 0, 2, 1} {
		fmt.Println("submitting ", p)
		pc.Submit(p, p)
	}

	// Give the consumer time to drain everything before shutting down.
	time.Sleep(3 * time.Second)
	cancel()
	wg.Wait()
}

// PriorityChannel merges several source channels into a single output
// channel. Prioritize scans the sources in slice order and forwards the first
// value it finds, so a lower index in sources means a higher priority.
type PriorityChannel struct {
 // notify is a buffered channel used as a counter. New fills it completely;
 // Submit receives one token for every value it enqueues, and Prioritize
 // sends one token back before each scan of the sources. Prioritize's send
 // therefore blocks while the buffer is full, i.e. while every token taken
 // by Submit has already been put back.
 notify  chan struct{}
 // sources holds one channel per priority level, indexed by priority.
 sources []chan int
 // out receives the values picked by Prioritize.
 out     chan int
 // ctx stops Prioritize once it is cancelled.
 ctx     context.Context
}

// New builds a PriorityChannel that reads from sources and writes to out.
//
// cap is the capacity of the internal notify channel, which is handed back
// completely filled with tokens. A negative cap makes the underlying make call
// panic. ctx is stored in the returned value and is what Prioritize watches in
// order to stop.
func New(ctx context.Context, sources []chan int, cap int, out chan int) PriorityChannel {
	tokens := make(chan struct{}, cap)
	// Start with a full buffer: nothing has been submitted yet.
	for remaining := cap; remaining > 0; remaining-- {
		tokens <- struct{}{}
	}
	return PriorityChannel{
		notify:  tokens,
		sources: sources,
		out:     out,
		ctx:     ctx,
	}
}

// Prioritize forwards submitted values to pc.out until pc.ctx is cancelled.
//
// Each iteration first waits until a token can be sent into pc.notify. The
// buffer only has room after Submit has taken a token out, which Submit does
// once per enqueued value, so a successful send means a value is waiting in
// one of the sources. The sources are then scanned in slice order and the
// first value found is forwarded, which gives lower indexes priority.
//
// Cancellation is honoured both while waiting for work and while waiting for
// a receiver on pc.out. If the context is cancelled during the latter, the
// value that was already taken from its source is dropped. Prioritize does
// not close pc.out; that is left to the caller.
func (pc PriorityChannel) Prioritize() {
	for {
		// Block until there is a value to forward or we are told to stop.
		select {
		case pc.notify <- struct{}{}:
			// proceed
		case <-pc.ctx.Done():
			return
		}
	SOURCES:
		for _, rcv := range pc.sources {
			select {
			case i := <-rcv:
				// Watch ctx here too: otherwise a consumer that has
				// stopped reading pc.out would pin this goroutine forever.
				select {
				case pc.out <- i:
				case <-pc.ctx.Done():
					return
				}
				break SOURCES
			default:
				// nothing at this priority; try the next one
			}
		}
	}
}

// Submit enqueues i at the given priority, where 0 is the highest priority
// and len(pc.sources)-1 the lowest. It panics with "invalid priority" when
// priority is outside that range.
//
// Submit blocks while the chosen source is full, and afterwards while no
// token is available in pc.notify. Both waits are abandoned when pc.ctx is
// cancelled, so a caller cannot get stuck once nothing is draining the
// sources any more; in that case the value may not have been enqueued.
func (pc PriorityChannel) Submit(i, priority int) {
	if priority < 0 || priority >= len(pc.sources) {
		panic("invalid priority")
	}

	select {
	case pc.sources[priority] <- i:
	case <-pc.ctx.Done():
		return
	}

	// Take one token out of notify; the room this frees is what lets the
	// send in Prioritize proceed.
	select {
	case <-pc.notify:
	case <-pc.ctx.Done():
	}
}