'Recursive calls from function started as goroutine & Idiomatic way to continue caller when all worker goroutines finished
I am implementing a (sort of a) combinatorial backtracking algorithm in go utilising goroutines. My problem can be represented as a tree with a certain degree/spread where I want to visit each leaf and calculate a result depending on the path taken. On a given level, I want to spawn goroutines to process the subproblems concurrently, i.e. if I have a tree with degree 3 and I want to start the concurrency after level 2, I'd spawn 3*3=9 goroutines that proceed with processing the subproblems concurrently.
func main() {
cRes := make(chan string, 100)
res := []string{}
numLevels := 5
spread := 3
startConcurrencyAtLevel := 2
nTree("", numLevels, spread, startConcurrencyAtLevel, cRes)
for {
select {
case r := <-cRes:
res = append(res, r)
case <-time.After(10 * time.Second):
fmt.Println("Caculation timed out")
fmt.Println(len(res), math.Pow(float64(spread), float64(numLevels)))
return
}
}
}
func nTree(path string, maxLevels int, spread int, startConcurrencyAtLevel int, cRes chan string) {
if len(path) == maxLevels {
// some longer running task here associated with the found path, also using a lookup table
// real problem actually returns not the path but the result if it satisfies some condition
cRes <- path
return
}
for i := 1; i <= spread; i++ {
nextPath := path + fmt.Sprint(i)
if len(path) == startConcurrencyAtLevel {
go nTree(nextPath, maxLevels, spread, startConcurrencyAtLevel, cRes)
} else {
nTree(nextPath, maxLevels, spread, startConcurrencyAtLevel, cRes)
}
}
}
The above code works, however I rely on the for select statement timing out. I am looking for a way to continue with main() as soon as all goroutines have finished, i.e. all subproblems have been processed.
I already came up with two possible (unpreferred/unelegant) solutions:
Using a mutex protected result map + a waitgroup instead of a channel-based approach should do the trick, but I'm curious if there is a neat solution with channels.
Using a quit channel (of type int). Every time a goroutine is spawned, the quit channel gets a +1 int, everytime a comptutation finished in a leaf, it gets a -1 int and the caller sums up the values. See the following snippet, this however is not a good solution as it (rather blatantly) runs into timing issues I don't want to deal with. It quits prematurely if for instance the first goroutine finishes before another one has been spawned.
for {
select {
case q := <-cRunningRoutines:
runningRoutines += q
if runningRoutines == 0 {
fmt.Println("Calculation complete")
return res
}
// ...same cases as above
}
Playground: https://go.dev/play/p/9jzeCvl8Clj
Following questions:
- Is doing recursive calls from a function started as a goroutine to itself a valid approach?
- What would be an idiomatic way of reading the results from
cRes
until all spawned goroutines finish? I read somewhere that channels should be closed when computation is done, but I just cant wrap my head around how to integrate it in this case.
Happy about any ideas, thanks!
Solution 1:[1]
As mentioned by torek, I spun off an anonymous function closing the channel after the waitgroup finished waiting. Also needed some logic around calling the wg.Done()
of the spawned goroutines only after the the recursion of the goroutine spawning level returns.
Generally I think this is a useful idiom (correct me if I'm wrong :))
Playground: https://go.dev/play/p/bQjHENsZL25
func main() {
cRes := make(chan string, 100)
numLevels := 3
spread := 3
startConcurrencyAtLevel := 2
var wg sync.WaitGroup
nTree("", numLevels, spread, startConcurrencyAtLevel, cRes, &wg)
go func() {
// time.Sleep(1 * time.Second) // edit: code should work without this initial sleep
wg.Wait()
close(cRes)
}()
for r := range cRes {
fmt.Println(r)
}
fmt.Println("Done!")
}
func nTree(path string, maxLevels int, spread int, startConcurrencyAtLevel int, cRes chan string, wg *sync.WaitGroup) {
if len(path) == maxLevels {
// some longer running task here associated with the found path
cRes <- path
return
}
for i := 1; i <= spread; i++ {
nextPath := path + fmt.Sprint(i)
if len(path) == startConcurrencyAtLevel {
go nTree(nextPath, maxLevels, spread, startConcurrencyAtLevel, cRes, wg)
} else {
nTree(nextPath, maxLevels, spread, startConcurrencyAtLevel, cRes, wg)
}
}
}
Solution 2:[2]
reading the description and the snippet I am not able to understand exactly what you are trying to achieve, but I have some hints and patterns for channels that I use daily and think are helpful.
the
context
package is very helpful to manage goroutines' state in a safe way. In your example,time.After
is used to end the main program, but in non-main functions it could be leaking goroutines: if instead you usecontext.Context
and pass it into the gorotuines (it's usually passed first arg of a function) you will be able to control cancellation of downstream calls. This explains it briefly.it is common practice to create channels (and return them) in functions that produce messages and send them in the channel. The same function should be responsible of closing the channel, e,g, with
defer close(channel)
when it's done writing. This is handy because when the channel is buffered, closing it is possible even when it still has data in it: the Go runtime will actually wait until all messages are polledbefore closing. For unbuffered channels, the function won't be able to send a message over the channel until a reader of the channel is ready to poll it, thus won;t be able to exit. This is an example (without recursion). We canclose
the channel both when it is buffered or unbuffered in this example, because the send will block until thefor := range
on the channel in the main goroutine reads from it. This is a variant for the same principle, with the channel passed as argument.we can use
sync.WaitGroup
in tandem with channels, to signal completion for individual goroutines, and let know to an "orchestrating" goroutine that the channel can be closed, because all message producers are done sending data into the channel. The same considerations as point 1 apply on theclose
operation. This is an example showing the use of waitGroup and external closer of channel.channels can have a direction! notice that in the example, I added/removed arrows next to the channel (e.g.
<-chan string
, orchan<- string
) when passing them in/outside functions. This tells the compiler that a channel is read or write only respectively in the scope of that function. This is helping in 2 ways:- the compiler will produce more efficient code, as the channels with direction will have a single lock instead of 2.
- the signature of the function describes if it will only use the channel for writing to it (and possibly
close()
) or reading: remember that reading from a channel with arange
automatically stops the iteration when the channel is closed.
you can build channels of channels:
make(chan chan string)
is a valid (and helpful) construct to build processing pipelines. A common usage of it is a fan-in goroutine that is collecting multiple outputs of a series of channel-producing goroutines. This is an example of how to use them.
In essence, to answer your initial questions:
Is doing recursive calls from a function started as a goroutine to itself a valid approach?
If you really need recursion, it's probably better to handle it separately from the concurrent code: create a dedicated function that recursively sends data into a channel, and orchestrate the closing of the channel in the caller.
What would be an idiomatic way of reading the results from cRes until all spawned goroutines finish? I read somewhere that channels should be closed when computation is done, but I just cant wrap my head around how to integrate it in this case.
A good reference is Go Concurrency Patterns: Pipelines and cancellation: this is a rather old post (before the context
package existedin the std lib) and I think Parallel digestion
is what you're looking for to address the original question.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | |
Solution 2 | Francesco Gualazzi |