PyTorch time conditional autoencoder
Goal
I have some time series data where each instance of a series (hereafter referred to by its "name") has a variable number of timepoints, and these points may not be consecutive (see the Dummy Data section). Ideally, I would like to use an autoencoder to reduce each variable-length series into a single hidden instance. I am struggling with how to do this conceptually while conditioning on time; e.g., pseudo-code for the forward call might be:
# NOTE: pseudo-code
class TimeSeriesConditionalAE(nn.Module):
    # ...
    def forward(self, x, t):
        hidden = self.enc(x, t)    # shape (1, n_hidden_features)
        decoded = self.dec(hidden)
        return hidden, decoded
Where the question is how to construct the encoder (self.enc) and the decoder (self.dec).
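To make the shape of what I am after more concrete, here is a minimal runnable skeleton of how I imagine the module could look, assuming I pad every series to a fixed length and add a learned time embedding to the features (all sizes and names here are placeholders of my own, not a known-good design):

import torch
import torch.nn as nn

class TimeSeriesConditionalAE(nn.Module):
    def __init__(self, n_features=4, n_hidden=8, max_time=6):
        super().__init__()
        # +1 slot because my times are 1-indexed
        self.time_embedding = nn.Embedding(max_time + 1, n_features)
        self.enc = nn.LSTM(n_features, n_hidden)
        self.dec = nn.LSTM(n_hidden, n_features)  # placeholder reconstruction head

    def forward(self, x, t):
        # x: (seq_len, 1, n_features) floats, t: (seq_len,) integer timepoints
        seq = x + self.time_embedding(t).unsqueeze(1)
        _, (h, _) = self.enc(seq)    # h: (num_layers, 1, n_hidden)
        hidden = h[-1]               # (1, n_hidden) -- the single hidden instance
        # feed the hidden state at every step so the decoder can unroll it
        decoded, _ = self.dec(hidden.expand(x.size(0), 1, -1))
        return hidden, decoded

Whether this is a sensible way to inject time (summing the embedding rather than, say, concatenating it) is exactly the part I am unsure about.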
Dummy Data
import pandas as pd
import torch
import torch.nn as nn

dummy_data = [
    # name, time, features 1-4
    ['a', 1, 2, 3, 4, 5],
    ['a', 2, 2, 3, 4, 5],
    ['a', 3, 2, 3, 4, 5],
    ['b', 1, 5, 5, 3, 2],
    ['b', 3, 5, 5, 3, 1],
    ['b', 5, 5, 5, 3, 1],  # <--- time is not necessarily consecutive
    ['c', 1, 1, 5, 7, 2],
    ['c', 2, 2, 5, 7, 1],
    ['c', 3, 3, 4, 7, 2],
    ['c', 4, 4, 4, 3, 1],
    ['c', 5, 5, 5, 3, 2],
    ['c', 6, 6, 5, 3, 1],
]

df = pd.DataFrame(
    dummy_data,
    columns=['name', 'time', *'1 2 3 4'.split()]
).set_index('name')
MAX_LENGTH = df.time.max()

def get_name(name, max_time=MAX_LENGTH, device='cuda', one_indexed=True, pad=False):
    times = torch.tensor(df.loc[name].time.values, device=device)
    points = torch.tensor(df.loc[name].drop(columns='time').values, device=device)
    n_features = len(df.columns) - 1
    paired = list(zip(times, points))
    # optionally pad missing timepoints with zero feature vectors
    for i in range(max_time):
        t = i + 1 if one_indexed else i
        if (i >= len(paired) or paired[i][0] != t) and pad:
            paired.insert(i, (torch.tensor(t, device=device),
                              torch.zeros(n_features, device=device)))
    return paired
This yields the following dataframe:

      time  1  2  3  4
name
a        1  2  3  4  5
a        2  2  3  4  5
a        3  2  3  4  5
b        1  5  5  3  2
b        3  5  5  3  1
b        5  5  5  3  1
c        1  1  5  7  2
c        2  2  5  7  1
c        3  3  4  7  2
c        4  4  4  3  1
c        5  5  5  3  2
c        6  6  5  3  1
where
current_name = get_name('a')
print(current_name)
[(tensor(1, device='cuda:0'), tensor([2, 3, 4, 5], device='cuda:0')),
(tensor(2, device='cuda:0'), tensor([2, 3, 4, 5], device='cuda:0')),
(tensor(3, device='cuda:0'), tensor([2, 3, 4, 5], device='cuda:0'))]
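And with pad=True the gaps are filled with zero feature vectors, e.g. for 'b', which is missing times 2, 4, and 6 (device annotations omitted below; note the padded rows come out as floats because they are created with torch.zeros):

current_name = get_name('b', pad=True)
print(current_name)

[(tensor(1), tensor([5, 5, 3, 2])),
 (tensor(2), tensor([0., 0., 0., 0.])),  # padded
 (tensor(3), tensor([5, 5, 3, 1])),
 (tensor(4), tensor([0., 0., 0., 0.])),  # padded
 (tensor(5), tensor([5, 5, 3, 1])),
 (tensor(6), tensor([0., 0., 0., 0.]))]  # padded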
Current code
Encoder:
class Encoder(nn.Module):
    def __init__(self, number_of_timepoints, number_of_features, hidden_size,
                 hidden_layer_depth, latent_length, dropout=0, block='LSTM',
                 one_indexed=True):
        super(Encoder, self).__init__()
        self.number_of_timepoints = number_of_timepoints
        self.number_of_features = number_of_features
        self.hidden_size = hidden_size
        self.hidden_layer_depth = hidden_layer_depth
        self.latent_length = latent_length

        # embedding table over timepoint indices (one extra slot if 1-indexed)
        t = number_of_timepoints + 1 if one_indexed else number_of_timepoints
        self.embedding = nn.Embedding(t, number_of_features)

        if block == 'LSTM':
            self.model = nn.LSTM(self.number_of_features, self.hidden_size,
                                 self.hidden_layer_depth, dropout=dropout)
        elif block == 'GRU':
            self.model = nn.GRU(self.number_of_features, self.hidden_size,
                                self.hidden_layer_depth, dropout=dropout)
        else:
            raise NotImplementedError

    def forward(self, t, x):
        """Forward propagation of encoder. Given an input, outputs the last hidden state of the encoder.

        :param t: integer timepoint index for the current observation
        :param x: input to the encoder, of shape (sequence_length, batch_size, number_of_features)
        :return: last hidden state of encoder, of shape (batch_size, hidden_size)
        """
        # NOTE: using embedding layer for time
        # QUESTION: not sure how self.model takes in all (x, t) pairs
        embedded = self.embedding(t)
        output = (embedded + x).view(1, 1, -1)
        # NOTE: this unpacking assumes an LSTM; a GRU returns only h_n
        _, (h_end, c_end) = self.model(output)
        h_end = h_end[-1, :, :]  # hidden state of the last layer
        return h_end
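To spell out the QUESTION comment above: the current forward only handles a single (t, x) pair. What I think I want is to run the RNN once over the whole series, something like the sketch below (seq_encode is a helper name I made up for illustration):

def seq_encode(encoder, pairs):
    # pairs: output of get_name(...), a list of (time, features) tensors
    times = torch.stack([t for t, _ in pairs])           # (seq_len,)
    points = torch.stack([x.float() for _, x in pairs])  # (seq_len, n_features)
    embedded = encoder.embedding(times)                  # (seq_len, n_features)
    seq = (points + embedded).unsqueeze(1)               # (seq_len, 1, n_features)
    _, (h_end, _) = encoder.model(seq)                   # assumes block='LSTM'
    return h_end[-1]                                     # (1, hidden_size)

But I do not know whether this is the right way to condition on t, or whether the pairs should instead be fed one at a time with a persistent hidden state.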
Decoder:
class Decoder(nn.Module):
    """Converts latent vector into output.

    :param sequence_length: length of the input sequence
    :param batch_size: batch size of the input sequence
    :param hidden_size: hidden size of the RNN
    :param hidden_layer_depth: number of layers in the RNN
    :param latent_length: latent vector length
    :param output_size: 2, one representing the mean, the other the log std dev of the output
    :param block: GRU/LSTM - use the same one you used in the encoder
    :param dtype: depending on cuda enabled/disabled, create the tensor
    """
    def __init__(self, sequence_length, batch_size, hidden_size, hidden_layer_depth,
                 latent_length, output_size, dtype, block='LSTM', device='cuda'):
        super(Decoder, self).__init__()
        self.hidden_size = hidden_size
        self.batch_size = batch_size
        self.sequence_length = sequence_length
        self.hidden_layer_depth = hidden_layer_depth
        self.latent_length = latent_length
        self.output_size = output_size
        self.dtype = dtype

        if block == 'LSTM':
            self.model = nn.LSTM(1, self.hidden_size, self.hidden_layer_depth)
        elif block == 'GRU':
            self.model = nn.GRU(1, self.hidden_size, self.hidden_layer_depth)
        else:
            raise NotImplementedError

        self.latent_to_hidden = nn.Linear(self.latent_length, self.hidden_size)
        self.hidden_to_output = nn.Linear(self.hidden_size, self.output_size)

        # constant inputs fed to the RNN at every decoding step
        self.decoder_inputs = torch.zeros(self.sequence_length, self.batch_size, 1,
                                          requires_grad=True, device=device).type(self.dtype)
        self.c_0 = torch.zeros(self.hidden_layer_depth, self.batch_size, self.hidden_size,
                               requires_grad=True, device=device).type(self.dtype)

        nn.init.xavier_uniform_(self.latent_to_hidden.weight)
        nn.init.xavier_uniform_(self.hidden_to_output.weight)

    def forward(self, h_state):
        """Converts latent to hidden to output.

        :param h_state: latent vector
        :return: outputs consisting of mean and std dev of vector
        """
        # replicate the hidden state across all RNN layers
        if isinstance(self.model, nn.LSTM):
            h_0 = torch.stack([h_state for _ in range(self.hidden_layer_depth)])
            decoder_output, _ = self.model(self.decoder_inputs, (h_0, self.c_0))
        elif isinstance(self.model, nn.GRU):
            h_0 = torch.stack([h_state for _ in range(self.hidden_layer_depth)])
            decoder_output, _ = self.model(self.decoder_inputs, h_0)
        else:
            raise NotImplementedError
        out = self.hidden_to_output(decoder_output)
        return out
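In isolation the decoder does at least run; e.g. with arbitrary sizes of my own choosing (on CPU here, since with dtype=torch.FloatTensor the .type(self.dtype) call moves the zero tensors to CPU regardless of the device argument):

dec = Decoder(sequence_length=6, batch_size=1, hidden_size=8, hidden_layer_depth=2,
              latent_length=4, output_size=4, dtype=torch.FloatTensor, device='cpu')
h = torch.zeros(1, 8)  # stand-in for the encoder's h_end
out = dec(h)
print(out.shape)  # torch.Size([6, 1, 4]) = (sequence_length, batch_size, output_size)

But how this is supposed to consume the encoder's output, and where time conditioning would enter on the decoding side, is where I get lost.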
I'm not sure how these pieces are supposed to fit together. Would appreciate the help, thanks.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow