'When I make Transformer for NLP, it doesn't work, how can I fix it?

def get_pad_mask(tokens, i_pad=0):
      """
      pad mask 계산하는 함수
      :param tokens: tokens (bs, n_seq)
      :param i_pad: id of pad
      :return mask: pad mask (pad: 1, other: 0)
      """
      # pad: True, others: False
      mask = torch.eq(tokens, i_pad)
      # boolean -> float 32
      mask = mask.type(torch.FloatTensor)
      # expand dimension for Q n_seq
      mask = torch.unsqueeze(mask, 1)
      return mask
def get_causal_mask(tokens, i_pad=0):
    """
    causal mask 계산하는 함수
    :param tokens: tokens (bs, n_seq)
    :param i_pad: id of pad
    :return mask: causal and pad mask (causal or pad: 1, other: 0)
    """
    # n_seq 조회
    n_seq = tokens.shape[1]
    # all one mask
    mask = torch.ones((n_seq, n_seq))
    # make reverse causal mask
    mask = mask.triu(1)
    # 0 -> 1, 1 -> 0
    # expand dim for bs
    mask = torch.unsqueeze(mask, 0)
    # get pad_mask
    pad_mask = get_pad_mask(tokens, i_pad)
    # mask all causal_mask or pad_mask
    mask = torch.maximum(mask, pad_mask)
    return mask

class ScaleDotProductAttention(nn.Module):
    """
    Scale Dot Product Attention Class
    """
    def __init__(self, name="scale_dot_product_attention"):
        """
        생성자
        :param name: layer name
        """
        super(ScaleDotProductAttention, self).__init__()

    def forward(self, Q, K, V, attn_mask):
        """
        layer 실행
        :param Q: Query
        :param K: Key
        :param V: Value
        :param attn_mask: attention mask
        :return attn_out: attention 실행 결과
        """
        # matmul Q, K.T
        attn_score = torch.matmul(Q, K.transpose(-2,-1))
        # d_k
        d_k = torch.tensor(K.shape[-1])
        # scale = d_k ** 0.5
        scale = torch.sqrt(d_k)
        # divide by scale
        attn_scale = torch.divide(attn_score, scale)
        # do mask (subtract 1e-9 for masked value)
        attn_scale -= 1.e9 * attn_mask
        # calculate attention prob
        attn_prob = torch.softmax(attn_scale, axis=-1)
        # weighted sum of V
        attn_out = torch.matmul(attn_prob, V)
        return attn_out

