Using SageMaker Pipe mode with an S3 directory of TFRecord files
My call to sagemaker.tensorflow.TensorFlow.fit() hangs indefinitely with no error message when I use Pipe instead of File as the input_mode. Correspondingly, I replace the tf.data TFRecordDataset with a PipeModeDataset. Training in File mode completes successfully.
My data consists of two S3 prefixes, each containing multiple TFRecord files. Despite having looked through the documentation extensively, I am not confident about how to use PipeModeDataset in this case, specifically how to set the channel.
Here is my SageMaker notebook setup:
```python
hyperparameters = {
    "batch-size": 1,
    "pipe_mode": 1,
}

estimator_config = {
    "entry_point": "tensorflow_train.py",
    "source_dir": "source",
    "framework_version": "2.3",
    "py_version": "py37",
    "instance_type": "ml.p3.2xlarge",
    "instance_count": 1,
    "role": sagemaker.get_execution_role(),
    "hyperparameters": hyperparameters,
    "output_path": f"s3://{bucket_name}",
    "input_mode": "Pipe",
}

tf_estimator = TensorFlow(**estimator_config)

s3_data_channels = {
    "training": f"s3://{bucket_name}/data/training",
    "validation": f"s3://{bucket_name}/data/validation",
}

tf_estimator.fit(s3_data_channels)
```
If I were to run aws s3 ls on either of the s3_data_channels URIs, I'd get a list of TFRecord files.
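For reference, the keys of the dict passed to fit() become the channel names: SageMaker exposes each channel to the training container as an SM_CHANNEL_<NAME> environment variable (and, in Pipe mode, as a named FIFO), which is why PipeModeDataset(channel="training") must match the "training" key exactly. A minimal sketch of that naming convention, using a hypothetical bucket name:

```python
# Hypothetical channel dict, mirroring the notebook setup above.
s3_data_channels = {
    "training": "s3://my-bucket/data/training",
    "validation": "s3://my-bucket/data/validation",
}

# Each dict key becomes a channel name; SageMaker uppercases it
# to build the SM_CHANNEL_<NAME> environment variable.
env_vars = {
    f"SM_CHANNEL_{name.upper()}": uri
    for name, uri in s3_data_channels.items()
}
```

So a channel named "training" is what both os.environ.get("SM_CHANNEL_TRAINING") in File mode and PipeModeDataset(channel="training") in Pipe mode refer to.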
Here is how I set up the dataset (see the if/else statement that switches on whether pipe_mode is selected):
```python
import argparse
import os

import tensorflow as tf

if __name__ == "__main__":
    arg_parser = argparse.ArgumentParser()
    ...
    arg_parser.add_argument("--pipe_mode", type=int, default=0)
    arg_parser.add_argument(
        "--train_dir", type=str, default=os.environ.get("SM_CHANNEL_TRAINING")
    )
    arg_parser.add_argument(
        "--validation_dir", type=str, default=os.environ.get("SM_CHANNEL_VALIDATION")
    )
    arg_parser.add_argument("--model_dir", type=str)
    args, _ = arg_parser.parse_known_args()

    AUTOTUNE = tf.data.experimental.AUTOTUNE

    if args.pipe_mode == 1:
        from sagemaker_tensorflow import PipeModeDataset

        train_ds = PipeModeDataset(channel="training", record_format="TFRecord")
        val_ds = PipeModeDataset(channel="validation", record_format="TFRecord")
    else:
        train_files = tf.data.Dataset.list_files(args.train_dir + "/*tfrecord")
        val_files = tf.data.Dataset.list_files(args.validation_dir + "/*tfrecord")
        train_ds = tf.data.TFRecordDataset(filenames=train_files, num_parallel_reads=AUTOTUNE)
        val_ds = tf.data.TFRecordDataset(filenames=val_files, num_parallel_reads=AUTOTUNE)

    train_ds = (
        train_ds.map(tfrecord_parser, num_parallel_calls=AUTOTUNE)
        .batch(args.batch_size)
        .prefetch(AUTOTUNE)
    )
    val_ds = (
        val_ds.map(tfrecord_parser, num_parallel_calls=AUTOTUNE)
        .batch(args.batch_size)
        .prefetch(AUTOTUNE)
    )
    ...
```
Solution 1:[1]
I had the same problem: model.fit() got stuck indefinitely when using Pipe mode. After some research and trying many changes, it was solved by defining steps_per_epoch when fitting the model.
I suspect that in File mode Keras can infer how many steps each epoch contains, but in Pipe mode the data is streamed and its cardinality is unknown, so you have to specify it manually.
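A minimal sketch of the fix, assuming you know (or have counted offline) the total number of records in each channel; the record counts and epoch count here are hypothetical:

```python
import math

def steps_per_epoch(num_examples: int, batch_size: int) -> int:
    """Number of batches the pipe will yield per epoch.

    Pipe mode streams records, so tf.data cannot infer the dataset's
    cardinality; Keras needs steps_per_epoch to know when an epoch ends.
    """
    return math.ceil(num_examples / batch_size)

# Hypothetical totals; in practice, count the records in your TFRecord
# files once offline and pass the totals in as hyperparameters.
train_steps = steps_per_epoch(num_examples=10_000, batch_size=1)
val_steps = steps_per_epoch(num_examples=1_000, batch_size=1)

# model.fit(
#     train_ds,
#     validation_data=val_ds,
#     epochs=10,
#     steps_per_epoch=train_steps,
#     validation_steps=val_steps,
# )
```

Without steps_per_epoch (and validation_steps for the validation set), Keras keeps reading from the pipe waiting for an end-of-data signal that never comes, which matches the indefinite hang described in the question.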
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Agustín Begue |
