
[Chatbot] Principles of the Seq2Seq Model

2021-08-08 00:13:29 ZSYL

Introduction to the chatbot

In the project preparation stage we saw that after the user says something, the system judges the intent; if the intent is chit-chat, it calls the chat model and returns its result. This is the functionality we will implement in this part of the project.

Common chatbots on the market today, such as Microsoft XiaoIce, are built on this type of model; the much earlier XiaoHuangJi ("little yellow chicken") offered a worse experience than such models.

The common chat model is a seq2seq architecture. Below we will learn seq2seq and later use it to implement our chatbot.

1. Introduction to Seq2Seq


Sequence to sequence (seq2seq) consists of two RNNs: an encoder and a decoder. The encoder is responsible for understanding the input sentence and turning it into a context vector; the decoder takes this vector representing the understood sentence and decodes it to produce the output. This process is quite similar to how our brain works: we hear a sentence, understand it, and then assemble and give an answer.

At this point there is a question: the encoder compresses the input into a single context vector, which becomes the decoder's input. How do we get multiple outputs from this single input?

The answer is to use the output of the current step as the input of the next decoding step, and repeat:

outputs = []
output = "<SOS>"  # the first input to the decoder is a start token
while True:
    output = decoder(output)
    outputs.append(output)

So when does the loop stop?

In the training dataset we can append a terminator <END> to the end of every output sequence; once the terminator is produced, the loop can stop:

outputs = []
output = "<SOS>"
while output != "<END>":
    output = decoder(output)
    outputs.append(output)

This terminator is just a marker; many people also use <EOS> (End Of Sentence).

In summary: in a seq2seq model the encoder accepts a sequence of length M and produces a single context vector, and the decoder then turns that context vector into a sequence of length N as the output. This gives an M-to-N model that can handle many problems with variable-length inputs and outputs, such as text translation, question answering, text summarization, keyword-to-poem generation, and so on.

2. Seq2Seq Model Implementation

Below, we use a simple example to see how an ordinary seq2seq model can be implemented.

Requirement: build a model that takes a string of digits as input and outputs the same string with a 0 appended.

For example:

  • Input 123456789, output 1234567890
  • Input 52555568, output 525555680

2.1 Implementation process

  1. Convert the text to a sequence of numbers (a torch.LongTensor)
  2. Use the sequences to prepare the dataset and the DataLoader
  3. Implement the encoder
  4. Implement the decoder
  5. Assemble the seq2seq model
  6. Implement the training logic and train the model
  7. Implement the evaluation logic and evaluate the model
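
Based on the import statements that appear in the code below, the project can be organized roughly as follows (train.py is an assumed name for the training script; the other file names come from the imports):

config.py          # shared hyperparameters (max_len, batch_size, ...)
word_sequence.py   # NumSequence: text <-> number-sequence conversion
dataset.py         # RandomDataset and the DataLoader
encoder.py         # NumEncoder
decoder.py         # NumDecoder
seq2seq.py         # Seq2Seq model combining encoder and decoder
train.py           # training loop (assumed name)
eval.py            # evaluation script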

2.2 Convert text to sequence

Since the input consists of digits, and we need to map the written digits to entries in a dictionary, we can treat these digits as strings.

So what we need to implement is:

  1. Mapping each character to a number
  2. Mapping each number back to a character

The logic is the same as before: create a word_sequence.py file and implement the above.

class NumSequence:
    UNK_TAG = "UNK"  # unknown token
    PAD_TAG = "PAD"  # padding token, used to align the sentences in a batch to the same length (shorter sentences are padded)
    EOS_TAG = "EOS"  # end-of-sentence token
    SOS_TAG = "SOS"  # start-of-sentence token

    UNK = 0
    PAD = 1
    EOS = 2
    SOS = 3

    def __init__(self):
        self.dict = {
            self.UNK_TAG: self.UNK,
            self.PAD_TAG: self.PAD,
            self.EOS_TAG: self.EOS,
            self.SOS_TAG: self.SOS
        }
        # build the string -> index mapping for the digits 0-9
        for i in range(10):
            self.dict[str(i)] = len(self.dict)
        # build the reverse index -> string mapping
        self.index2word = dict(zip(self.dict.values(), self.dict.keys()))

    def __len__(self):
        return len(self.dict)

    def transform(self, sequence, max_len=None, add_eos=False):
        """
        sequence: the input sentence
        max_len: maximum sentence length (after padding)
        add_eos: whether to append the EOS token
        """
        sequence_list = list(str(sequence))
        seq_len = len(sequence_list) + 1 if add_eos else len(sequence_list)

        if add_eos and max_len is not None:
            assert max_len >= seq_len, "max_len must be >= length of sequence plus EOS"
        _sequence_index = [self.dict.get(i, self.UNK) for i in sequence_list]
        if add_eos:
            _sequence_index += [self.EOS]
        if max_len is not None:
            sequence_index = [self.PAD] * max_len
            sequence_index[:seq_len] = _sequence_index
            return sequence_index
        else:
            return _sequence_index

    def inverse_transform(self,sequence_index):
        result = []
        for i in sequence_index:
            if i==self.EOS:
                break
            result.append(self.index2word.get(int(i),self.UNK_TAG))
        return result
# instantiate once so that other modules can import and reuse it
num_sequence = NumSequence()

if __name__ == '__main__':
    num_sequence = NumSequence()
    print(num_sequence.dict)
    print(num_sequence.index2word)
    print(num_sequence.transform("1231230",add_eos=True))
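
For reference, running the module directly should print something along these lines (digit i maps to index i + 4, and the trailing 2 is the EOS index; no padding is applied because max_len is not passed):

{'UNK': 0, 'PAD': 1, 'EOS': 2, 'SOS': 3, '0': 4, '1': 5, '2': 6, '3': 7, '4': 8, '5': 9, '6': 10, '7': 11, '8': 12, '9': 13}
{0: 'UNK', 1: 'PAD', 2: 'EOS', 3: 'SOS', 4: '0', 5: '1', 6: '2', 7: '3', 8: '4', 9: '5', 10: '6', 11: '7', 12: '8', 13: '9'}
[5, 6, 7, 5, 6, 7, 4, 2]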

2.3 Prepare the dataset

2.3.1 Prepare the Dataset

Here we use randomly generated integers in [1, 100000000) to build the dataset.

from torch.utils.data import Dataset,DataLoader
import numpy as np
from word_sequence import num_sequence
import torch
import config

class RandomDataset(Dataset):
    def __init__(self):
        super(RandomDataset,self).__init__()
        self.total_data_size = 500000
        np.random.seed(10)
        self.total_data = np.random.randint(1,100000000,size=[self.total_data_size])

    def __getitem__(self, idx):
        """return input, target, input_length, target_length (real lengths)"""
        input = str(self.total_data[idx])
        return input, input + "0", len(input), len(input) + 1

    def __len__(self):
        return self.total_data_size

Looking at the random numbers, we can see that most of them have 8 digits; after appending 0 to the target and adding EOS, the maximum length is 10.

So we add max_len (the maximum text length) to the shared config module, which makes it easy to change later; a minimal sketch of the config file is shown below.
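
The config module is imported by all of the code that follows. A minimal sketch might look like this; the concrete values are assumptions rather than values taken from the original project:

# config.py -- minimal sketch, the values below are assumptions
max_len = 10         # 8 digits + appended "0" + EOS
batch_size = 128
embedding_dim = 100
hidden_size = 64
dropout = 0          # the GRUs below use a single layer, so non-zero dropout would have no effect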

2.3.2 Prepare the DataLoader

When preparing the DataLoader, we can define a collate_fn to control how each batch of data is processed.

Note that:

  1. The data in each batch must be sorted in descending order by its real length (this is required later by pack_padded_sequence)
  2. The text-serialization method must be called to serialize the text; for target, add_eos must be applied as well
  3. The resulting sequences are returned as LongTensors
  4. DataLoader has a drop_last parameter: when the amount of data is not divisible by batch_size, the last batch has a different size from the others, so consider dropping it

def collate_fn(batch):
    # 1. sort the batch by length, longest first
    batch = sorted(batch, key=lambda x: x[3], reverse=True)
    input, target, input_length, target_length = zip(*batch)

    # 2. serialize and pad
    input = torch.LongTensor([num_sequence.transform(i, max_len=config.max_len) for i in input])
    target = torch.LongTensor([num_sequence.transform(i, max_len=config.max_len, add_eos=True) for i in target])
    input_length = torch.LongTensor(input_length)
    target_length = torch.LongTensor(target_length)

    return input, target, input_length, target_length

data_loader = DataLoader(dataset=RandomDataset(),batch_size=config.batch_size,collate_fn=collate_fn,drop_last=True)
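
As a quick sanity check (a sketch, assuming the config values suggested above), we can fetch one batch and inspect its shapes:

if __name__ == '__main__':
    input, target, input_length, target_length = next(iter(data_loader))
    print(input.size())       # torch.Size([batch_size, max_len])
    print(target.size())      # torch.Size([batch_size, max_len])
    print(input_length[:5])   # real lengths, sorted in descending order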

2.4 Prepare the encoder

The purpose of the encoder is to encode the text and hand the encoded result to the subsequent components. Here we can use an Embedding + GRU structure and take the hidden state output at the last time step as the encoding of the sentence.


Note:

  1. Pay attention to the parameters of Embedding and GRU; here we set batch_first=True so that the batch dimension comes first
  2. Pay attention to the shape of the output
  3. In LSTM and GRU every time step of the input is processed to produce a result, so the whole computation is a loop over the sentence length; implementing it by hand is slow
    1. PyTorch provides nn.utils.rnn.pack_padded_sequence, which packs the padded sentences so that LSTM or GRU can process them efficiently
    2. nn.utils.rnn.pad_packed_sequence then unpacks the packed result
  4. When using nn.utils.rnn.pack_padded_sequence, the batch must be sorted by sentence length in descending order

The implementation code is as follows :

import torch.nn as nn
from word_sequence import num_sequence
import config


class NumEncoder(nn.Module):
    def __init__(self):
        super(NumEncoder,self).__init__()
        self.vocab_size = len(num_sequence)
        self.dropout = config.dropout
        self.embedding = nn.Embedding(num_embeddings=self.vocab_size,embedding_dim=config.embedding_dim,padding_idx=num_sequence.PAD)
        self.gru = nn.GRU(input_size=config.embedding_dim,
                          hidden_size=config.hidden_size,
                          num_layers=1,
                          batch_first=True)

    def forward(self, input, input_length):
        """
        input: [batch_size, max_len]
        input_length: [batch_size]
        """
        embeded = self.embedding(input)  # [batch_size, max_len, embedding_dim]

        # pack the padded sentences to speed up the LSTM/GRU computation
        embeded = nn.utils.rnn.pack_padded_sequence(embeded, lengths=input_length, batch_first=True)

        out, hidden = self.gru(embeded)  # hidden: [1, batch_size, hidden_size]

        # unpack the previously packed result
        out, outputs_length = nn.utils.rnn.pad_packed_sequence(out, batch_first=True, padding_value=num_sequence.PAD)
        # out: [batch_size, seq_len, hidden_size]
        return out, hidden
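
A quick shape check for the encoder, as a sketch that reuses the data_loader from dataset.py and the config values assumed earlier:

if __name__ == '__main__':
    from dataset import data_loader
    encoder = NumEncoder()
    input, target, input_length, target_length = next(iter(data_loader))
    out, hidden = encoder(input, input_length)
    print(out.size())      # [batch_size, longest length in the batch, hidden_size]
    print(hidden.size())   # [1, batch_size, hidden_size]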

2.5 Implement the decoder

The decoder is mainly responsible for processing the encoded result and producing predictions, which will later be used to compute the loss.

At this point we need to think about:

  1. What loss function to use, and what format the predictions need to be in

    • Based on previous experience, we can treat this as a classification problem: at each time step the output is essentially choosing the word with the highest probability
    • The ground truth has shape [batch_size, max_len], so the output needs to have shape [batch_size, max_len, vocab_size]
    • We apply log_softmax over the last dimension of the predictions and then compute the negative log-likelihood loss against the ground truth
  2. How to turn the encoded result [1, batch_size, hidden_size] into predictions. The decoder is also an RNN, so it can likewise use an LSTM or GRU structure. In the decoder:

    • We loop, processing one time step at a time

    • The encoder result is used as the initial hidden state, and a [batch_size, 1] tensor filled with SOS is defined as the initial input, telling the decoder to start working

    • The decoder predicts an output of shape [batch_size, hidden_size] (which will be projected to [batch_size, vocab_size]), and this output is fed back into the decoder as the next input

    • The above is a loop whose number of iterations is the maximum sentence length, giving max_len outputs

    • All of the outputs are concatenated to obtain [batch_size, max_len, vocab_size]

  3. During RNN training, using the previous prediction as the input of the next step can lead to a cascade of errors: one wrong step makes the following steps wrong too. How can we improve the convergence speed of the model?

    • During training we can feed the ground truth as the input of the next step, which avoids this cascading-error situation
    • At the same time, we still sometimes use the prediction as the next input; the two kinds of input are chosen at random
    • This mechanism is called teacher forcing: like a tutor who corrects our behaviour at every step, it helps the model learn the correct pattern after fewer training iterations


import torch
import torch.nn as nn
import config
import random
import torch.nn.functional as F
from word_sequence import num_sequence

class NumDecoder(nn.Module):
    def __init__(self):
        super(NumDecoder,self).__init__()
        self.max_seq_len = config.max_len
        self.vocab_size = len(num_sequence)
        self.embedding_dim = config.embedding_dim
        self.dropout = config.dropout

        self.embedding = nn.Embedding(num_embeddings=self.vocab_size,embedding_dim=self.embedding_dim,padding_idx=num_sequence.PAD)
        self.gru = nn.GRU(input_size=self.embedding_dim,
                          hidden_size=config.hidden_size,
                          num_layers=1,
                          batch_first=True,
                          dropout=self.dropout)
        self.log_softmax = nn.LogSoftmax()

        self.fc = nn.Linear(config.hidden_size,self.vocab_size)

    def forward(self, encoder_hidden, target, target_length):
        # encoder_hidden: [1, batch_size, hidden_size]
        # target: [batch_size, max_len]

        # the initial input is a column of SOS tokens
        decoder_input = torch.LongTensor([[num_sequence.SOS]] * config.batch_size)

        # tensor used to collect the decoder output of every time step
        decoder_outputs = torch.zeros(config.batch_size, config.max_len, self.vocab_size)

        decoder_hidden = encoder_hidden  # [1, batch_size, hidden_size]

        for t in range(config.max_len):
            decoder_output_t, decoder_hidden = self.forward_step(decoder_input, decoder_hidden)

            # store the output of time step t; decoder_output_t: [batch_size, vocab_size]
            decoder_outputs[:, t, :] = decoder_output_t

            # during training, use teacher forcing to correct the model
            use_teacher_forcing = random.random() > 0.5
            if use_teacher_forcing:
                # use the ground truth as the next input
                decoder_input = target[:, t].unsqueeze(1)  # [batch_size, 1]
            else:
                # use the prediction; topk with k=1 returns the largest value of the last dimension
                value, index = torch.topk(decoder_output_t, 1)  # index: [batch_size, 1]
                decoder_input = index
        return decoder_outputs, decoder_hidden

    def forward_step(self, decoder_input, decoder_hidden):
        """
        :param decoder_input: [batch_size, 1]
        :param decoder_hidden: [1, batch_size, hidden_size]
        :return: out: [batch_size, vocab_size], decoder_hidden: [1, batch_size, hidden_size]
        """
        embeded = self.embedding(decoder_input)  # [batch_size, 1, embedding_dim]

        out, decoder_hidden = self.gru(embeded, decoder_hidden)  # out: [batch_size, 1, hidden_size]

        out = out.squeeze(1)  # remove the seq_len dimension -> [batch_size, hidden_size]
        # apply the fully connected layer and log_softmax
        out = F.log_softmax(self.fc(out), dim=-1)  # [batch_size, vocab_size]
        return out, decoder_hidden

2.6 Complete the seq2seq model

Call the previously implemented encoder and decoder to build the complete model:

import torch
import torch.nn as nn

class Seq2Seq(nn.Module):
    def __init__(self,encoder,decoder):
        super(Seq2Seq,self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, input,target,input_length,target_length):
        # Encoding 
        encoder_outputs,encoder_hidden = self.encoder(input,input_length)
        # decode 
        decoder_outputs,decoder_hidden = self.decoder(encoder_hidden,target,target_length)
        return decoder_outputs,decoder_hidden

2.7 Complete the training logic

The procedure is the same as before:

import torch
import config
from torch import optim
import torch.nn as nn
from encoder import NumEncoder
from decoder import NumDecoder
from seq2seq import Seq2Seq
from dataset import data_loader as train_dataloader
from word_sequence import num_sequence



encoder = NumEncoder()
decoder = NumDecoder()
model = Seq2Seq(encoder,decoder)
print(model)

# Custom parameter initialization
# for name, param in model.named_parameters():
#     if 'bias' in name:
#         torch.nn.init.constant_(param, 0.0)
#     elif 'weight' in name:
#         torch.nn.init.xavier_normal_(param)

# model.load_state_dict(torch.load("model/seq2seq_model.pkl"))
optimizer =  optim.Adam(model.parameters())
# optimizer.load_state_dict(torch.load("model/seq2seq_optimizer.pkl"))
criterion= nn.NLLLoss(ignore_index=num_sequence.PAD,reduction="mean")

def get_loss(decoder_outputs, target):
    # if the tensor has been transposed, calling view directly can fail;
    # use contiguous().view(-1) in that case
    # target = target.contiguous().view(-1)  # [batch_size*max_len]
    target = target.view(-1)  # [batch_size*max_len]
    decoder_outputs = decoder_outputs.view(config.batch_size * config.max_len, -1)
    return criterion(decoder_outputs, target)


def train(epoch):
    for idx,(input,target,input_length,target_len) in enumerate(train_dataloader):
        optimizer.zero_grad()
        # decoder_outputs: [batch_size, max_len, vocab_size], target: [batch_size, max_len]
        decoder_outputs,decoder_hidden = model(input,target,input_length,target_len)
        loss = get_loss(decoder_outputs,target)
        loss.backward()
        optimizer.step()

        print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
            epoch, idx * len(input), len(train_dataloader.dataset),
                   100. * idx / len(train_dataloader), loss.item()))

        torch.save(model.state_dict(), "model/seq2seq_model.pkl")
        torch.save(optimizer.state_dict(), 'model/seq2seq_optimizer.pkl')

if __name__ == '__main__':
    for i in range(10):
        train(i)

2.8 Complete the model evaluation logic

The evaluation logic differs slightly from the decoder's training process: we can create a new evaluation method that takes encoder_hidden and returns the predicted result.

    def evaluation(self, encoder_hidden):  # encoder_hidden: [1, batch_size, hidden_size]
        # batch_size during evaluation can differ from training, so don't use the config value
        batch_size = encoder_hidden.size(1)

        decoder_input = torch.LongTensor([[num_sequence.SOS]] * batch_size)  # [batch_size, 1]
        decoder_outputs = torch.zeros(batch_size, config.max_len, self.vocab_size)  # [batch_size, seq_len, vocab_size]
        decoder_hidden = encoder_hidden

        # during evaluation there is no teacher forcing: the prediction is always used as the next input
        for t in range(config.max_len):
            decoder_output_t, decoder_hidden = self.forward_step(decoder_input, decoder_hidden)
            decoder_outputs[:, t, :] = decoder_output_t
            value, index = torch.topk(decoder_output_t, 1)  # index: [batch_size, 1]
            decoder_input = index

        # collect the output ids (requires `import numpy as np` at the top of decoder.py)
        decoder_indices = []  # e.g. [[1,2,4],[23,3,2]]
        for i in range(config.max_len):
            value, index = torch.topk(decoder_outputs[:, i, :], k=1, dim=-1)
            decoder_indices.append(index.view(-1).numpy())
        # transpose so that each row corresponds to one input sentence
        decoder_indices = np.array(decoder_indices).transpose()
        return decoder_indices

Then, in the seq2seq model, add the evaluation logic:

import torch
import torch.nn as nn

class Seq2Seq(nn.Module):
    def __init__(self,encoder,decoder):
        super(Seq2Seq,self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, input,target,input_length,target_length):
        encoder_outputs,encoder_hidden = self.encoder(input,input_length)
        decoder_outputs,decoder_hidden = self.decoder(encoder_hidden,target,target_length)
        return decoder_outputs,decoder_hidden

    def evaluation(self,inputs,input_length):
        encoder_outputs,encoder_hidden = self.encoder(inputs,input_length)
        decoded_sentence = self.decoder.evaluation(encoder_hidden)
        return decoded_sentence

Create eval.py and implement the model evaluation logic:

import torch
import config
from torch import optim
import torch.nn as nn
from encoder import NumEncoder
from decoder import NumDecoder
from seq2seq import Seq2Seq
from dataset import data_loader as train_dataloader
from word_sequence import num_sequence
import numpy as np
import random


encoder = NumEncoder()
decoder = NumDecoder()
model = Seq2Seq(encoder,decoder)
model.load_state_dict(torch.load("model/seq2seq_model.pkl"))

def evaluate():
    data = [str(i) for i in np.random.randint(0, 100000000, [10])]
    data = sorted(data, key=lambda x: len(x), reverse=True)
    print(data)

    _data_length = torch.LongTensor([len(i) for i in data])
    _data = torch.LongTensor([num_sequence.transform(i, max_len=config.max_len) for i in data])
    output = model.evaluation(_data, _data_length)
    print([num_sequence.inverse_transform(i) for i in output])

if __name__ == '__main__':
    evaluate()

After training the model for one epoch, the loss is already very low. The evaluation output looks like the following (True means the prediction is correct):

39304187 >>>>> 393041870 True
41020882 >>>>> 410208820 True
85784317 >>>>> 857843170 True
1394232 >>>>> 13942320 True
44548446 >>>>> 445484460 True
49457730 >>>>> 494577300 True
82451872 >>>>> 824518720 True
64380958 >>>>> 643809580 True
97501723 >>>>> 975017230 True
21656800 >>>>> 216568000 True

Full code reference:
https://github.com/SpringMagnolia/PytorchTutorial/tree/master/seq2seq

3. Summary

The seq2seq workflow

1. encoder
    a. Apply embedding to the input
    b. Pack the embedded result with pack_padded_sequence
    c. Feed it into the GRU to obtain output and hidden
    d. Unpack output with pad_packed_sequence
2. decoder
    a. Build the initial input: a [batch_size, 1] tensor filled with SOS, used as the input of the first time step
    b. Apply embedding to the first time-step input to obtain embeded
    c. Run the GRU on embeded to obtain output and hidden; hidden is used as the hidden state of the next time step
    d. Compute the output value of the first time step: reshape the GRU output, apply the fully connected layer and log_softmax, and take the position of the largest value along dim=-1 as the prediction of the first time step

        out = out.squeeze(1)  # [batch_size, 1, hidden_size] --> [batch_size, hidden_size]
        output = F.log_softmax(self.fc(out), dim=-1)  # after fc: [batch_size, hidden_size] --> [batch_size, vocab_size]

    e. Save the output of this time step

        # save decoder_output_t into decoder_outputs
        decoder_outputs[:, t, :] = decoder_output_t

    f. At the next time step the inputs are hidden and the concrete value (an index) predicted at the previous time step; the teacher forcing mechanism is used to speed up training

        # use the prediction: topk with k=1 returns the largest value of the last dimension
        value, index = torch.topk(decoder_output_t, k=1)
        # teacher forcing: sometimes feed the ground truth instead, to speed up training
        if random.random() > config.teacher_forcing:
            decoder_input = target[:, t].unsqueeze(1)  # [batch_size, 1]
        else:
            # otherwise use the prediction as the next input
            decoder_input = index

    g. Repeat steps b-f for the target length, i.e. max_len times (in this dataset the target is the input plus "0", giving length + 1, and add_eos=True appends EOS, hence + 2 relative to the input)
    h. Obtain decoder_outputs
3. train
    a. Compute nll_loss between output and target (a weighted loss; here ignore_index is used to skip the PAD positions); if the output is 3-dimensional it must be reshaped first

        decoder_outputs = decoder_outputs.view(decoder_outputs.size(0) * decoder_outputs.size(1), -1)  # [batch_size * seq_len, -1]
        target = target.view(-1)  # [batch_size * seq_len]
        loss = F.nll_loss(decoder_outputs, target, ignore_index=num_sequence.PAD)

4. eval
    a. Almost the same as the decoder's training forward pass, except that output does not need to be saved for a loss; we only need the prediction of each time step for the batch
    b. The prediction of each time step is appended to a list and then transposed (in our implementation this is done inside the decoder's evaluation method), so that each row is the final result for one input

        indices = seq2seq.evaluation(input, input_length)

Copyright notice: this article was written by ZSYL. Please include a link to the original when reposting:
https://chowdera.com/2021/08/20210808001155871N.html