class MultiHeadAttention(nn.Module):  
#class MultiHeadAttention(tf.keras.layers.Layer):
    """
    Multi Head Attention Class
    """
    def __init__(self, args, name="MultiHeadAttention"):
        """
        생성자
        :param args: Args 객체
        :param name: layer name
        """
        super(MultiHeadAttention, self).__init__()

        self.d_model = args.d_model
        self.n_head = args.n_head
        self.d_head = args.d_head

        # Q, K, V input dense layer
        self.W_Q = nn.Linear(args.n_head * args.d_head,args.n_head * args.d_head)
        self.W_K = nn.Linear(args.n_head * args.d_head,args.n_head * args.d_head)
        self.W_V = nn.Linear(args.n_head * args.d_head,args.n_head * args.d_head)
        '''TensorFLow
        self.W_Q = tf.keras.layers.Dense(self.n_head * self.d_head)
        self.W_K = tf.keras.layers.Dense(self.n_head * self.d_head)
        self.W_V = tf.keras.layers.Dense(self.n_head * self.d_head)
        TensorFLow'''
        # Scale Dot Product Attention class
        self.attention = ScaleDotProductAttention(name="self_attention")
        # output dense layer
        #self.W_O = torch.nn.Linear(args.n_head * args.d_head,self.d_model)
        self.W_O = torch.nn.Linear(self.d_model,self.d_model)
        '''TensorFLow
        self.W_O = tf.keras.layers.Dense(self.d_model)
        TensorFLow'''

    def forward(self, Q, K, V, attn_mask):
        """
        layer 실행
        :param Q: Query
        :param K: Key
        :param V: Value
        :param attn_mask: attention mask
        :return attn_out: attention 실행 결과
        """
        # build multihead Q, K, V
        self.Q_m = torch.transpose(torch.reshape(self.W_Q(Q), [-1, Q.shape[1], args.n_head, args.d_head]), 2, 1)  # (bs, n_head, Q_len, d_head)
        self.K_m = torch.transpose(torch.reshape(self.W_K(K), [-1, K.shape[1], args.n_head, args.d_head]), 2, 1)  # (bs, n_head, Q_len, d_head)
        self.V_m = torch.transpose(torch.reshape(self.W_V(V), [-1, V.shape[1], args.n_head, args.d_head]), 2, 1)  # (bs, n_head, Q_len, d_head)
        '''TensorFLow
        Q_m = tf.transpose(tf.reshape(self.W_Q(Q), [-1, tf.shape(Q)[1], args.n_head, args.d_head]), [0, 2, 1, 3])  # (bs, n_head, Q_len, d_head)
        K_m = tf.transpose(tf.reshape(self.W_K(K), [-1, tf.shape(K)[1], args.n_head, args.d_head]), [0, 2, 1, 3])  # (bs, n_head, Q_len, d_head)
        V_m = tf.transpose(tf.reshape(self.W_V(V), [-1, tf.shape(V)[1], args.n_head, args.d_head]), [0, 2, 1, 3])  # (bs, n_head, Q_len, d_head)
        TensorFLow'''
        # build multihead mask
        attn_mask_m = torch.unsqueeze(attn_mask, axis=1)
        '''TensorFLow
        attn_mask_m = tf.expand_dims(attn_mask, axis=1)
        TensorFLow'''
        # Scale Dot Product Attention with multi head Q, K, V, attn_mask
        attn_out_m = self.attention(self.Q_m, self.K_m, self.V_m, attn_mask_m)  # (bs, n_head, Q_len, d_head)
        # transpose
        attn_out_t = torch.transpose(attn_out_m,2, 1)
        '''TensorFLow
        attn_out_t = tf.transpose(attn_out_m, perm=[0, 2, 1, 3])  # (bs, n_head, Q_len, d_head) -> (bs, Q_len, n_head, d_head)
        TensorFLow'''
        # reshape
        attn_out_c = torch.reshape(attn_out_t, [-1, Q.shape[1], args.n_head * args.d_head])  # (bs, Q_len, n_head * d_head) -> (bs, Q_len, n_head,  d_head)
        '''TensorFLow
        attn_out_c = tf.reshape(attn_out_t, [-1, tf.shape(Q)[1], args.n_head * args.d_head])  # (bs, Q_len, n_head, d_head) -> (bs, Q_len, n_head * d_head)
        TensorFLow'''
        # linear for output
        attn_out = self.W_O(attn_out_c) # (bs, Q_len, n_head * d_head) -> (bs, Q_len, d_model)
        return attn_out

class PositionWiseFeedForward(nn.Module):
#class PositionWiseFeedForward(tf.keras.layers.Layer):
    """
    Position Wise Feed Forward Class
    """
    def __init__(self, args, name="PositionWiseFeedForward"):
        """
        생성자
        :param args: Args 객체
        :param name: layer name
        """
        super(PositionWiseFeedForward, self).__init__()
        #super().__init__(name=name)

        relu_f = torch.nn.ReLU()
        relu_W = nn.Linear(args.d_model,args.d_ff)
        self.W_1 = torch.nn.Sequential(relu_W,relu_f)
        self.W_2 = nn.Linear(args.d_ff,args.d_model)

        #self.W_1 = tf.keras.layers.Dense(args.d_ff, activation=tf.nn.relu)
        #self.W_2 = tf.keras.layers.Dense(args.d_model)

    def forward(self,inputs):
    #def call(self, inputs):
        """
        layer 실행
        :param inputs: inputs
        :return ff_val: feed forward 실행 결과
        """
        # linear W_1 and W_2
        ff_val = self.W_1(inputs)
        ff_val = self.W_2(ff_val)
        return ff_val

