'Reading Parquet objects in AWS S3 from node.js
I need to load and interpret Parquet files from an S3 bucket using node.js. I've already tried parquetjs-lite and other npm libraries I could find, but none of them seems to interpret date-time fields correctly. So I'm trying to AWS's own SDK instead, in the believe that is should be able to deserialize its own Parquet format correctly -- the objects were originally written from SageMaker.
The way to go about it, apparently, is to use the JS version of
https://docs.aws.amazon.com/AmazonS3/latest/API/API_SelectObjectContent.html
but the documentation for that is horrifically out of date (it's referring to the 2006 API, https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/S3.html#selectObjectContent-property). Likewise, the example they show in their blog post doesn't work either (data.Payload is neither a ReadableStream not iterable).
I've already tried the response in Javascript - Read parquet data (with snappy compression) from AWS s3 bucket. Neither of them work: the first uses node-parquet, which doesn't currently compile, and the second uses parquetjs-lite (which doesn't work, see above).
So my question is, how is SelectObjectContent supposed to work nowadays, i.e., using aws-sdk v3?
import { S3Client, ListBucketsCommand, GetObjectCommand,
SelectObjectContentCommand } from "@aws-sdk/client-s3";
const REGION = "us-west-2";
const s3Client = new S3Client({ region: REGION });
const params = {
Bucket: "my-bucket-name",
Key: "mykey",
ExpressionType: 'SQL',
Expression: 'SELECT created_at FROM S3Object',
InputSerialization: {
Parquet: {}
},
OutputSerialization: {
CSV: {}
}
};
const run = async () => {
try {
const data = await s3Client.send(new SelectObjectContentCommand(params));
console.log("Success", data);
const events = data.Payload;
const eventStream = data.Payload;
// Read events as they are available
eventStream.on('data', (event) => { // <--- This fails
if (event.Records) {
// event.Records.Payload is a buffer containing
// a single record, partial records, or multiple records
process.stdout.write(event.Records.Payload.toString());
} else if (event.Stats) {
console.log(`Processed ${event.Stats.Details.BytesProcessed} bytes`);
} else if (event.End) {
console.log('SelectObjectContent completed');
}
});
// Handle errors encountered during the API call
eventStream.on('error', (err) => {
switch (err.name) {
// Check against specific error codes that need custom handling
}
});
eventStream.on('end', () => {
// Finished receiving events from S3
});
} catch (err) {
console.log("Error", err);
}
};
run();
The console.log shows data.Payload as:
Payload: {
[Symbol(Symbol.asyncIterator)]: [AsyncGeneratorFunction: [Symbol.asyncIterator]]
}
what should I do with that?
Solution 1:[1]
I was stuck on this exact same issue for quite some time. It looks like the best option now is to append a promise() to it.
So far, I've made progress using the following (sorry, this is incomplete but should at least enable you to read data):
try {
const s3Data = await s3.selectObjectContent(params3).promise();
// using 'any' here temporarily, but will need to address type issues
const events: any = s3Data.Payload;
for await (const event of events) {
try {
if(event?.Records) {
if (event?.Records?.Payload) {
const record = decodeURIComponent(event.Records.Payload.toString().replace(/\+|\t/g, ' '));
records.push(record);
} else {
console.log('skipped event, payload: ', event?.Records?.Payload);
}
}
else if (event.Stats) {
console.log(`Processed ${event.Stats.Details.BytesProcessed} bytes`);
} else if (event.End) {
console.log('SelectObjectContent completed');
}
}
catch (err) {
if (err instanceof TypeError) {
console.log('error in events: ', err);
throw err;
}
}
}
}
catch (err) {
console.log('error fetching data: ', err);
throw err;
}
console.log("final records: ", records);
return records;
}
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | 0llyds |
