TensorFlow slow processing with Generator
I have a peculiar case of slow model training while trying to train using a generator. The reason I need to use a generator is because I have multiple parquet files that cannot be loaded into memory at once. Here is the code snippet without a generator
# Load a single parquet file and split off the regression target column.
# fixed: was `d_df = pd.read_parquet(..)` but `pd_df` is the name used below,
# which would raise NameError.
pd_df = pd.read_parquet("..")
label = pd_df.pop("label")
# In-memory dataset of (feature-dict, label) pairs.
dataset = tf.data.Dataset.from_tensor_slices((dict(pd_df), label))
# alternate: stream from S3 instead of loading everything at once
# dataset = createDataset(bucket,prefix)
def is_test(x, y):
    """Validation-split predicate: True for every 4th enumeration index."""
    remainder = x % 4
    return remainder == 0
def is_train(x, y):
    """Training-split predicate: the complement of the validation predicate."""
    return False if is_test(x, y) else True
# Drop the enumeration index, keeping only the original dataset element.
def recover(x, y):
    return y
# Validation split: elements whose enumeration index is divisible by 4.
val_dataset = (
    dataset.enumerate()
    .filter(is_test)
    .map(recover)
    .batch(batch_size)
)
# Training split: the remaining 3/4 of the elements.
train_dataset = (
    dataset.enumerate()
    .filter(is_train)
    .map(recover)
    .batch(batch_size)
)
# Map raw feature dicts into a dense input tensor via feature columns.
feature_columns = _create_feature_columns()
feature_layer = tf.keras.layers.DenseFeatures(feature_columns)

# Simple MLP regressor with a single linear output unit.
model = tf.keras.Sequential([
    feature_layer,
    layers.Dense(1280, activation='relu'),
    layers.Dense(512, activation='relu'),
    layers.Dense(1280, activation='relu'),
    layers.Dense(1),
])

# NOTE(review): 'accuracy' is not a meaningful metric for a regression (MSE)
# objective; mean_absolute_error is the informative one here.
model.compile(optimizer='adam',
              loss=tf.keras.losses.MeanSquaredError(),
              metrics=['accuracy', 'mean_absolute_error'])

# fixed: was `om_model.fit(...)` — `om_model` is never defined; the model
# constructed above is named `model`.
model.fit(train_dataset, epochs=10, validation_data=val_dataset, verbose=1)
This runs at 295ms per step. Naturally, since it's not possible to load all my data in one go, I wrote the following generator (P.S. I'm new to TF and my generator may be off, but from what I could find online it looks good to me).
def getSplit(original_list, n):
    """Chunk *original_list* into consecutive sublists of length *n*.

    The final chunk may be shorter when len(original_list) is not a
    multiple of *n*. An empty input yields an empty list of chunks.
    """
    chunks = []
    start = 0
    while start < len(original_list):
        chunks.append(original_list[start:start + n])
        start += n
    return chunks
#
# 200 files -> 48 Mb (1 file)
# 15 files in memory at a time
# 5 generators
# 3 files per generator
#
def pandasGenerator(s3files, n=3):
    """Yield (feature-dict, label) rows from parquet files, *n* files at a time.

    When invoked through tf.data.Dataset.from_generator(args=...), *s3files*
    arrives as a tensor of byte strings; it is converted back to plain Python
    path strings before use.
    """
    print(f"Processing: {s3files} to : {tf.get_static_value(s3files)}")
    s3files = tf.get_static_value(s3files)
    # fixed: decode the byte strings properly instead of slicing their repr —
    # str(b'p')[2:-1] silently corrupts any path whose repr contains escape
    # sequences (e.g. backslashes or non-ASCII bytes).
    s3files = [s3file.decode("utf-8") if isinstance(s3file, bytes) else str(s3file)
               for s3file in s3files]
    batches = getSplit(s3files, n)
    for batch in batches:
        t = time.process_time()
        print(f"Processing Batch: {batch}")
        # Read the whole batch into one frame; n bounds peak memory use.
        panda_ds = pd.concat([pd.read_parquet(s3file) for s3file in batch],
                             ignore_index=True)
        elapsed_time = time.process_time() - t
        print(f"base_read_time: {elapsed_time}")
        # NOTE(review): yielding one row at a time through Python (itertuples +
        # per-row dict construction) is the likely bottleneck behind the
        # reported 2s/step — consider yielding whole batches instead.
        for row in panda_ds.itertuples(index=False):
            pan_row = dict(row._asdict())
            labels = pan_row.pop('label')
            yield dict(pan_row), labels
def createDS(s3bucket, s3prefix):
    """Build a tf.data pipeline that streams rows from S3 parquet files.

    The file list is split into groups of 40; interleave runs one
    pandasGenerator per group in parallel, each reading 3 files at a time.
    """
    s3files = getFileLists(bucket=s3bucket, prefix=s3prefix)
    dataset = (tf.data.Dataset.from_tensor_slices(getSplit(s3files, 40))
               .interleave(
                   # NOTE(review): the feature spec dict is empty here —
                   # presumably elided in the post; a real run needs one
                   # tf.TensorSpec per feature key yielded by the generator.
                   # Verify against the parquet schema.
                   lambda files: tf.data.Dataset.from_generator(
                       pandasGenerator,
                       output_signature=(
                           {
                           }, tf.TensorSpec(shape=(), dtype=tf.float64)),
                       args=(files, 3)),
                   num_parallel_calls=tf.data.AUTOTUNE
               )).prefetch(tf.data.AUTOTUNE)  # overlap reading with training
    return dataset
When using the generator, the per-step time jumps to 2s.
I'd appreciate any help in improving the generator. Thanks.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