class EncoderLayer(nn.Module):
#class EncoderLayer(tf.keras.layers.Layer):
    """
    Encoder Layer Class
    """
    def __init__(self, args, name='encoder_layer'):
        """
        생성자
        :param args: Args 객체
        :param name: layer name
        """
        super(EncoderLayer, self).__init__()
        #super().__init__(name=name)

        self.enc_x_size = hidden_enc.size(dim=1)
        self.enc_y_size = hidden_enc.size(dim=2)
        self.self_attention = MultiHeadAttention(args)
        self.norm1 = nn.LayerNorm([self.enc_x_size,self.enc_y_size],eps=args.norm_eps)

        self.ffn = PositionWiseFeedForward(args)
        self.norm2 = nn.LayerNorm([self.enc_x_size,self.enc_y_size],eps=args.norm_eps)

        self.dropout = nn.Dropout(args.dropout)
 
    def forward(self, enc_hidden, self_mask, training):
        """
        layer 실행
        :param enc_hidden: 이전 layer 출력
        :param self_mask: self attention mask
        :param training: training flag
        :return enc_out: EncoderLayer 실행 결과
        """
        # self attention
        if training == False : 
          self.dropout.p = 0.0 #drop out training = False

        print('ENCODER')
        print(self.enc_x_size,self.enc_y_size,self.dropout.p)

        self_attn_val = self.self_attention(enc_hidden, enc_hidden, enc_hidden, self_mask)
        # add and layer normal
        norm1_val = self.norm1(enc_hidden + self.dropout(self_attn_val))
        # feed forward
        ffn_val = self.ffn(norm1_val)
        # add and layer normal
        enc_out = self.norm2(norm1_val + self.dropout(ffn_val))

        self.dropout.p = args.dropout
        return enc_out

class DecoderLayer(nn.Module):
#class DecoderLayer(tf.keras.layers.Layer):
    """
    Decoder Layer Class
    """
    def __init__(self, args, name='decoder_layer'):
        """
        생성자
        :param args: Args 객체
        :param name: layer name
        """
        super(DecoderLayer, self).__init__()
        #super().__init__(name=name)

        self.dec_x_size = dec_hidden.size(dim=1)
        self.dec_y_size = dec_hidden.size(dim=2)

        self.self_attention = MultiHeadAttention(args)
        self.norm1 = nn.LayerNorm([self.dec_x_size, self.dec_y_size],eps=args.norm_eps)

        self.ende_attn = MultiHeadAttention(args)
        self.norm2 = nn.LayerNorm([self.dec_x_size, self.dec_y_size],eps=args.norm_eps)

        self.ffn = PositionWiseFeedForward(args)
        self.norm3 = nn.LayerNorm([self.dec_x_size, self.dec_y_size],eps=args.norm_eps)

        self.dropout = nn.Dropout(args.dropout)

    def forward(self, dec_hidden, enc_out, self_mask, ende_mask,training):
    #def call(self, dec_hidden, enc_out, self_mask, ende_mask, training):
        """
        layer 실행
        :param dec_hidden: 이전 layer 출력
        :param enc_out: Encoder final 출력
        :param self_mask: self attention mask
        :param ende_mask: Encoder Decoder attention mask
        :param training: training flag
        :return dec_out: DecoderLayer 실행 결과
        """
        print('DecoderLayer')
        print(self.dec_x_size,self.dec_y_size)
        if training == False : 
          self.dropout.p = 0.0 #drop out training = False
        # self attention
        self_attn_val = self.self_attention(dec_hidden, dec_hidden, dec_hidden, self_mask)
        # add and layer normal
        norm1_val = self.norm1(dec_hidden + self.dropout(self_attn_val))

        # encoder and decoder attention
        ende_attn_val = self.ende_attn(norm1_val, enc_out, enc_out, ende_mask)
        # add and layer normal
        norm2_val = self.norm2(norm1_val + self.dropout(ende_attn_val))

        # feed forward
        ffn_val = self.ffn(norm2_val)
        # add and layer normal
        dec_out = self.norm3(norm2_val + self.dropout(ffn_val))

        self.dropout.p = args.dropout
        return dec_out 

