Heaps and Heaps of fun

I needed a priority queue in Go today, so I implemented heap.Interface from the container/heap package, which is defined as follows:

type Interface interface {
	sort.Interface
	Push(x interface{}) // add x as element Len()
	Pop() interface{}   // remove and return element Len() - 1.
}

It is a pretty simple interface: Push and Pop, plus the embedded sort.Interface, which contributes Len, Less, and Swap. My implementation ended up looking a lot like the priority queue example in the godocs:

// Item - the item that holds the structure needed for the queue
type Item struct {
	value interface{}
	priority int
	index int
}

type PriorityQueue []*Item

// Len - get the length of the heap
func (pq PriorityQueue) Len() int { return len(pq) }

// Less - report whether item i sorts before item j; the lowest priority value is popped first
func (pq PriorityQueue) Less(i, j int) bool {
	return pq[i].priority < pq[j].priority
}

// Swap - implementation of swap for the heap interface
func (pq PriorityQueue) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
	pq[i].index = i
	pq[j].index = j
}

// Push - implementation of push for the heap interface
func (pq *PriorityQueue) Push(x interface{}) {
	n := len(*pq)
	item := x.(*Item)
	item.index = n
	*pq = append(*pq, item)
}

// Pop - implementation of pop for heap interface
func (pq *PriorityQueue) Pop() interface{} {
	old := *pq
	n := len(old)
	item := old[n-1]
	item.index = -1 // for safety
	*pq = old[0 : n-1]
	return item
}

At this point we have a working priority queue. We use heap.Push and heap.Pop to interact with the heap, and things are lovely. That is, until you try to use it in a web application that potentially has many goroutines trying to push and pop at the same time.
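
As a quick single-goroutine check, the queue above can be exercised like this. This is a minimal sketch: popOrder is a hypothetical helper that is not part of the original code, and the task names are made up for illustration.

```go
package main

import (
	"container/heap"
	"fmt"
)

// Item and PriorityQueue repeat the definitions from the post so that
// this example compiles on its own.
type Item struct {
	value    interface{}
	priority int
	index    int
}

type PriorityQueue []*Item

func (pq PriorityQueue) Len() int           { return len(pq) }
func (pq PriorityQueue) Less(i, j int) bool { return pq[i].priority < pq[j].priority }
func (pq PriorityQueue) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
	pq[i].index = i
	pq[j].index = j
}

func (pq *PriorityQueue) Push(x interface{}) {
	item := x.(*Item)
	item.index = len(*pq)
	*pq = append(*pq, item)
}

func (pq *PriorityQueue) Pop() interface{} {
	old := *pq
	n := len(old)
	item := old[n-1]
	item.index = -1
	*pq = old[:n-1]
	return item
}

// popOrder (hypothetical helper) pushes every task through heap.Push and
// returns the task names in the order heap.Pop hands them back.
func popOrder(tasks map[string]int) []string {
	pq := &PriorityQueue{}
	for name, priority := range tasks {
		heap.Push(pq, &Item{value: name, priority: priority})
	}
	var out []string
	for pq.Len() > 0 {
		out = append(out, heap.Pop(pq).(*Item).value.(string))
	}
	return out
}

func main() {
	// Less uses "<", so the lowest priority value comes out first.
	fmt.Println(popOrder(map[string]int{"write": 2, "deploy": 3, "plan": 1}))
	// prints [plan write deploy]
}
```

Note that the calls go through heap.Push and heap.Pop, never through the Push and Pop methods directly; the package functions are what keep the slice in heap order.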

Much like Go's map implementation, the container/heap implementation is not thread safe, and for a reason: the application knows best whether it needs that safety or not.

With this in mind, we need to protect this data structure ourselves.

The Wrong Way

The wrong way to protect this data structure is to do it inside the implementation of heap.Interface. You might be tempted to use channels to serialize writes to the underlying data structure: a single goroutine does all of the writes and receives messages from many other goroutines. It looks like that should be enough:

// Push - implementation of push for the heap interface
func (pq *PriorityQueue) Push(x interface{}) {
	pq.PushChannel <- x
}

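// doWrites - start a single goroutine that performs every write to the queue.
// Sketch only: it assumes PriorityQueue somehow exposes a PushChannel.
func (pq *PriorityQueue) doWrites() chan bool {
	var quit = make(chan bool)
	go func() {
		for {
			select {
			case <-quit:
				return
			case x := <-pq.PushChannel:
				n := len(*pq)
				item := x.(*Item)
				item.index = n
				*pq = append(*pq, item)
			// ...
			}
		}
	}()
	return quit
}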

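Well, not really. You cannot protect the heap inside the implementation of the interface, mainly because the heap ordering is maintained by the container/heap package itself: heap.Push calls your Push and then immediately sifts the new element up using Len, Less, and Swap. If Push only hands the element to another goroutine, that sifting can run before the element has actually been appended. On top of that, select chooses randomly among the channels that are ready, so you cannot be sure that the operations happen in the right order.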
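
To see what the package does around your methods, here is a small sketch that records the calls made during one heap.Push. The tracedHeap type and the pushTrace helper are hypothetical names invented for this illustration; only the container/heap calls are real.

```go
package main

import (
	"container/heap"
	"fmt"
)

// tracedHeap is a hypothetical min-heap of ints that records every
// heap.Interface method container/heap calls on it.
type tracedHeap struct {
	data  []int
	calls []string
}

func (h *tracedHeap) Len() int {
	h.calls = append(h.calls, "Len")
	return len(h.data)
}

func (h *tracedHeap) Less(i, j int) bool {
	h.calls = append(h.calls, "Less")
	return h.data[i] < h.data[j]
}

func (h *tracedHeap) Swap(i, j int) {
	h.calls = append(h.calls, "Swap")
	h.data[i], h.data[j] = h.data[j], h.data[i]
}

func (h *tracedHeap) Push(x interface{}) {
	h.calls = append(h.calls, "Push")
	h.data = append(h.data, x.(int))
}

func (h *tracedHeap) Pop() interface{} {
	h.calls = append(h.calls, "Pop")
	n := len(h.data)
	x := h.data[n-1]
	h.data = h.data[:n-1]
	return x
}

// pushTrace (hypothetical helper) builds a heap from initial, then returns
// the method calls triggered by a single heap.Push of x.
func pushTrace(initial []int, x int) []string {
	h := &tracedHeap{data: append([]int(nil), initial...)}
	heap.Init(h)
	h.calls = nil // forget the calls made by Init
	heap.Push(h, x)
	return h.calls
}

func main() {
	// The trace starts with Push, then Len, then the Less/Swap calls
	// that sift the new element up.
	fmt.Println(pushTrace([]int{5, 9}, 1))
}
```

The trace begins with Push followed by Len, and the comparisons and swaps come after. All of those later calls assume the element is already in the slice, which is exactly what an asynchronous Push cannot promise.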

The Right Way?

In the end I decided to serialize access to the heap.Push and heap.Pop function calls themselves in my application, as can be seen below:

// heapPopChanMsg - the message structure for a pop chan
type heapPopChanMsg struct {
	h      heap.Interface
	result chan interface{}
}

// heapPushChanMsg - the message structure for a push chan
type heapPushChanMsg struct {
	h heap.Interface
	x interface{}
}

var (
	quitChan chan bool
	// heapPushChan - push channel for pushing to a heap
	heapPushChan = make(chan heapPushChanMsg)
	// heapPopChan - pop channel for popping from a heap
	heapPopChan = make(chan heapPopChanMsg)
)

// HeapPush - safely push item to a heap interface
func HeapPush(h heap.Interface, x interface{}) {
	heapPushChan <- heapPushChanMsg{
		h: h,
		x: x,
	}
}

// HeapPop - safely pop item from a heap interface
func HeapPop(h heap.Interface) interface{} {
	var result = make(chan interface{})
	heapPopChan <- heapPopChanMsg{
		h:      h,
		result: result,
	}
	return <-result
}

// stopWatchHeapOps - stop watching for heap operations
func stopWatchHeapOps() {
	quitChan <- true
}

// watchHeapOps - watch for pushes and pops to our heaps, serializing the
// operations with channels
func watchHeapOps() chan bool {
	var quit = make(chan bool)
	quitChan = quit // without this, stopWatchHeapOps would block forever on a nil channel
	go func() {
		for {
			select {
			case <-quit:
				// TODO: update to quit gracefully
				// TODO: maybe need to dump state somewhere?
				return
			case popMsg := <-heapPopChan:
				popMsg.result <- heap.Pop(popMsg.h)
			case pushMsg := <-heapPushChan:
				heap.Push(pushMsg.h, pushMsg.x)
			}
		}
	}()
	return quit
}

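This code now serializes the pushes and pops to the heap, so it should be thread safe as long as watchHeapOps is running and every access to the heap goes through HeapPush and HeapPop.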
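
Here is a condensed, self-contained sketch of the same approach being hit by many goroutines at once. It reuses the names from the code above, but it is an adaptation and not the exact code: watchHeapOps takes its quit channel as a parameter here, and intHeap and pushConcurrentlyThenDrain are hypothetical additions for the demo.

```go
package main

import (
	"container/heap"
	"fmt"
	"sync"
)

// intHeap is a hypothetical min-heap of ints, used only for this demo.
type intHeap []int

func (h intHeap) Len() int            { return len(h) }
func (h intHeap) Less(i, j int) bool  { return h[i] < h[j] }
func (h intHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *intHeap) Push(x interface{}) { *h = append(*h, x.(int)) }
func (h *intHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

type heapPushChanMsg struct {
	h heap.Interface
	x interface{}
}

type heapPopChanMsg struct {
	h      heap.Interface
	result chan interface{}
}

var (
	heapPushChan = make(chan heapPushChanMsg)
	heapPopChan  = make(chan heapPopChanMsg)
)

// watchHeapOps runs every heap operation on the calling goroutine until
// quit is closed. (Assumption: quit is passed in to keep the demo small.)
func watchHeapOps(quit <-chan struct{}) {
	for {
		select {
		case <-quit:
			return
		case m := <-heapPopChan:
			m.result <- heap.Pop(m.h)
		case m := <-heapPushChan:
			heap.Push(m.h, m.x)
		}
	}
}

func HeapPush(h heap.Interface, x interface{}) {
	heapPushChan <- heapPushChanMsg{h: h, x: x}
}

func HeapPop(h heap.Interface) interface{} {
	result := make(chan interface{})
	heapPopChan <- heapPopChanMsg{h: h, result: result}
	return <-result
}

// pushConcurrentlyThenDrain (hypothetical) pushes 1..n from n goroutines,
// then pops everything back out through the serialized API.
func pushConcurrentlyThenDrain(n int) []int {
	quit := make(chan struct{})
	done := make(chan struct{})
	go func() { watchHeapOps(quit); close(done) }()
	defer func() { close(quit); <-done }() // wait for the watcher to exit

	h := &intHeap{}
	var wg sync.WaitGroup
	for i := 1; i <= n; i++ {
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			HeapPush(h, v)
		}(i)
	}
	wg.Wait()

	out := make([]int, 0, n)
	for i := 0; i < n; i++ {
		out = append(out, HeapPop(h).(int))
	}
	return out
}

func main() {
	fmt.Println(pushConcurrentlyThenDrain(5)) // prints [1 2 3 4 5]
}
```

Only the watcher goroutine ever touches the heap, so the pushes can arrive in any order and the values still come back sorted. Running this under the race detector (go run -race) is a cheap way to check that nothing else is reaching into the heap.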