PyTorch time conditional autoencoder

Goal

I have some time series data where each instance of a series (hereafter referred to by its "name") has a variable number of timepoints, and these timepoints may not be consecutive (see the Dummy Data section). Ideally, I would like to use an autoencoder to reduce each variable-length series to a single hidden representation. I am struggling with how to do this conceptually while conditioning on time; e.g., pseudocode for the forward call might be:

# NOTE: pseudocode
class TimeSeriesConditionalAE(nn.Module):
    # ...
    def forward(self, x, t):
        hidden = self.enc(x, t)     # shape (1, n_hidden_features)
        decoded = self.dec(hidden)  # reconstruction of x, used for the training loss
        return decoded, hidden

The question is how to construct the encoder (self.enc) and the decoder (self.dec).

Dummy Data

dummy_data = [
    # n   t  features
    ['a', 1, 2, 3, 4, 5],
    ['a', 2, 2, 3, 4, 5],
    ['a', 3, 2, 3, 4, 5],

    ['b', 1, 5, 5, 3, 2],
    ['b', 3, 5, 5, 3, 1],
    ['b', 5, 5, 5, 3, 1], # <--- time not necessarily always consecutive

    ['c', 1, 1, 5, 7, 2],
    ['c', 2, 2, 5, 7, 1],
    ['c', 3, 3, 4, 7, 2],
    ['c', 4, 4, 4, 3, 1],
    ['c', 5, 5, 5, 3, 2],
    ['c', 6, 6, 5, 3, 1],
]

import pandas as pd
import torch
import torch.nn as nn

df = pd.DataFrame(
    dummy_data,
    columns=['name', 'time', *'1 2 3 4'.split()]
).set_index('name')
MAX_LENGTH = df.time.max()

def get_name(name, max_time=MAX_LENGTH, device='cuda', one_indexed=True, pad=False):
    """Return a list of (time, features) tensor pairs for one name,
    optionally inserting zero-feature placeholders for missing timepoints."""
    times = torch.tensor(df.loc[name].time.values, device=device)
    points = torch.tensor(df.loc[name].drop(columns='time').values, device=device)
    n_features = len(df.columns) - 1
    paired = list(zip(times, points))
    for i in range(max_time):
        t = i + 1 if one_indexed else i
        # pad: insert a zero-feature placeholder wherever a timepoint is missing
        if (i >= len(paired) or paired[i][0] != t) and pad:
            paired.insert(i, (torch.tensor(t, device=device), torch.zeros(n_features, device=device)))
    return paired

This yields the following dataframe:

      time  1  2  3  4
name
a        1  2  3  4  5
a        2  2  3  4  5
a        3  2  3  4  5
b        1  5  5  3  2
b        3  5  5  3  1
b        5  5  5  3  1
c        1  1  5  7  2
c        2  2  5  7  1
c        3  3  4  7  2
c        4  4  4  3  1
c        5  5  5  3  2
c        6  6  5  3  1

where

current_name = get_name('a')
print(current_name)
[(tensor(1, device='cuda:0'), tensor([2, 3, 4, 5], device='cuda:0')),
 (tensor(2, device='cuda:0'), tensor([2, 3, 4, 5], device='cuda:0')),
 (tensor(3, device='cuda:0'), tensor([2, 3, 4, 5], device='cuda:0'))]
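
For reference, this is how I imagine collating one name's pairs into sequence tensors for an RNN (collate_name is just a helper I sketched for this post, so the name and the approach are my own guess):

def collate_name(pairs):
    """Stack a list of (time, features) pairs into two tensors:
    times of shape (seq_len,) and points of shape (seq_len, n_features)."""
    times = torch.stack([t.long() for t, _ in pairs])
    points = torch.stack([x.float() for _, x in pairs])
    return times, points

times, points = collate_name(get_name('a'))
print(times.shape, points.shape)
# torch.Size([3]) torch.Size([3, 4])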

Current code

Encoder:

class Encoder(nn.Module):
    def __init__(self, number_of_timepoints, number_of_features, hidden_size, hidden_layer_depth, latent_length, dropout = 0, block = 'LSTM', one_indexed=True):

        super(Encoder, self).__init__()
        self.number_of_timepoints = number_of_timepoints
        self.number_of_features = number_of_features
        self.hidden_size = hidden_size
        self.hidden_layer_depth = hidden_layer_depth
        self.latent_length = latent_length

        t = number_of_timepoints + 1 if one_indexed else number_of_timepoints
        self.embedding = nn.Embedding(t, number_of_features)

        if block == 'LSTM':
            self.model = nn.LSTM(self.number_of_features, self.hidden_size, self.hidden_layer_depth, dropout = dropout)
        elif block == 'GRU':
            self.model = nn.GRU(self.number_of_features, self.hidden_size, self.hidden_layer_depth, dropout = dropout)
        else:
            raise NotImplementedError

    def forward(self, t, x):
        """Forward propagation of the encoder. Given one (t, x) pair, outputs the last hidden state.
        :param t: scalar long tensor, the timepoint of this observation
        :param x: features at that timepoint, of shape (number_of_features,)
        :return: last hidden state of the encoder, of shape (batch_size, hidden_size)
        """
        # NOTE: using an embedding layer for time, added to the features
        # QUESTION: not sure how self.model takes in all (x, t) pairs of one name
        embedded = self.embedding(t)
        output = (embedded + x).view(1, 1, -1)  # (seq_len=1, batch=1, number_of_features)
        _, (h_end, c_end) = self.model(output)

        h_end = h_end[-1, :, :]  # hidden state of the last layer
        return h_end
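
What I think I actually want is for the encoder to consume a whole name at once rather than one (t, x) pair per call, roughly like the sketch below (written as a free function over an Encoder instance; it assumes the LSTM block and the stacked times/points tensors from collate_name above, and I am not at all sure that adding the time embedding to the features is the right way to condition on time):

def encode_sequence(enc, times, points):
    """Sketch: encode one whole name.
    times:  (seq_len,) long tensor of timepoints
    points: (seq_len, n_features) float tensor of features
    returns the last layer's hidden state, shape (1, hidden_size)."""
    conditioned = points + enc.embedding(times)          # condition features on time
    _, (h_end, _) = enc.model(conditioned.unsqueeze(1))  # LSTM over (seq_len, batch=1, n_features)
    return h_end[-1]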

Decoder:

class Decoder(nn.Module):
    """Converts latent vector into output
    :param sequence_length: length of the input sequence
    :param batch_size: batch size of the input sequence
    :param hidden_size: hidden size of the RNN
    :param hidden_layer_depth: number of layers in RNN
    :param latent_length: latent vector length
    :param output_size: dimensionality of the reconstructed output at each timestep (the number of features)
    :param block: GRU/LSTM - use the same which you've used in the encoder
    :param dtype: Depending on cuda enabled/disabled, create the tensor
    """
    def __init__(self, sequence_length, batch_size, hidden_size, hidden_layer_depth, latent_length, output_size, dtype, block='LSTM', device='cuda'):

        super(Decoder, self).__init__()

        self.hidden_size = hidden_size
        self.batch_size = batch_size
        self.sequence_length = sequence_length
        self.hidden_layer_depth = hidden_layer_depth
        self.latent_length = latent_length
        self.output_size = output_size
        self.dtype = dtype

        if block == 'LSTM':
            self.model = nn.LSTM(1, self.hidden_size, self.hidden_layer_depth)
        elif block == 'GRU':
            self.model = nn.GRU(1, self.hidden_size, self.hidden_layer_depth)
        else:
            raise NotImplementedError

        self.latent_to_hidden = nn.Linear(self.latent_length, self.hidden_size)
        self.hidden_to_output = nn.Linear(self.hidden_size, self.output_size)

        self.decoder_inputs = torch.zeros(self.sequence_length, self.batch_size, 1, requires_grad=True, device=device).type(self.dtype)
        self.c_0 = torch.zeros(self.hidden_layer_depth, self.batch_size, self.hidden_size, requires_grad=True, device=device).type(self.dtype)

        nn.init.xavier_uniform_(self.latent_to_hidden.weight)
        nn.init.xavier_uniform_(self.hidden_to_output.weight)

    def forward(self, h_state):
        """Unrolls the RNN from the given hidden state into an output sequence
        :param h_state: hidden state from the encoder, of shape (batch_size, hidden_size)
        :return: output sequence of shape (sequence_length, batch_size, output_size)
        """

        if isinstance(self.model, nn.LSTM):
            h_0 = torch.stack([h_state for _ in range(self.hidden_layer_depth)])
            decoder_output, _ = self.model(self.decoder_inputs, (h_0, self.c_0))
        elif isinstance(self.model, nn.GRU):
            h_0 = torch.stack([h_state for _ in range(self.hidden_layer_depth)])
            decoder_output, _ = self.model(self.decoder_inputs, h_0)
        else:
            raise NotImplementedError

        out = self.hidden_to_output(decoder_output)
        return out
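
For completeness, this is roughly how I picture training the two together, one name per step (a sketch with made-up hyperparameter values; it assumes a CUDA device like the rest of the post, the LSTM block, the collate_name and encode_sequence sketches above, and zero-padding every name to MAX_LENGTH so the fixed-length decoder can reconstruct it):

enc = Encoder(number_of_timepoints=int(MAX_LENGTH), number_of_features=4,
              hidden_size=16, hidden_layer_depth=1, latent_length=8).to('cuda')
dec = Decoder(sequence_length=int(MAX_LENGTH), batch_size=1, hidden_size=16,
              hidden_layer_depth=1, latent_length=8, output_size=4,
              dtype=torch.cuda.FloatTensor).to('cuda')

opt = torch.optim.Adam([*enc.parameters(), *dec.parameters()], lr=1e-3)

for epoch in range(100):
    for name in ['a', 'b', 'c']:
        times, points = collate_name(get_name(name, pad=True))  # padded to MAX_LENGTH steps
        hidden = encode_sequence(enc, times, points)             # (1, hidden_size)
        recon = dec(hidden)                                      # (MAX_LENGTH, 1, output_size)
        loss = nn.functional.mse_loss(recon.squeeze(1), points)
        opt.zero_grad()
        loss.backward()
        opt.step()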

I'm not sure whether I'm on the right track here, in particular how the encoder is supposed to consume all of a name's (x, t) pairs at once.

I would appreciate any help, thanks.



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