class SharedEmbedding(nn.Module):
#class SharedEmbedding(tf.keras.layers.Layer):
    """
    Weighed Shaed Embedding Class
    """
    def __init__(self, args, name='SharedEmbedding'):
        """
        생성자
        :param args: Args 객체
        :param name: layer name
        """
        super(SharedEmbedding, self).__init__()
        #super().__init__(name=name)

        self.shared_weights = torch.empty(args.n_vocab, args.d_model)
       
        self.shared_weights = torch.nn.init.trunc_normal_(self.shared_weights,std = args.d_model ** -0.5)
        #with tf.name_scope('shared_embedding_weight'):

        self.n_vocab = args.n_vocab
        self.d_model = args.d_model
   

    def forward(self, inputs, mode='embedding'):
    #def call(self, inputs, mode='embedding'):
        """
        layer 실행
        :param inputs: 입력
        :param mode: 실행 모드
        :return: embedding or linear 실행 결과
        """
        # mode가 embedding일 경우 embedding lookup 실행
        if mode == 'embedding':
            return self._embedding(inputs)
        # mode가 linear일 경우 linear 실행
        elif mode == 'linear':
            return self._linear(inputs)
        # mode가 기타일 경우 오류 발생
        else:
            raise ValueError(f'mode {mode} is not valid.')
   
    def _embedding(self, inputs):
        """
        embedding lookup
        :param inputs: 입력
        """
        # lookup by gather
        embed = torch.matmul(nn.functional.one_hot(inputs, len(vocab)).type(torch.float), self.shared_weights )
        #embed = tf.gather(self.shared_weights, tf.cast(inputs, tf.int32))
        # muliply d_model ** 0.5
        embed *= self.d_model ** 0.5
        return embed

    def _linear(self, inputs):  # (bs, n_seq, d_model)
        """
        linear 실행
        :param inputs: 입력
        """
        # matmul inputs, shared_weights (transpose_b=True)
        outputs = torch.matmul(inputs, torch.transpose(self.shared_weights, 1, 0))
        #outputs = tf.matmul(inputs, self.shared_weights, transpose_b=True)
        return outputs

