Write millions of files to S3 during post-processing with AWS SageMaker
I am having trouble with data post-processing in AWS SageMaker: I need to split one large text file of predictions (~2-10 GB) into millions of small files (one file per user, ~3-10 KB each).
Both the source and the target files are stored in S3. The output files will be served to end users via AWS API Gateway + AWS Lambda.
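For context, the serving side is thin: roughly something like the following Lambda handler (just a sketch; the bucket name and the path-parameter name are illustrative, the file-naming scheme is the one described further down):

```python
import hashlib

import boto3

s3 = boto3.client("s3")
BUCKET = "my-awesome-results"  # illustrative: the results bucket used below


def handler(event, context):
    # Illustrative assumption: API Gateway passes the user id as path parameter "id".
    user_id = event["pathParameters"]["id"]
    prefix = hashlib.md5(user_id.encode("utf-8")).hexdigest()[:5]
    key = "{}_{}_latest.json".format(prefix, user_id)
    obj = s3.get_object(Bucket=BUCKET, Key=key)
    return {
        "statusCode": 200,
        "headers": {"Content-Type": "application/json"},
        "body": obj["Body"].read().decode("utf-8"),
    }
```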
Jupyter notebook:
```python
import boto3
from sagemaker import get_execution_role
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput, NetworkConfig

role = get_execution_role()
instance_type = 'ml.m4.4xlarge'
ecr_image_full_name = '0123456789.dkr.ecr.eu-central-1.amazonaws.com/maslick-sagemaker-processing-image:latest'

input_file = 'input.csv'
input_object = 's3://my-awesome-dataset/input.csv'
output_object = 's3://my-awesome-results'

network_config = NetworkConfig(enable_network_isolation=False,
                               subnets=["subnet-12345", "subnet-67890"],
                               security_group_ids=["sg-0123456789"])

script_processor = ScriptProcessor(role=role,
                                   image_uri=ecr_image_full_name,
                                   command=['python3'],
                                   instance_count=1,
                                   instance_type=instance_type)

# The input is downloaded to /opt/ml/processing/input; everything written to
# /opt/ml/processing/output is uploaded to output_object when the job finishes.
input = ProcessingInput(source=input_object, destination='/opt/ml/processing/input')
output = ProcessingOutput(source='/opt/ml/processing/output', destination=output_object)

script_processor.run(code='callable.py', inputs=[input], outputs=[output], arguments=[input_file])
```
Dockerfile:
```dockerfile
FROM python:3.7-slim-buster

RUN pip3 install pandas==0.25.3

ENV PYTHONUNBUFFERED=TRUE
```
Inside callable.py I preprocess the input file and put the result in /opt/ml/processing/output, e.g.:
/opt/ml/processing/output/93faa_654321010000007_latest.json
The resulting file will be saved to S3 by ScriptProcessor:
s3://my-awesome-results/93faa_654321010000007_latest.json
The input file is just a |-delimited CSV file, e.g.:

```
654321010000007|1288858|AB|1
654321010000008|1266069|AB|2
654321010000009|0956486|AB|3
654321010000010|1295930|AB|4
654321010000011|0594956|AB|5
654321010000012|1231767|AB|6
654321010000013|1273878|CD|7
654321010000014|1295236|AB|8
654321010000015|1255404|AB|9
```
The resulting file would look like this (callable.py basically just iterates over each line and converts it to a JSON string):

```json
{"id": 654321010000007, "article": 1288858, "type": "AB", "rank": 1}
```
callable.py:

```python
import hashlib
import json
import sys
from collections import defaultdict
from concurrent.futures.process import ProcessPoolExecutor
from pathlib import Path

import pandas as pd


def saveFilesMultiProcesses(items):
    # One pool task per record, i.e. one task per output file.
    with ProcessPoolExecutor() as executor:
        for item in items:
            executor.submit(saveFile, item)


def readCsv(input_file):
    colnames = ['id', 'article', 'type', 'rank']
    df = pd.read_csv('/opt/ml/processing/input/{}'.format(input_file), sep='|', names=colnames)
    return df


def processCsv(df):
    # Convert each dataframe row into a dict (one dict per output file).
    dicts = []
    for row in df.itertuples():
        dict = defaultdict(lambda: defaultdict(list))
        dict["id"] = row.id
        dict["article"] = row.article
        dict["type"] = row.type
        dict["rank"] = row.rank
        dicts.append(dict)
    return dicts


def saveFile(item):
    # File name: <first 5 chars of md5(id)>_<id>_latest.json
    hashed_prefix = hashlib.md5(str(item['id']).encode('utf-8')).hexdigest()
    short = hashed_prefix[:5]
    file_name = short + "_" + str(item['id']) + "_latest.json"
    outfile = Path('/opt/ml/processing/output', file_name)
    with open(outfile, 'w') as json_file:
        json.dump(item, json_file)


if __name__ == '__main__':
    input_file = sys.argv[1]
    df = readCsv(input_file)
    list_of_dicts = processCsv(df)
    saveFilesMultiProcesses(list_of_dicts)
    print("Done. Wait until all files are saved to S3")
```
I've been able to process a small dataset (32 MB, 13,540 records). When I try 1.2 million records (2.2 GB), ScriptProcessor successfully processes the input file and saves the output files to /opt/ml/processing/output; however, it fails to upload them to S3 with the following error:
```
---------------------------------------------------------------------------
UnexpectedStatusException Traceback (most recent call last)
<ipython-input-66-48dccaef0bee> in <module>()
----> 1 script_processor.run(code='callable.py', inputs=[input], outputs=[output], arguments=[input_file])
~/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker/processing.py in run(self, code, inputs, outputs, arguments, wait, logs, job_name, experiment_config)
402 self.jobs.append(self.latest_job)
403 if wait:
--> 404 self.latest_job.wait(logs=logs)
405
406 def _get_user_code_name(self, code):
~/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker/processing.py in wait(self, logs)
726 """
727 if logs:
--> 728 self.sagemaker_session.logs_for_processing_job(self.job_name, wait=True)
729 else:
730 self.sagemaker_session.wait_for_processing_job(self.job_name)
~/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker/session.py in logs_for_processing_job(self, job_name, wait, poll)
3132
3133 if wait:
-> 3134 self._check_job_status(job_name, description, "ProcessingJobStatus")
3135 if dot:
3136 print()
~/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker/session.py in _check_job_status(self, job, desc, status_key_name)
2636 ),
2637 allowed_statuses=["Completed", "Stopped"],
-> 2638 actual_status=status,
2639 )
2640
UnexpectedStatusException: Error for Processing job maslick-sagemaker-processing-image-2020-06-11-15-42-34-593: Failed. Reason: InternalServerError: We encountered an internal error. Please try again.
```
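The same failure reason can also be pulled straight from the SageMaker API, which is sometimes easier than reading the notebook traceback; a minimal sketch, assuming default credentials and the job name from the error above:

```python
import boto3

sm = boto3.client("sagemaker")

# Job name taken from the error message above
job_name = "maslick-sagemaker-processing-image-2020-06-11-15-42-34-593"

desc = sm.describe_processing_job(ProcessingJobName=job_name)
print(desc["ProcessingJobStatus"])  # e.g. "Failed"
print(desc.get("FailureReason"))    # e.g. "InternalServerError: We encountered an internal error..."
```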
In the SageMaker documentation it's written that the Processor class (and its child class ScriptProcessor) is meant for data pre-processing, post-processing, feature engineering, data validation, and model evaluation. Apparently it's not meant for handling millions of output files (uploading them to S3).
Any ideas? Thanks in advance.
Solution 1:[1]
I had a similar problem. The optimal solution for me was to use an AWS Glue job with Spark. But I also ran a test that worked for me: writing the files directly to S3 from the processing script, without using ProcessingOutput.
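For example, something along these lines could replace saveFile() from the question (a sketch, not my exact code; it assumes boto3 is available in the processing container and reuses the question's results bucket and file-naming scheme):

```python
import hashlib
import json

import boto3

s3 = boto3.client('s3')
BUCKET = 'my-awesome-results'  # assumption: the bucket from output_object in the question


def save_item_to_s3(item):
    # Same <md5-prefix>_<id>_latest.json naming scheme as saveFile(), but uploading
    # each record straight to S3 instead of writing to /opt/ml/processing/output.
    prefix = hashlib.md5(str(item['id']).encode('utf-8')).hexdigest()[:5]
    key = '{}_{}_latest.json'.format(prefix, item['id'])
    s3.put_object(Bucket=BUCKET, Key=key, Body=json.dumps(item).encode('utf-8'))
```

With this approach the ProcessingOutput (and the bulk upload step that fails at the end of the job) is no longer needed; the Docker image must include boto3, the job's IAM role needs s3:PutObject on the bucket, and the uploads can still be parallelized with a process or thread pool as in the original script.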
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | IlRicciardelli |
