Applying Multithreading to Neural Network Training

I'm working on a logistic regression machine learning project and used the code from https://builtin.com/data-science/guide-logistic-regression-tensorflow-20 as a starting point.

My goal is to divide the data into batches for training and, within each training step, process sub-batches at the same time in separate threads.
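For context, the splitting relies on tf.split, which by default divides a tensor into equal parts along the first (sample) axis. A toy illustration, separate from the project code:

import tensorflow as tf

batch = tf.reshape(tf.range(8, dtype=tf.float32), [4, 2])  # 4 samples, 2 features
first_half, second_half = tf.split(batch, num_or_size_splits=2)
print(first_half.shape, second_half.shape)  # (2, 2) (2, 2)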

This is the solution I came up with. Is it doing its job properly?

from __future__ import absolute_import, division, print_function
import tensorflow as tf
import numpy as np
from tensorflow.keras.datasets import mnist
from concurrent.futures import ThreadPoolExecutor

(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train, x_test = np.array(x_train, np.float32), np.array(x_test, np.float32)
x_train, x_test = x_train.reshape([-1, 784]), x_test.reshape([-1, 784])
x_train, x_test = x_train / 255., x_test / 255.
num_classes = 10 # 0 to 9 digits
num_features = 784 # 28*28
# Training parameters.
learning_rate = 0.01
training_steps = 1000
batch_size = 256 
# The batch size is the number of samples propagated through the network per update.
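# With the 60,000-image MNIST training set and batch_size = 256, one pass
# over the data takes roughly 60000 / 256 ≈ 235 steps.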
# Build the input pipeline: repeat indefinitely, shuffle with a 5000-element
# buffer, batch, and prefetch one batch ahead of training.
train_data = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_data = train_data.repeat().shuffle(5000).batch(batch_size).prefetch(1)
W = tf.Variable(tf.ones([num_features, num_classes]), name="weight")
b = tf.Variable(tf.zeros([num_classes]), name="bias")
def logistic_regression(x):
    # Apply softmax to normalize the logits to a probability distribution.
    return tf.nn.softmax(tf.matmul(x, W) + b)

def cross_entropy(y_pred, y_true):
    # Encode label to a one-hot vector.
    y_true = tf.one_hot(y_true, depth=num_classes)
    # Clip prediction values to avoid log(0) error.
    y_pred = tf.clip_by_value(y_pred, 1e-9, 1.)
    # Compute cross-entropy: sum over classes (axis 1), then average over the
    # batch. The axis argument matters: without it, reduce_sum collapses the
    # whole batch to a scalar and the outer reduce_mean is a no-op.
    return tf.reduce_mean(-tf.reduce_sum(y_true * tf.math.log(y_pred), 1))

def accuracy(y_pred, y_true):
    # Predicted class is the index of the highest score in the prediction vector (i.e. argmax).
    correct_prediction = tf.equal(tf.argmax(y_pred, 1), tf.cast(y_true, tf.int64))
    return tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
optimizer = tf.optimizers.SGD(learning_rate)

def run_optimization1(x, y):
    # Wrap the computation inside a GradientTape for automatic differentiation.
    with tf.GradientTape() as g:
        pred = logistic_regression(x)
        loss = cross_entropy(pred, y)
    # Return the gradients instead of applying them, so the caller can
    # combine the gradients from both half-batches before one update.
    return g.gradient(loss, [W, b])

# Create the thread pool once, rather than a new one on every step.
with ThreadPoolExecutor(max_workers=2) as executor:
    for step, (batch_x, batch_y) in enumerate(train_data.take(training_steps), 1):
        # Split each batch in half and compute gradients for the halves concurrently.
        x1, x2 = tf.split(batch_x, num_or_size_splits=2)
        y1, y2 = tf.split(batch_y, num_or_size_splits=2)
        f1 = executor.submit(run_optimization1, x1, y1)
        f2 = executor.submit(run_optimization1, x2, y2)
        # Average the two gradient lists variable-by-variable. Note that
        # f1.result() + f2.result() would concatenate the lists, so zip with
        # [W, b] would silently drop the second half's gradients.
        gradients = [(g1 + g2) / 2. for g1, g2 in zip(f1.result(), f2.result())]
        optimizer.apply_gradients(zip(gradients, [W, b]))
        if step % 50 == 0:
            pred = logistic_regression(batch_x)
            loss = cross_entropy(pred, batch_y)
            acc = accuracy(pred, batch_y)
            print("step: %i, loss: %f, accuracy: %f" % (step, loss, acc))