class PositionalEmbedding(nn.Module):
#class PositionalEmbedding(tf.keras.layers.Layer):
    """
    Positional Embedding Class
    """
    def __init__(self, args, name='position_embedding'):
        """
        생성자
        :param args: Args 객체
        :param name: layer name
        """
        super(PositionalEmbedding, self).__init__()
        #super().__init__(name=name)
        
        pos_encoding = PositionalEmbedding.get_sinusoid_encoding(args.n_seq, args.d_model)
        self.embedding = nn.Embedding(args.n_seq, args.d_model)
        self.embedding.weights = [pos_encoding] #weights=[pos_encoding]
        self.embedding.weight.requires_grad=False #trainable=False
        #self.embedding = tf.keras.layers.Embedding(args.n_seq, args.d_model, trainable=False, weights=[pos_encoding])

    def forward(self, inputs):
    #def call(self, inputs):
        """
        layer 실행
        :param inputs: 입력
        :return embed: positional embedding lookup 결과
        """
        # make position (0...n_seq)
        zero_inputs = torch.ones_like(inputs)
        x_size = zero_inputs.size(dim=0)
        for i in range(0,x_size): 
          zero_inputs[i][0]  = 0
        position = torch.cumsum(zero_inputs, dim=1)
        #position = tf.math.cumsum(tf.ones_like(inputs), axis=1, exclusive=True)
        # embedding lookup
        embed = self.embedding(position)
        return embed

    @staticmethod
    def get_sinusoid_encoding(n_seq, d_model):
        """
        sinusoid encoding 생성
        :param n_seq: sequence number
        :param n_seq: model hidden dimension
        :return: positional encoding table
        """
        # calculate exp
        exs = np.array([2 * (i_ang // 2) / d_model for i_ang in range(d_model)])
        #    exs = np.array([2 * (i_ang // 2) / args.d_model for i_ang in range(args.d_model)])
        # calculate power
        angles = np.power(10000, exs)
        # make position
        pos = np.array([[i] for i in range(n_seq)])
        # position angle
        pos_encoding = pos / angles
        # sin even number
        pos_encoding[:, 0::2] = np.sin(pos_encoding[:, 0::2])
        # cos odd number
        pos_encoding[:, 1::2] = np.cos(pos_encoding[:, 1::2])

        return pos_encoding
        #return tf.cast(pos_encoding, tf.float32)

class Transformer(nn.Module):
#class Transformer(tf.keras.Model):
    """
    Transformer Class
    """
    def __init__(self, args, name='transformer'):
        """
        생성자
        :param args: Args 객체
        :param name: layer name
        """
        super(Transformer, self).__init__()
        #super().__init__(name=name)

        self.i_pad = args.i_pad
        self.embedding = SharedEmbedding(args)
        self.position = PositionalEmbedding(args)
        
        self.encoder_layers = [EncoderLayer(args, name=f'encoder_layer_{i}') for i in range(args.n_layer)]
        self.decoder_layers = [DecoderLayer(args, name=f'decoder_layer_{i}') for i in range(args.n_layer)]

        self.dropout = nn.Dropout(args.dropout)

    def forward(self, inputs, training=False):
    #def call(self, inputs, training=False):
        """
        layer 실행
        :param inputs: enc_tokens, dec_tokens
        :return logits: dec_tokens에 대한 다음 토큰 예측 결과 logits
        """
        enc_tokens, dec_tokens = inputs
        
        # encoder self attention mask
        enc_self_mask = get_pad_mask(enc_tokens, self.i_pad)
        # decoder self attention mask
        dec_self_mask = get_causal_mask(dec_tokens, self.i_pad)
        # encoder and decoder attention mask
        enc_dec_mask = get_pad_mask(enc_tokens, self.i_pad)

        # enc_tokens embedding lookup
        enc_hidden = self.embedding(enc_tokens) + self.position(enc_tokens)
        enc_hidden = self.dropout(enc_hidden)
        # call encoder layers
        for encoder_layer in self.encoder_layers:
            enc_hidden = encoder_layer(enc_hidden, enc_self_mask, training)
        
        # dec_tokens embedding lookup
        dec_hidden = self.embedding(dec_tokens) + self.position(dec_tokens)
        if training == False : 
          self.dropout.p = 0.0 #drop out training = False

        dec_hidden = self.dropout(dec_hidden)
        # call decoder layers
        for decoder_layer in self.decoder_layers:
            dec_hidden = decoder_layer(dec_hidden, enc_hidden, dec_self_mask, enc_dec_mask, training)

        # call weight shared embedding (model=linear)
        logits = self.embedding(dec_hidden, mode='linear')
        # softmax
        logit_softmax = nn.Softmax(dim=-1)

        y_pred = logit_softmax(logits)
        self.dropout.p = args.dropout
        return y_pred

def lm_loss(logits, labels):
    logit_softmax = nn.Softmax(dim=-1)
    logits = logit_softmax(logits)
    loss_fn = torch.nn.CrossEntropyLoss(reduction='none')

    loss = loss_fn(logits, labels)
    mask = labels.ne(0)
    loss_val = loss.masked_select(mask).sum()
    total = mask.sum()
    loss = loss_val / torch.maximum(total, torch.tensor(1))
    return loss

def lm_acc(y_pred, y_true):
    """
    pad 부분을 제외하고 accuracy를 계산하는 함수
    :param y_true: 정답
    :param y_pred: 예측 값
    :retrun loss: pad 부분이 제외된 accuracy 값
    """
    y_true_clone = y_true.clone().detach()
    y_pred_clone = y_pred.clone().detach()
    y_pred_softmax = nn.Softmax(dim=-1)
    y_pred_clone  = y_pred_softmax(y_pred_clone )
    y_pred_class = torch.argmax(y_pred_clone,dim=-1)
    matches = torch.eq(y_true_clone, y_pred_class)
    matches = matches.to(device).int()
    mask = torch.ne(y_true_clone, 0)
    mask = mask.to(device).int()
    matches *= mask
    mask_total = mask.sum()
    matches_total = matches.sum()
    accuracy = matches_total / torch.maximum(mask_total, torch.tensor(1))
    print(y_true_clone)
    print(y_pred_class)
    print(accuracy)
    return accuracy

model = Transformer(args)

function_predict = model((train_enc_inputs[:4], train_dec_inputs[:4]),training=True)
loss = lm_loss(function_predict[:4].view(-1, function_predict.size(-1)), train_dec_labels[:4].view(-1)).to(device)
acc  = lm_acc(function_predict[:4].view(-1, function_predict.size(-1)), train_dec_labels[:4].view(-1)).to(device)

input : tensor([7116, 107, 1, ..., 0, 0, 0]) output : tensor([ 2, 7116, 107, ..., 0, 0, 0])

I make transofromer by pytorch, The output same as dec_input starting 2 ending with 3 I challenge 1000 times this code, the result is smae. I think this code has some problem, This is my first pytorch code. Can you fix it this stressed code?



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source