What approach is best for concurrent producers and consumers?

I have a use case where I want a pool of N integers (0 to N-1) shared by N workers (N <= 100). Each worker claims an integer from the pool, "works" (for this example, sleeps for a random duration), returns the integer to the pool, and starts the process again. Each worker (goroutine) can take an arbitrary amount of time to return its key. I've quickly thrown together the following 2 solutions, and would like to know if there's a "best" or "safest" one, and if I'm missing a better approach. For the moment, these workers will never stop unless the application is killed, and we will have a fixed number of workers for the life of the application.

Single Buffered Channel

// Worker repeatedly borrows a key from a shared channel-backed pool, holds it
// while "working", and then puts it back.
type Worker struct {
    ID       int      // identifier assigned by the creator of the worker
    KeyIndex int      // key most recently received from KeyChan
    KeyChan  chan int // shared pool: receive to claim a key, send to return it
}

// GetKey receives a key from the pool and stores it in KeyIndex. It blocks
// until a key is available on KeyChan.
func (w *Worker) GetKey() {
    w.KeyIndex = <-w.KeyChan
}

// ReturnKey sends the key held in KeyIndex back to the pool. The send blocks
// only if KeyChan has no free buffer space and no receiver is waiting.
func (w *Worker) ReturnKey() {
    w.KeyChan <- w.KeyIndex
}

// Work loops forever: claim a key, sleep for a random 0-9 seconds, return the
// key. It never returns, so it is meant to be run in its own goroutine.
func (w *Worker) Work() {
    // Seed a private generator once per worker instead of re-seeding the
    // shared global source on every iteration: re-seeding resets the sequence
    // for every goroutine using it. The worker ID is mixed in so workers
    // started within the same clock tick still get different sequences.
    rng := rand.New(rand.NewSource(time.Now().UnixNano() + int64(w.ID)))

    for {
        w.GetKey()
        n := rng.Intn(10)
        time.Sleep(time.Duration(n) * time.Second)
        w.ReturnKey()
    }
}

// main fills a buffered channel with the keys 0..numWorkers-1, starts one
// Worker goroutine per key, all sharing that channel, and then blocks forever.
func main() {
    const numWorkers = 5

    // The channel is the key pool; its capacity equals the number of keys, so
    // pre-loading it here never blocks.
    pool := make(chan int, numWorkers)
    workers := make([]*Worker, 0, numWorkers)
    for id := 0; id < numWorkers; id++ {
        pool <- id
        workers = append(workers, &Worker{ID: id, KeyChan: pool})
    }

    for i := range workers {
        go workers[i].Work()
    }

    // Park the main goroutine; the workers run until the process is killed.
    select {}
}

Broker w/ Array + Mutex

// KeyBrokerMutex hands out integer keys (slice indices) to callers and takes
// them back, guarding its bookkeeping with a mutex.
type KeyBrokerMutex struct {
    mu   sync.Mutex
    keys []bool // keys[i] is true while key i is free to be claimed
}

// GetKey claims the lowest-numbered free key and returns its index. It
// returns -1 when every key is currently claimed; it does not wait for one
// to be returned.
func (kb *KeyBrokerMutex) GetKey() int {
    kb.mu.Lock()
    defer kb.mu.Unlock()

    for i, free := range kb.keys {
        if free {
            kb.keys[i] = false
            return i
        }
    }

    return -1
}

// ReturnKey marks the key at index as free again. Indices outside the key
// range (including the -1 that GetKey returns when no key is free) are
// ignored rather than causing an out-of-range panic.
func (kb *KeyBrokerMutex) ReturnKey(index int) {
    kb.mu.Lock()
    defer kb.mu.Unlock()

    if index < 0 || index >= len(kb.keys) {
        return
    }
    kb.keys[index] = true
}

// Worker repeatedly borrows a key from a shared KeyBrokerMutex, holds it while
// "working", and then hands it back.
type Worker struct {
    ID        int             // identifier assigned by the creator of the worker
    KeyIndex  int             // key currently held; -1 when no key is held
    KeyBroker *KeyBrokerMutex // shared broker the key is claimed from
}

// GetKeyBrokerMutex asks the broker for a key and stores the result in
// KeyIndex. The broker returns -1 when no key is free.
func (w *Worker) GetKeyBrokerMutex() {
    w.KeyIndex = w.KeyBroker.GetKey()
}

// ReturnKeyBrokerMutex gives the held key back to the broker and resets
// KeyIndex to -1. It does nothing when no key is held, so the broker is never
// handed the -1 sentinel as an index.
func (w *Worker) ReturnKeyBrokerMutex() {
    if w.KeyIndex < 0 {
        return
    }
    w.KeyBroker.ReturnKey(w.KeyIndex)
    w.KeyIndex = -1
}

// WorkMutex loops forever: claim a key, sleep for a random 0-9 seconds, return
// the key. It never returns, so it is meant to be run in its own goroutine.
func (w *Worker) WorkMutex() {
    // Seed a private generator once per worker instead of re-seeding the
    // shared global source on every iteration: re-seeding resets the sequence
    // for every goroutine using it. The worker ID is mixed in so workers
    // started within the same clock tick still get different sequences.
    rng := rand.New(rand.NewSource(time.Now().UnixNano() + int64(w.ID)))

    for {
        w.GetKeyBrokerMutex()
        if w.KeyIndex < 0 {
            // No key was free. Pause briefly and retry rather than "working"
            // without a key or spinning on the broker's mutex.
            time.Sleep(100 * time.Millisecond)
            continue
        }

        n := rng.Intn(10)
        time.Sleep(time.Duration(n) * time.Second)
        w.ReturnKeyBrokerMutex()
    }
}

// main builds a broker whose keys all start out free, starts one Worker
// goroutine per key, all sharing that broker, and then blocks forever.
func main() {
    const numWorkers = 5

    // Every slot starts as true, i.e. every key is initially available.
    available := make([]bool, numWorkers)
    for i := range available {
        available[i] = true
    }
    broker := &KeyBrokerMutex{keys: available}

    workers := make([]*Worker, 0, numWorkers)
    for id := 0; id < numWorkers; id++ {
        workers = append(workers, &Worker{ID: id, KeyBroker: broker})
    }

    for i := range workers {
        go workers[i].WorkMutex()
    }

    // Park the main goroutine; the workers run until the process is killed.
    select {}
}

I also have a broker approach using 2 separate channels for getting and returning keys; however, I don't think that offers any benefits over the above solutions.

I like the simplicity of the single channel approach, but is there any downside to having multiple consumers and producers to a single buffered channel?



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source