How to catch UnhandledPromiseRejectionWarning for GCS WriteStream

Observed Application Behavior

I'm getting an UnhandledPromiseRejectionWarning: Error: Upload failed when using @google-cloud/storage in Node.js.

These errors show up while processing thousands of requests. Only a small percentage of them fail, but because there's no way to handle the errors, and no useful context in the error message, it's very difficult to determine WHICH files are failing.

I know that, in general, promises need a .catch handler or must be awaited inside a try/catch block. But in this case I'm using a write stream, and I'm confused about where the promise that's being rejected actually lives and how I could intercept it. The stack trace is unhelpful, as it contains only library code:

UnhandledPromiseRejectionWarning: Error: Upload failed
    at Request.requestStream.on.resp (.../node_modules/gcs-resumable-upload/build/src/index.js:163:34)
    at emitTwo (events.js:131:20)
    at Request.emit (events.js:214:7)
    at Request.<anonymous> (.../node_modules/request/request.js:1161:10)
    at emitOne (events.js:121:20)
    at Request.emit (events.js:211:7)
    at IncomingMessage.<anonymous> (.../node_modules/request/request.js:1083:12)
    at Object.onceWrapper (events.js:313:30)
    at emitNone (events.js:111:20)
    at IncomingMessage.emit (events.js:208:7)
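
As a stopgap, a process-level hook at least makes these rejections visible and loggable. This is the standard Node.js unhandledRejection event, not a fix, and the reason it reports is still the context-free Error: Upload failed:

// Stopgap: surface unhandled rejections at the process level.
// The reason logged here is still just "Error: Upload failed",
// with no indication of which file was being written.
process.on('unhandledRejection', (reason) => {
  console.error('Unhandled rejection:', reason)
})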

My Code

The code that's creating the writeStream looks like this:

const {join} = require('path')
const {Storage} = require('@google-cloud/storage')

module.exports = (config) => {
  const storage = new Storage({
    projectId: config.gcloud.project,
    keyFilename: config.gcloud.auth_file
  })

  return {
    getBucketWS(path, contentType) {
      const {bucket, path_prefix} = config.gcloud

      // add path_prefix if we have one
      if (path_prefix) {
        path = join(path_prefix, path)
      }

      const file = storage.bucket(bucket).file(path)
      let opts = {}
      if (contentType) {
        opts = {
          contentType,
          metadata: {contentType}
        }
      }
      const stream = file.createWriteStream(opts)
      // stash the destination on the stream so error handlers can report it
      stream._bucket = bucket
      stream._path = path
      return stream
    }
  }
}

And the consuming code looks like this:

const gcs = require('./gcs-helper.js')

module.exports = ({writePath, contentType, item}, done) => {
  let ws = gcs.getBucketWS(writePath, contentType)
  ws.on('error', (err) => {
    err.message = `Could not open gs://${ws._bucket}/${ws._path}: ${err.message}`
    done(err)
  })
  ws.on('finish', () => {
    done(null, {
      path: writePath,
      item
    })
  })
  ws.write(item)
  ws.end()
}

Given that I'm already listening for the error event on the stream, I don't see what else I can do here. No promise is exposed at the level of the @google-cloud/storage API that I'm consuming.
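
For completeness, this is the shape of a promise wrapper around the stream, assuming the failure really does surface through the stream's 'error' event. The writeToGCS name is mine, purely for illustration:

// Illustrative wrapper: resolve on 'finish', reject on 'error'.
// If the rejection originated at this level, a .catch (or a
// surrounding try/await) on this promise would see it.
function writeToGCS (ws, item) {
  return new Promise((resolve, reject) => {
    ws.once('error', reject)
    ws.once('finish', resolve)
    ws.end(item)
  })
}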

Digging into the @google-cloud/storage Library

The first line of the stack trace brings us to a code block in the gcs-resumable-upload node module that looks like this:

requestStream.on('complete', resp => {
    if (resp.statusCode < 200 || resp.statusCode > 299) {
        this.destroy(new Error('Upload failed'));
        return;
    }
    this.emit('metadata', resp.body);
    this.deleteConfig();
    this.uncork();
});

This passes the error to the stream's destroy method. The stream itself is created by the @google-cloud/common project's utility module, which uses the duplexify node module under the hood. The destroy method is defined on the duplexify stream and is documented in its README.
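
For context, here is a minimal, standalone sketch of that destroy behavior, assuming I'm reading the duplexify README and source correctly:

const duplexify = require('duplexify')

// destroy(err) destroys the underlying streams, then emits
// 'error' (with err) followed by 'close' on the wrapper.
const dup = duplexify()
dup.on('error', (err) => console.error('caught:', err.message))
dup.destroy(new Error('Upload failed'))

In isolation, the wrapper's own 'error' listener catches this, which is exactly what my consuming code already does, yet the warning still appears.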

Reading the duplexify source, I see that it first checks this._ondrain before emitting an error. Maybe I can provide a callback to keep this error from going unhandled?

I tried ws.write(item, null, cb) and still got the same UnhandledPromiseRejectionWarning. I then tried ws.end(item, null, cb), and even wrapped the .end call in a try/catch; that ended with the following error, which crashed the process entirely:

events.js:183
      throw er; // Unhandled 'error' event
      ^

Error: The uploaded data did not match the data from the server. As a precaution, the file has been deleted. To be sure the content is the same, you should try uploading the file again.
    at delete (.../node_modules/@google-cloud/storage/build/src/file.js:1295:35)
    at Util.handleResp (.../node_modules/@google-cloud/common/build/src/util.js:123:9)
    at retryRequest (.../node_modules/@google-cloud/common/build/src/util.js:404:22)
    at onResponse (.../node_modules/retry-request/index.js:200:7)
    at .../node_modules/teeny-request/build/src/index.js:208:17
    at <anonymous>
    at process._tickCallback (internal/process/next_tick.js:189:7)

My final code looks something like this:

let ws = gcs.getBucketWS(writePath, contentType)
const handleErr = (err) => {
  if (err) err.message = `Could not open gs://${ws._bucket}/${ws._path}: ${err.message}`
  done(err)
}
ws.on('error', handleErr)
// trying to do everything we can to handle these errors
// for some reason we still get UnhandledPromiseRejectionWarning
try {
  ws.write(item, null, err => {
    handleErr(err)
  })
  ws.end()
} catch (e) {
  handleErr(e)
}
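
As a possible alternative, if streaming isn't a hard requirement: the library also exposes File#save(), which, if I'm reading the docs right, returns a promise when no callback is passed, so the failure could at least be caught and annotated with the path. A sketch, untested against the failing workload:

async function saveItem (file, item) {
  try {
    // file is a @google-cloud/storage File; save() wraps the same
    // upload machinery but hands back a promise we can catch.
    await file.save(item)
  } catch (err) {
    err.message = `Could not write gs://${file.bucket.name}/${file.name}: ${err.message}`
    throw err
  }
}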

Conclusion

It's still a mystery to me how a user of the @google-cloud/storage library, or of duplexify for that matter, is supposed to perform proper error handling here. Comments from the maintainers of either project would be appreciated. Thanks!


