Reading Parquet objects in AWS S3 from node.js

I need to load and interpret Parquet files from an S3 bucket using node.js. I've already tried parquetjs-lite and other npm libraries I could find, but none of them seems to interpret date-time fields correctly. So I'm trying to use AWS's own SDK instead, in the belief that it should be able to deserialize its own Parquet format correctly -- the objects were originally written from SageMaker.

The way to go about it, apparently, is to use the JS version of https://docs.aws.amazon.com/AmazonS3/latest/API/API_SelectObjectContent.html but the documentation for that is horrifically out of date (it refers to the 2006 API, https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/S3.html#selectObjectContent-property). Likewise, the example shown in their blog post doesn't work either (data.Payload is neither a ReadableStream nor iterable).

I've already tried the answers to "Javascript - Read parquet data (with snappy compression) from AWS s3 bucket". Neither of them works: the first uses node-parquet, which doesn't currently compile, and the second uses parquetjs-lite (which doesn't work, see above).

So my question is, how is SelectObjectContent supposed to work nowadays, i.e., using aws-sdk v3?

import { S3Client, ListBucketsCommand, GetObjectCommand,
SelectObjectContentCommand } from "@aws-sdk/client-s3";
const REGION = "us-west-2";
const s3Client = new S3Client({ region: REGION });

const params = {
  Bucket: "my-bucket-name",
  Key: "mykey",
  ExpressionType: 'SQL',
  Expression: 'SELECT created_at FROM S3Object',
  InputSerialization: {
    Parquet: {}
  },
  OutputSerialization: {
    CSV: {}
  }
};


const run = async () => {
  try {
    const data = await s3Client.send(new SelectObjectContentCommand(params));
    console.log("Success", data);

    const eventStream = data.Payload;

    // Read events as they are available
    eventStream.on('data', (event) => {  // <--- This fails
      if (event.Records) {
        // event.Records.Payload is a buffer containing
        // a single record, partial records, or multiple records
        process.stdout.write(event.Records.Payload.toString());
      } else if (event.Stats) {
        console.log(`Processed ${event.Stats.Details.BytesProcessed} bytes`);
      } else if (event.End) {
        console.log('SelectObjectContent completed');
      }
    });

    // Handle errors encountered during the API call
    eventStream.on('error', (err) => {
      switch (err.name) {
        // Check against specific error codes that need custom handling
      }
    });

    eventStream.on('end', () => {
      // Finished receiving events from S3
    });
  } catch (err) {
    console.log("Error", err);
  }
};
run();

The console.log shows data.Payload as:

  Payload: {
    [Symbol(Symbol.asyncIterator)]: [AsyncGeneratorFunction: [Symbol.asyncIterator]]
  }

What should I do with that?



Solution 1:[1]

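I was stuck on this exact same issue for quite some time. It looks like the best option now is to append .promise() to the call (note that this is the aws-sdk v2 style of calling) and then read the payload with a for await loop.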

So far, I've made progress using the following (sorry, this is incomplete but should at least enable you to read data):

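import { S3 } from 'aws-sdk'; // aws-sdk v2, which is where .promise() comes from

// The function name and parameters are placeholders; the original snippet omitted them.
async function fetchRecords(s3: S3, params3: S3.SelectObjectContentRequest): Promise<string[]> {
    const records: string[] = [];
    try {
        const s3Data = await s3.selectObjectContent(params3).promise();

        // using 'any' here temporarily, but will need to address type issues
        const events: any = s3Data.Payload;
        // the payload is async-iterable, so read it with for await
        for await (const event of events) {
            try {
                if (event?.Records) {
                    if (event?.Records?.Payload) {
                        // Payload is a buffer holding one, several, or partial records
                        const record = decodeURIComponent(event.Records.Payload.toString().replace(/\+|\t/g, ' '));
                        records.push(record);
                    } else {
                        console.log('skipped event, payload: ', event?.Records?.Payload);
                    }
                }
                else if (event.Stats) {
                    console.log(`Processed ${event.Stats.Details.BytesProcessed} bytes`);
                } else if (event.End) {
                    console.log('SelectObjectContent completed');
                }
            }
            catch (err) {
                // note: errors other than TypeError are silently ignored here
                if (err instanceof TypeError) {
                    console.log('error in events: ', err);
                    throw err;
                }
            }
        }
    }
    catch (err) {
        console.log('error fetching data: ', err);
        throw err;
    }
    console.log("final records: ", records);
    return records;
}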
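
For the v3 client used in the question, something along these lines might work. It is only a sketch: the Payload logged in the question exposes Symbol.asyncIterator, so it can be read with for await instead of .on('data'). The helper name collectSelectOutput is made up for illustration, and the row splitting assumes CSV output with the default newline record delimiter and no newlines inside fields.

```javascript
// Sketch: drain the async-iterable Payload returned by SelectObjectContentCommand.
// `collectSelectOutput` is a hypothetical helper, not part of the AWS SDK.
async function collectSelectOutput(payload) {
  const decoder = new TextDecoder("utf-8");
  let text = "";
  let bytesProcessed;

  for await (const event of payload) {
    if (event.Records?.Payload) {
      // Each chunk is a byte array that may end mid-record or mid-character,
      // so decode in streaming mode and split into rows only at the end.
      text += decoder.decode(event.Records.Payload, { stream: true });
    } else if (event.Stats) {
      bytesProcessed = event.Stats.Details?.BytesProcessed;
    }
    // Other events (End, Progress, Cont) carry no row data and are skipped.
  }
  text += decoder.decode(); // flush any bytes still buffered in the decoder

  // Assumption: one record per line, no newlines embedded in fields.
  const rows = text.split("\n").filter((line) => line.length > 0);
  return { rows, bytesProcessed };
}

// Usage with the client and params from the question (needs AWS credentials):
//   const data = await s3Client.send(new SelectObjectContentCommand(params));
//   const { rows } = await collectSelectOutput(data.Payload);
```

Collecting the whole text before splitting keeps rows intact when a record is spread over two events; for very large results you would probably want to emit complete lines as they arrive instead.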

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 0llyds