'ElasticSearch connection errors using goroutines

I have a syncing utility in Go, that moves data from MongoDB to ES. Here is the gist of it. Create workgroup, launch a goroutine listening on a channel, pull results from Mongo and publish to a channel which will index it to ES using goroutines.

wg := &sync.WaitGroup{}
go func() {
    for {
        select {
        case data := <-resultFeed:
            go func() {
                defer wg.Done()
                pushToES(data)
            }()
        }
    }
}()
... pull from Mongo
for {
    wg.Add(1)
    resultFeed <- row
}
...
wg.Wait()

Using the official ES client or olivere, the issue is that when using a goroutine to push the results to ES I get connection errors like:

no available connection: no Elasticsearch node available
read tcp 127.0.0.1:52778->127.0.0.1:9200: read: connection reset by peer
write tcp 127.0.0.1:54947->127.0.0.1:9200: write: broken pipe

Those come intermittently as some results will succeed then some will produce an error. Both ES and Mongo are default local installations. There are no logs produced from ES.

If I dont use a goroutine there are no errors but its obviously much slower and I chose Go specifically for the ability to index concurrently. I haven't tried using bulk requests because it is not only the indexing thats happening so I'd like to keep it per document.



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source