'When I make Transformer for NLP, it doesn't work, how can I fix it?
def get_pad_mask(tokens, i_pad=0):
"""
pad mask 계산하는 함수
:param tokens: tokens (bs, n_seq)
:param i_pad: id of pad
:return mask: pad mask (pad: 1, other: 0)
"""
# pad: True, others: False
mask = torch.eq(tokens, i_pad)
# boolean -> float 32
mask = mask.type(torch.FloatTensor)
# expand dimension for Q n_seq
mask = torch.unsqueeze(mask, 1)
return mask
def get_causal_mask(tokens, i_pad=0):
"""
causal mask 계산하는 함수
:param tokens: tokens (bs, n_seq)
:param i_pad: id of pad
:return mask: causal and pad mask (causal or pad: 1, other: 0)
"""
# n_seq 조회
n_seq = tokens.shape[1]
# all one mask
mask = torch.ones((n_seq, n_seq))
# make reverse causal mask
mask = mask.triu(1)
# 0 -> 1, 1 -> 0
# expand dim for bs
mask = torch.unsqueeze(mask, 0)
# get pad_mask
pad_mask = get_pad_mask(tokens, i_pad)
# mask all causal_mask or pad_mask
mask = torch.maximum(mask, pad_mask)
return mask
class ScaleDotProductAttention(nn.Module):
"""
Scale Dot Product Attention Class
"""
def __init__(self, name="scale_dot_product_attention"):
"""
생성자
:param name: layer name
"""
super(ScaleDotProductAttention, self).__init__()
def forward(self, Q, K, V, attn_mask):
"""
layer 실행
:param Q: Query
:param K: Key
:param V: Value
:param attn_mask: attention mask
:return attn_out: attention 실행 결과
"""
# matmul Q, K.T
attn_score = torch.matmul(Q, K.transpose(-2,-1))
# d_k
d_k = torch.tensor(K.shape[-1])
# scale = d_k ** 0.5
scale = torch.sqrt(d_k)
# divide by scale
attn_scale = torch.divide(attn_score, scale)
# do mask (subtract 1e-9 for masked value)
attn_scale -= 1.e9 * attn_mask
# calculate attention prob
attn_prob = torch.softmax(attn_scale, axis=-1)
# weighted sum of V
attn_out = torch.matmul(attn_prob, V)
return attn_out
class MultiHeadAttention(nn.Module):
#class MultiHeadAttention(tf.keras.layers.Layer):
"""
Multi Head Attention Class
"""
def __init__(self, args, name="MultiHeadAttention"):
"""
생성자
:param args: Args 객체
:param name: layer name
"""
super(MultiHeadAttention, self).__init__()
self.d_model = args.d_model
self.n_head = args.n_head
self.d_head = args.d_head
# Q, K, V input dense layer
self.W_Q = nn.Linear(args.n_head * args.d_head,args.n_head * args.d_head)
self.W_K = nn.Linear(args.n_head * args.d_head,args.n_head * args.d_head)
self.W_V = nn.Linear(args.n_head * args.d_head,args.n_head * args.d_head)
'''TensorFLow
self.W_Q = tf.keras.layers.Dense(self.n_head * self.d_head)
self.W_K = tf.keras.layers.Dense(self.n_head * self.d_head)
self.W_V = tf.keras.layers.Dense(self.n_head * self.d_head)
TensorFLow'''
# Scale Dot Product Attention class
self.attention = ScaleDotProductAttention(name="self_attention")
# output dense layer
#self.W_O = torch.nn.Linear(args.n_head * args.d_head,self.d_model)
self.W_O = torch.nn.Linear(self.d_model,self.d_model)
'''TensorFLow
self.W_O = tf.keras.layers.Dense(self.d_model)
TensorFLow'''
def forward(self, Q, K, V, attn_mask):
"""
layer 실행
:param Q: Query
:param K: Key
:param V: Value
:param attn_mask: attention mask
:return attn_out: attention 실행 결과
"""
# build multihead Q, K, V
self.Q_m = torch.transpose(torch.reshape(self.W_Q(Q), [-1, Q.shape[1], args.n_head, args.d_head]), 2, 1) # (bs, n_head, Q_len, d_head)
self.K_m = torch.transpose(torch.reshape(self.W_K(K), [-1, K.shape[1], args.n_head, args.d_head]), 2, 1) # (bs, n_head, Q_len, d_head)
self.V_m = torch.transpose(torch.reshape(self.W_V(V), [-1, V.shape[1], args.n_head, args.d_head]), 2, 1) # (bs, n_head, Q_len, d_head)
'''TensorFLow
Q_m = tf.transpose(tf.reshape(self.W_Q(Q), [-1, tf.shape(Q)[1], args.n_head, args.d_head]), [0, 2, 1, 3]) # (bs, n_head, Q_len, d_head)
K_m = tf.transpose(tf.reshape(self.W_K(K), [-1, tf.shape(K)[1], args.n_head, args.d_head]), [0, 2, 1, 3]) # (bs, n_head, Q_len, d_head)
V_m = tf.transpose(tf.reshape(self.W_V(V), [-1, tf.shape(V)[1], args.n_head, args.d_head]), [0, 2, 1, 3]) # (bs, n_head, Q_len, d_head)
TensorFLow'''
# build multihead mask
attn_mask_m = torch.unsqueeze(attn_mask, axis=1)
'''TensorFLow
attn_mask_m = tf.expand_dims(attn_mask, axis=1)
TensorFLow'''
# Scale Dot Product Attention with multi head Q, K, V, attn_mask
attn_out_m = self.attention(self.Q_m, self.K_m, self.V_m, attn_mask_m) # (bs, n_head, Q_len, d_head)
# transpose
attn_out_t = torch.transpose(attn_out_m,2, 1)
'''TensorFLow
attn_out_t = tf.transpose(attn_out_m, perm=[0, 2, 1, 3]) # (bs, n_head, Q_len, d_head) -> (bs, Q_len, n_head, d_head)
TensorFLow'''
# reshape
attn_out_c = torch.reshape(attn_out_t, [-1, Q.shape[1], args.n_head * args.d_head]) # (bs, Q_len, n_head * d_head) -> (bs, Q_len, n_head, d_head)
'''TensorFLow
attn_out_c = tf.reshape(attn_out_t, [-1, tf.shape(Q)[1], args.n_head * args.d_head]) # (bs, Q_len, n_head, d_head) -> (bs, Q_len, n_head * d_head)
TensorFLow'''
# linear for output
attn_out = self.W_O(attn_out_c) # (bs, Q_len, n_head * d_head) -> (bs, Q_len, d_model)
return attn_out
class PositionWiseFeedForward(nn.Module):
#class PositionWiseFeedForward(tf.keras.layers.Layer):
"""
Position Wise Feed Forward Class
"""
def __init__(self, args, name="PositionWiseFeedForward"):
"""
생성자
:param args: Args 객체
:param name: layer name
"""
super(PositionWiseFeedForward, self).__init__()
#super().__init__(name=name)
relu_f = torch.nn.ReLU()
relu_W = nn.Linear(args.d_model,args.d_ff)
self.W_1 = torch.nn.Sequential(relu_W,relu_f)
self.W_2 = nn.Linear(args.d_ff,args.d_model)
#self.W_1 = tf.keras.layers.Dense(args.d_ff, activation=tf.nn.relu)
#self.W_2 = tf.keras.layers.Dense(args.d_model)
def forward(self,inputs):
#def call(self, inputs):
"""
layer 실행
:param inputs: inputs
:return ff_val: feed forward 실행 결과
"""
# linear W_1 and W_2
ff_val = self.W_1(inputs)
ff_val = self.W_2(ff_val)
return ff_val
class EncoderLayer(nn.Module):
#class EncoderLayer(tf.keras.layers.Layer):
"""
Encoder Layer Class
"""
def __init__(self, args, name='encoder_layer'):
"""
생성자
:param args: Args 객체
:param name: layer name
"""
super(EncoderLayer, self).__init__()
#super().__init__(name=name)
self.enc_x_size = hidden_enc.size(dim=1)
self.enc_y_size = hidden_enc.size(dim=2)
self.self_attention = MultiHeadAttention(args)
self.norm1 = nn.LayerNorm([self.enc_x_size,self.enc_y_size],eps=args.norm_eps)
self.ffn = PositionWiseFeedForward(args)
self.norm2 = nn.LayerNorm([self.enc_x_size,self.enc_y_size],eps=args.norm_eps)
self.dropout = nn.Dropout(args.dropout)
def forward(self, enc_hidden, self_mask, training):
"""
layer 실행
:param enc_hidden: 이전 layer 출력
:param self_mask: self attention mask
:param training: training flag
:return enc_out: EncoderLayer 실행 결과
"""
# self attention
if training == False :
self.dropout.p = 0.0 #drop out training = False
print('ENCODER')
print(self.enc_x_size,self.enc_y_size,self.dropout.p)
self_attn_val = self.self_attention(enc_hidden, enc_hidden, enc_hidden, self_mask)
# add and layer normal
norm1_val = self.norm1(enc_hidden + self.dropout(self_attn_val))
# feed forward
ffn_val = self.ffn(norm1_val)
# add and layer normal
enc_out = self.norm2(norm1_val + self.dropout(ffn_val))
self.dropout.p = args.dropout
return enc_out
class DecoderLayer(nn.Module):
#class DecoderLayer(tf.keras.layers.Layer):
"""
Decoder Layer Class
"""
def __init__(self, args, name='decoder_layer'):
"""
생성자
:param args: Args 객체
:param name: layer name
"""
super(DecoderLayer, self).__init__()
#super().__init__(name=name)
self.dec_x_size = dec_hidden.size(dim=1)
self.dec_y_size = dec_hidden.size(dim=2)
self.self_attention = MultiHeadAttention(args)
self.norm1 = nn.LayerNorm([self.dec_x_size, self.dec_y_size],eps=args.norm_eps)
self.ende_attn = MultiHeadAttention(args)
self.norm2 = nn.LayerNorm([self.dec_x_size, self.dec_y_size],eps=args.norm_eps)
self.ffn = PositionWiseFeedForward(args)
self.norm3 = nn.LayerNorm([self.dec_x_size, self.dec_y_size],eps=args.norm_eps)
self.dropout = nn.Dropout(args.dropout)
def forward(self, dec_hidden, enc_out, self_mask, ende_mask,training):
#def call(self, dec_hidden, enc_out, self_mask, ende_mask, training):
"""
layer 실행
:param dec_hidden: 이전 layer 출력
:param enc_out: Encoder final 출력
:param self_mask: self attention mask
:param ende_mask: Encoder Decoder attention mask
:param training: training flag
:return dec_out: DecoderLayer 실행 결과
"""
print('DecoderLayer')
print(self.dec_x_size,self.dec_y_size)
if training == False :
self.dropout.p = 0.0 #drop out training = False
# self attention
self_attn_val = self.self_attention(dec_hidden, dec_hidden, dec_hidden, self_mask)
# add and layer normal
norm1_val = self.norm1(dec_hidden + self.dropout(self_attn_val))
# encoder and decoder attention
ende_attn_val = self.ende_attn(norm1_val, enc_out, enc_out, ende_mask)
# add and layer normal
norm2_val = self.norm2(norm1_val + self.dropout(ende_attn_val))
# feed forward
ffn_val = self.ffn(norm2_val)
# add and layer normal
dec_out = self.norm3(norm2_val + self.dropout(ffn_val))
self.dropout.p = args.dropout
return dec_out
class SharedEmbedding(nn.Module):
#class SharedEmbedding(tf.keras.layers.Layer):
"""
Weighed Shaed Embedding Class
"""
def __init__(self, args, name='SharedEmbedding'):
"""
생성자
:param args: Args 객체
:param name: layer name
"""
super(SharedEmbedding, self).__init__()
#super().__init__(name=name)
self.shared_weights = torch.empty(args.n_vocab, args.d_model)
self.shared_weights = torch.nn.init.trunc_normal_(self.shared_weights,std = args.d_model ** -0.5)
#with tf.name_scope('shared_embedding_weight'):
self.n_vocab = args.n_vocab
self.d_model = args.d_model
def forward(self, inputs, mode='embedding'):
#def call(self, inputs, mode='embedding'):
"""
layer 실행
:param inputs: 입력
:param mode: 실행 모드
:return: embedding or linear 실행 결과
"""
# mode가 embedding일 경우 embedding lookup 실행
if mode == 'embedding':
return self._embedding(inputs)
# mode가 linear일 경우 linear 실행
elif mode == 'linear':
return self._linear(inputs)
# mode가 기타일 경우 오류 발생
else:
raise ValueError(f'mode {mode} is not valid.')
def _embedding(self, inputs):
"""
embedding lookup
:param inputs: 입력
"""
# lookup by gather
embed = torch.matmul(nn.functional.one_hot(inputs, len(vocab)).type(torch.float), self.shared_weights )
#embed = tf.gather(self.shared_weights, tf.cast(inputs, tf.int32))
# muliply d_model ** 0.5
embed *= self.d_model ** 0.5
return embed
def _linear(self, inputs): # (bs, n_seq, d_model)
"""
linear 실행
:param inputs: 입력
"""
# matmul inputs, shared_weights (transpose_b=True)
outputs = torch.matmul(inputs, torch.transpose(self.shared_weights, 1, 0))
#outputs = tf.matmul(inputs, self.shared_weights, transpose_b=True)
return outputs
class PositionalEmbedding(nn.Module):
#class PositionalEmbedding(tf.keras.layers.Layer):
"""
Positional Embedding Class
"""
def __init__(self, args, name='position_embedding'):
"""
생성자
:param args: Args 객체
:param name: layer name
"""
super(PositionalEmbedding, self).__init__()
#super().__init__(name=name)
pos_encoding = PositionalEmbedding.get_sinusoid_encoding(args.n_seq, args.d_model)
self.embedding = nn.Embedding(args.n_seq, args.d_model)
self.embedding.weights = [pos_encoding] #weights=[pos_encoding]
self.embedding.weight.requires_grad=False #trainable=False
#self.embedding = tf.keras.layers.Embedding(args.n_seq, args.d_model, trainable=False, weights=[pos_encoding])
def forward(self, inputs):
#def call(self, inputs):
"""
layer 실행
:param inputs: 입력
:return embed: positional embedding lookup 결과
"""
# make position (0...n_seq)
zero_inputs = torch.ones_like(inputs)
x_size = zero_inputs.size(dim=0)
for i in range(0,x_size):
zero_inputs[i][0] = 0
position = torch.cumsum(zero_inputs, dim=1)
#position = tf.math.cumsum(tf.ones_like(inputs), axis=1, exclusive=True)
# embedding lookup
embed = self.embedding(position)
return embed
@staticmethod
def get_sinusoid_encoding(n_seq, d_model):
"""
sinusoid encoding 생성
:param n_seq: sequence number
:param n_seq: model hidden dimension
:return: positional encoding table
"""
# calculate exp
exs = np.array([2 * (i_ang // 2) / d_model for i_ang in range(d_model)])
# exs = np.array([2 * (i_ang // 2) / args.d_model for i_ang in range(args.d_model)])
# calculate power
angles = np.power(10000, exs)
# make position
pos = np.array([[i] for i in range(n_seq)])
# position angle
pos_encoding = pos / angles
# sin even number
pos_encoding[:, 0::2] = np.sin(pos_encoding[:, 0::2])
# cos odd number
pos_encoding[:, 1::2] = np.cos(pos_encoding[:, 1::2])
return pos_encoding
#return tf.cast(pos_encoding, tf.float32)
class Transformer(nn.Module):
#class Transformer(tf.keras.Model):
"""
Transformer Class
"""
def __init__(self, args, name='transformer'):
"""
생성자
:param args: Args 객체
:param name: layer name
"""
super(Transformer, self).__init__()
#super().__init__(name=name)
self.i_pad = args.i_pad
self.embedding = SharedEmbedding(args)
self.position = PositionalEmbedding(args)
self.encoder_layers = [EncoderLayer(args, name=f'encoder_layer_{i}') for i in range(args.n_layer)]
self.decoder_layers = [DecoderLayer(args, name=f'decoder_layer_{i}') for i in range(args.n_layer)]
self.dropout = nn.Dropout(args.dropout)
def forward(self, inputs, training=False):
#def call(self, inputs, training=False):
"""
layer 실행
:param inputs: enc_tokens, dec_tokens
:return logits: dec_tokens에 대한 다음 토큰 예측 결과 logits
"""
enc_tokens, dec_tokens = inputs
# encoder self attention mask
enc_self_mask = get_pad_mask(enc_tokens, self.i_pad)
# decoder self attention mask
dec_self_mask = get_causal_mask(dec_tokens, self.i_pad)
# encoder and decoder attention mask
enc_dec_mask = get_pad_mask(enc_tokens, self.i_pad)
# enc_tokens embedding lookup
enc_hidden = self.embedding(enc_tokens) + self.position(enc_tokens)
enc_hidden = self.dropout(enc_hidden)
# call encoder layers
for encoder_layer in self.encoder_layers:
enc_hidden = encoder_layer(enc_hidden, enc_self_mask, training)
# dec_tokens embedding lookup
dec_hidden = self.embedding(dec_tokens) + self.position(dec_tokens)
if training == False :
self.dropout.p = 0.0 #drop out training = False
dec_hidden = self.dropout(dec_hidden)
# call decoder layers
for decoder_layer in self.decoder_layers:
dec_hidden = decoder_layer(dec_hidden, enc_hidden, dec_self_mask, enc_dec_mask, training)
# call weight shared embedding (model=linear)
logits = self.embedding(dec_hidden, mode='linear')
# softmax
logit_softmax = nn.Softmax(dim=-1)
y_pred = logit_softmax(logits)
self.dropout.p = args.dropout
return y_pred
def lm_loss(logits, labels):
logit_softmax = nn.Softmax(dim=-1)
logits = logit_softmax(logits)
loss_fn = torch.nn.CrossEntropyLoss(reduction='none')
loss = loss_fn(logits, labels)
mask = labels.ne(0)
loss_val = loss.masked_select(mask).sum()
total = mask.sum()
loss = loss_val / torch.maximum(total, torch.tensor(1))
return loss
def lm_acc(y_pred, y_true):
"""
pad 부분을 제외하고 accuracy를 계산하는 함수
:param y_true: 정답
:param y_pred: 예측 값
:retrun loss: pad 부분이 제외된 accuracy 값
"""
y_true_clone = y_true.clone().detach()
y_pred_clone = y_pred.clone().detach()
y_pred_softmax = nn.Softmax(dim=-1)
y_pred_clone = y_pred_softmax(y_pred_clone )
y_pred_class = torch.argmax(y_pred_clone,dim=-1)
matches = torch.eq(y_true_clone, y_pred_class)
matches = matches.to(device).int()
mask = torch.ne(y_true_clone, 0)
mask = mask.to(device).int()
matches *= mask
mask_total = mask.sum()
matches_total = matches.sum()
accuracy = matches_total / torch.maximum(mask_total, torch.tensor(1))
print(y_true_clone)
print(y_pred_class)
print(accuracy)
return accuracy
model = Transformer(args)
function_predict = model((train_enc_inputs[:4], train_dec_inputs[:4]),training=True)
loss = lm_loss(function_predict[:4].view(-1, function_predict.size(-1)), train_dec_labels[:4].view(-1)).to(device)
acc = lm_acc(function_predict[:4].view(-1, function_predict.size(-1)), train_dec_labels[:4].view(-1)).to(device)
input : tensor([7116, 107, 1, ..., 0, 0, 0]) output : tensor([ 2, 7116, 107, ..., 0, 0, 0])
I make transofromer by pytorch, The output same as dec_input starting 2 ending with 3 I challenge 1000 times this code, the result is smae. I think this code has some problem, This is my first pytorch code. Can you fix it this stressed code?
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|
