Background

I had a problem with a service I was working on today. The problem was very confusing. Symptoms shown were very strange. I had implemented a very simple worker pool as I have on many projects similar to this:

// ErrTimeout - timeout error
var ErrTimeout = errors.New("timeout")

// workersDone - package signal that workers are done
var workersDone = make(chan bool)

// WorkRequest - inputs for an operation
type WorkRequest struct {
	Ret       chan Result
	Input     []byte
}

// Result - result struct for work results
type Result struct {
	Value []byte
	Err   error
}

// WaitForResult - this is a wrapper function to serve as a helper to listen on the
// result channel of the work item passed in, will timeout in 2 seconds
func (w *WorkRequest) WaitForResult() (Result, error) {
	select {
	case <-time.After(2 * time.Second):
		return Result{}, ErrTimeout
	case result := <-w.Ret:
		return result, nil
	}
}

// worker - worker that is in the pool
func worker(input chan WorkRequest) chan bool {
	quit := make(chan bool)
	go func(input chan WorkRequest, quit chan bool) {
		for {
			select {
			case <-quit:
				workersDone<-true
				return
			case v := <-input:
				// do lots of work!!
				result := &Result{} // this result is the result of all the work..
				v.Ret <- result
			}
		}
	}(input, quit)
	return quit
}

Pretty simple. I had this setup to run with a number of workers in a worker pool. I put in a timeout in the event a worker takes a long time I would be able to respond back to the caller in a reasonable amount of time (2 seconds).

Then madness happened. Due to the way we are doing work, it is very CPU intensive and when the system gets loaded down the work queue starts backing up. This makes the WorkRequest lifetime extend past it’s 2 second timeout window. The second I start seeing failures due to timeouts, the system “just locks up” and the service ends up hanging.

This is really strange. I started by looking at the work the worker was performing trying to figure out why it would just stop taking requests… I started looking at the possibility of me signaling the workers in the pool to quit…

It turns out the real problem was in how the communication was happening, or not happening as it turns out, in the WaitForResult method call.

Zooming in on that particular code:

// WaitForResult - this is a wrapper function to serve as a helper to listen on the
// result channel of the work item passed in, will timeout in 2 seconds
func (w *WorkRequest) WaitForResult() (Result, error) {
	select {
	case <-time.After(2 * time.Second):
		return Result{}, ErrTimeout
	case result := <-w.Ret:
		return result, nil
	}
}

It shows pretty clearly here that after 2 seconds, I am returning an empty result, and an error. And therein lies the problem. Since we are returning before we get any messages on w.Ret, there is no active reader reading w.Ret. This means that when the worker tries to write to v.Ret, since no go-routines are listening for that channel, the worker becomes blocked.

A much better solution is to bring the timeout check closer to the work in which we are testing the timeout on, and then sending the error on the channel if the timeout expires:

// WaitForResult - this is a wrapper function to serve as a helper to listen on the
// result channel of the work item passed in, will timeout in 2 seconds
func (w *WorkRequest) WaitForResult() (Result, error) {
	result := <-w.Ret
	return result, result.Err
}

// worker - worker that is in the pool
func worker(input chan WorkRequest) chan bool {
	quit := make(chan bool)
	go func(input chan WorkRequest, quit chan bool) {
		for {
			select {
			case <-quit:
				workersDone<-true
				return
			case v := <-input:
				done := make(chan bool)
				var result Result
				go func(){
					// do lots of work!!
					result := &Result{} // this result is the result of all the work..
					done<-true // this result is the result of all the work
				}()
				select {
				case result := <-done:
					v.Ret <-result
				case <-time.After(2*time.Second):
					v.Ret <-&Result{Err:ErrTimeout}
				}
			}
		}
	}(input, quit)
	return quit
}