'Tensorflow data pipeline stops working during training

I made a Tensorflow pipeline for loading numpy arrays (video data shape (40,160,160,3)). However, it stops working after loading the first x batches.

The problem is solved when removing num_parallel_calls=AUTOTUNE. However, if I do this, the training becomes significantly slower (ETA/epoch ~30 min -> ETA/epoch ~ 4 hours) . Is there a way to load the numpy arrays in parallel (or apply num_parallel_calls=AUTOTUNE) without any problems?

def get_label(file_path):
    import os
    parts = tf.strings.split(file_path, os.path.sep)
    
    return parts[-2]

def process_video(file_path):
    label = get_label(file_path)
    video = np.load(file_path, allow_pickle=True)

    return np.float32(video/255), np.float32(label)

def set_shape(video, label):
  video.set_shape((40,160,160, 3))
  label.set_shape([])

  return video, label


## Data pipeline
AUTOTUNE = tf.data.experimental.AUTOTUNE

train_ds = tf.data.Dataset.list_files("path/train/*/*",shuffle=True)

train_ds = train_ds.map(lambda item: tf.numpy_function(
          process_video, [item], (tf.float32, tf.float32)) ,num_parallel_calls=AUTOTUNE)       

train_ds = train_ds.map(set_shape)
train_ds = train_ds.batch(8)
train_ds = train_ds.prefetch(AUTOTUNE)

## Model
def create_LRCN_model():
 
    model = Sequential()
    
    model.add(TimeDistributed(Conv2D(64, (3, 3), padding='same',activation = 'relu'),
                              input_shape = (40, 160, 160, 3)))
    
    model.add(TimeDistributed(MaxPooling2D((4, 4)))) 
    model.add(TimeDistributed(Dropout(0.25)))
    
    model.add(TimeDistributed(Conv2D(64, (3, 3), padding='same',activation = 'relu')))
    model.add(TimeDistributed(MaxPooling2D((4, 4))))
    model.add(TimeDistributed(Dropout(0.25)))
    
    model.add(TimeDistributed(Conv2D(64, (3, 3), padding='same',activation = 'relu')))
    model.add(TimeDistributed(MaxPooling2D((2, 2))))
    model.add(TimeDistributed(Dropout(0.25)))
    
    model.add(TimeDistributed(Conv2D(32, (3, 3), padding='same',activation = 'relu')))
    model.add(TimeDistributed(MaxPooling2D((2, 2))))
    #model.add(TimeDistributed(Dropout(0.25)))
                                      
    model.add(TimeDistributed(Flatten()))
                                      
    model.add(LSTM(32))
                                      
    model.add(Dense(1, activation = 'sigmoid'))
  
    model.summary()
    
    return model

LRCN_model = create_LRCN_model()

early_stopping_callback = EarlyStopping(monitor = 'val_loss', patience = 15, mode = 'min', restore_best_weights = True)

LRCN_model.compile(loss='binary_crossentropy', optimizer = 'Adam', metrics = ["accuracy"])

LRCN_model_training_history = LRCN_model.fit(train_ds, validation_data= val_ds, epochs = 70,   callbacks = [early_stopping_callback]) #class_weight= class_weights,


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source