'How to determine which goroutine is blocking execution?

all.

I have a small parser that writes found data to Postgres, as database framework I use https://github.com/jackc/pgx.

I write parsed data to an unbuffered channel from various goroutines.

I have special goroutine where I read data from this channel and write it to the database.

I'm debugging an application and it hangs forever sometime after (perhaps waiting for a free connection to a database in the pool).

How to determine which goroutine is blocking execution?

I've heard that there is a pprof, but I never used it. Thanks.

minimal example: I've struct like this

ParsingResults struct {
    parser  DataParser
    data []*common.Data
    err     error
}

in separate goroutine I init unbuffered channel like this:

results = make(chan *ParsingResults)

then I start various goroutines, where I run parsers:

go fetcher.Parse(results)

each parser gathers data and passes it to the channel like this:

var (
    results chan<- *ParsingResults
    pageResults *ParsingResults
)
results <- pageResults
if pageResults.err != nil {
    return
}

time.Sleep(p.provider.DelayBetweenPages)

and in a separate goroutine such a function is launched:

func (fetcher *Fetcher) waitForResults(ctx context.Context) {
    for {
        select {
        case results := <-fetcher.resultsChannel:
            provider := results.parser.GetProvider()
            if results.err != nil {
                common.Logger.Errorw("failed to fetch data from provider",
                    "provider", provider.Url,
                    "error", results.err)
                continue
            }
            data := fetcher.removeDuplicates(results.data)
            common.Logger.Infow("fetched some data",
                "provider", provider.Url,
                "rows_count", len(results.data),
                "unique_rows_count", len(data))
            _, err := fetcher.Repo.SaveFetchedData(ctx, data)
            if err != nil {
                common.Logger.Errorw("failed to save fetched data",
                    "provider", provider.Url,
                    "error", err)
                continue
            }
            common.Logger.Infow("fetched data were saved successfully",
                "provider", provider.Url,
                "rows_count", len(results.data),
                "unique_rows_count", len(data))
        case <-ctx.Done():
            return
        default:
            common.Logger.Infow("for debugging's sake! waiting for some data to arrive!")
        }
    }
}

the data is stored in the database in this function:

func (repo *Repository) SaveFetchedData(ctx context.Context, rows []*common.Data) (int64, error) {
    if len(rows) == 0 {
        return 0, nil
    }

    baseQB := sq.Insert(db.DataTableName).
        Columns(saveFetchedDataCols...).
        PlaceholderFormat(sq.Dollar)

    batch := &pgx.Batch{}
    for _, p := range rows {
        curQB := baseQB.Values(p.Row1, p.Row2, sq.Expr("NOW()"))
        curQuery, curArgs, err := curQB.ToSql()

        if err != nil {
            return 0, fmt.Errorf("failed to generate SQL query: %w", err)
        }
        batch.Queue(curQuery, curArgs...)
    }

    br := repo.pool.SendBatch(ctx, batch)
    ct, err := br.Exec()
    if err != nil {
        return 0, fmt.Errorf("failed to run SQL query batch: %w", err)
    }

    return ct.RowsAffected(), nil
}


Solution 1:[1]

I checked out full goroutine stack in pprof. So the error was that I did not release the connection from the pool after processing the result of the batch request. Therefore, 10 requests passed, the pool was completely filled and the execution thread was blocked. Guys, y'all are the best. Thanks for the help.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Oleg Shokin