14.3 模型定义

这一节我们来具体定义翻译模型的架构。具体代码在translator.py里。 网络结构示意图如下:

1342.png

Encoder采用一个3层的双向LSTM,它输出每个英文token的最后一层隐状态。同时输出每层的双向隐状态和细胞状态。 Decoder采用一个3层的单向LSTM,它每层接收Encoder最后一个时间步的隐状态和细胞状态,因为Encoder状态为双向,此处需要每层一个的线性层转化维度。 Attention计算,Decoder每个token输入隐状态和Encoder所有英文token最后一层隐状态计算注意力,得到注意力上下文向量。 Decoder每个时间步将输入的中文token Embedding和当前时间步得到的注意力上下文向量拼接作为Decoder输入。

14.3.1 Encoder

class Encoder(nn.Module):
    def __init__(self, vocab_size, emb_dim, hid_dim, n_layers=3):
        super().__init__()
        # 默认3个循环层
        self.n_layers=n_layers
        # 定义Embedding,可以将BPE分词输出的token id,转化为emd_dim的embedding向量。
        self.embedding = nn.Embedding(vocab_size, emb_dim, padding_idx=PAD_ID)
        # 定义双向LSTM模型
        self.bi_lstm = nn.LSTM(emb_dim, hid_dim, n_layers, bidirectional=True)
        # 定义线性层列表来降低维度。因为每个Encoder是双向的,隐状态和细胞状态为hid_dim*2,Decoder是单向的,隐状态和细胞状态维度为hid_dim。
        self.fc_hidden = nn.ModuleList([nn.Linear(hid_dim * 2, hid_dim) for _ in range(n_layers)])
        self.fc_cell = nn.ModuleList([nn.Linear(hid_dim * 2, hid_dim) for _ in range(n_layers)])

    def forward(self, src, src_len):
        embedded = self.embedding(src)
        #将一个 padded sequence(已经填充到统一长度的 batch 序列) 转换为一个特殊的 PackedSequence 对象
        #这个对象在传入 RNN 时能跳过 padding 部分的计算。
        packed = nn.utils.rnn.pack_padded_sequence(embedded, src_len, enforce_sorted=False)
        #outputs,形状为 (seq_len, batch_size, hid_dim*2),表示每个时间步、最后一层LSTM的双向隐状态拼接。
        #(hidden, cell) ,形状都为(num_layers * 2, batch_size, hid_dim)表示每一层、每个方向在最后一个时间步的隐状态或细胞状态。
        outputs, (hidden, cell) = self.bi_lstm(packed)
        #将 PackedSequence 类型的输出还原成带 padding 的标准 Tensor,方便后续处理。
        outputs, _ = nn.utils.rnn.pad_packed_sequence(outputs)  # [src_len, batch, hid_dim*2]

        # 重塑隐藏状态和细胞状态: [n_layers * 2, batch, hid_dim] -> [n_layers, 2, batch, hid_dim]
        hidden = hidden.view(self.n_layers, 2, -1, hidden.size(2))
        cell = cell.view(self.n_layers, 2, -1, cell.size(2))

        # 为每一层处理前向和后向状态
        final_hidden = []
        final_cell = []

        for layer in range(self.n_layers):
            # 对每一层将正向和反向的隐状态,细胞状态合并,通过一个线性层将维度从hid_dim*2降低为hid_dim维度。
            h_cat = torch.cat((hidden[layer][-2], hidden[layer][-1]), dim=1)
            c_cat = torch.cat((cell[layer][-2], cell[layer][-1]), dim=1)
            h_layer = torch.tanh(self.fc_hidden[layer](h_cat)).unsqueeze(0)
            c_layer = torch.tanh(self.fc_cell[layer](c_cat)).unsqueeze(0)

            final_hidden.append(h_layer)
            final_cell.append(c_layer)
        # 调整好维度为hid_dim的隐状态和细胞状态,可以传递给Decoder。
        hidden_concat = torch.cat(final_hidden, dim=0)
        cell_concat = torch.cat(final_cell, dim=0)
        return outputs, hidden_concat, cell_concat

Encoder 设计如下:

  • 定义Embedding模块。BPE分词后会给每个token分配一个ID。Embedding模块给每个ID分配一个可学习的Embedding向量,在训练时可更新。
  • 定义一个3层的双向LSTM。它的隐状态和细胞状态因为是双向的,所以维度都为hid_dim*2。
  • 双向LSTM输出有三个变量:outputs,形状为 (seq_len, batch_size, hid_dim*2),表示每个时间步、最后一层LSTM的双向隐状态拼接。(hidden, cell) ,形状都为(num_layers * 2, batch_size, hid_dim)表示每一层、每个方向在最后一个时间步的隐状态或细胞状态。
  • 进行英译中时,因为英文句子是完整输入的,Encoder可以看到完整的英文输入,所以可以定义为双向的,但是Decoder输出中文翻译时,只能单向逐个生成,所以是单向的。因此Decoder的隐状态和细胞状态维度为hid_dim。
  • Encoder需要将自己最后一个时间步的隐状态和细胞状态传递给Decoder的第一个时间步,但是它们的隐状态和细胞状态向量的维度不同,所以,每个循环层都需要一个线性层来进行转化。每个线性层的输入维度为hid_dim*2,输出维度为hid_dim。

上边的代码进行了良好的注释,你可以逐行阅读和调试。

14.3.2 Attention

class Attention(nn.Module):
    def __init__(self, hid_dim):
        super().__init__()
        #第一层输入维度为Encoder的输出隐状态(因为是双向的,所以维度为hid_dim*2,)和Decoder的输入隐状态(单向,维度为hid_dim)的拼接。
        self.attn = nn.Linear(hid_dim * 2 + hid_dim, hid_dim)
        #输出一个代表注意力的logit值。
        self.v = nn.Linear(hid_dim, 1, bias=False)

    def forward(self, hidden, encoder_outputs, mask):
        # 调整Decoder当前时间步输入隐状态的维度: [1, batch, hid_dim] -> [batch, 1, hid_dim]   
        hidden = hidden.permute(1, 0, 2)
        # 调整Encoder各个时间步输出隐状态的维度: [src_len, batch, hid_dim*2] -> [batch, src_len, hid_dim*2]
        encoder_outputs = encoder_outputs.permute(1, 0, 2)

        src_len = encoder_outputs.shape[1]
        # 中文当前一个token需要和英文所有token计算注意力。所以需要把中文token的状态复制多份,以便进行统一拼接。
        # 因为Decoder只有当前时间步输入的隐状态,复制到和Encoder输出隐状态同样的src_len。
        hidden = hidden.repeat(1, src_len, 1)  # [batch, src_len, hid_dim]

        # 拼接Decoder当前输入的隐状态和Encoder在各个时间步输出的隐状态,然后经过一个线性层,tanh激活。
        energy = torch.tanh(self.attn(torch.cat((hidden, encoder_outputs), dim=2)))  # [batch, src_len, hid_dim]
        # 输出Decoder当前中文token与所有英文token的注意力值。
        attention = self.v(energy).squeeze(2)  # [batch, src_len]
        # mask标志哪些位置为<pad>,对于填充的位置,注意力值为一个大的负值。这样经过softmax就为0。
        attention = attention.masked_fill(mask == 0, -1e10)
        # 利用softmax将注意力的值归一化。让生成当前中文token对输入的英文各个token的注意力之和为1。
        return torch.softmax(attention, dim=1)  # [batch, src_len]

Attention类的实现通过一个两层的神经网络来计算Decoder生成每个中文token对输入英文的每个token的注意力的值。并且这个中文token对输入所有英文token的注意力的值加和为1。因为输入时,我们在有的序列后边增加了 token。 token被分配的注意力永远为0。

计算Attention的两层神经网络会分别计算当前Decoder要生成的中文token对Encoder每个英文token的注意力。这个网络的输入就为Encoder每个token的输出隐状态(因为是双向的,所以维度为hid_dim*2,)和Decoder的输入隐状态(单向,维度为hid_dim)的拼接。输出为一个代表注意力的值。

上边的代码进行了很好的注释,你可以逐行理解并调试。

14.3.3 Decoder

class Decoder(nn.Module):
    def __init__(self, output_dim, emb_dim, hid_dim, attention, n_layers=3):
        super().__init__()
        self.output_dim = output_dim
        self.attention = attention
        self.n_layers = n_layers
        self.embedding = nn.Embedding(output_dim, emb_dim, padding_idx=PAD_ID)
        # 单向LSTM,输入维度为注意力加权后的,Encoder输出隐状态维度(hid_dim*2)加上输入token的embedding的维度emb_dim。
        self.lstm = nn.LSTM(hid_dim * 2 + emb_dim, hid_dim,num_layers=n_layers)
        # 最终分类头,输入为hid_dim,输出为字典大小
        self.fc_out = nn.Linear(hid_dim, output_dim)

    def forward(self, input_token, hidden, cell, encoder_outputs, mask):
        # input_token: [batch]
        # 输入单个字符,增加一个维度
        input_token = input_token.unsqueeze(0)  # [1, batch]
        # 获取token的embedding
        embedded = self.embedding(input_token)  # [1, batch, emb_dim]
        # 获取当前步的输入隐状态
        last_hidden = hidden[-1].unsqueeze(0)
        # 当前步对所有encoder输出的注意力
        a = self.attention(last_hidden, encoder_outputs, mask)  # [batch, src_len]
        a = a.unsqueeze(1)  # [batch, 1, src_len]

        encoder_outputs = encoder_outputs.permute(1, 0, 2)  # [batch, src_len, enc_hid_dim*2]

        # 用矩阵乘法获取注意力上下文向量
        weighted = torch.bmm(a, encoder_outputs)  # [batch, 1, enc_hid_dim*2]
        weighted = weighted.permute(1, 0, 2)  # [1, batch, enc_hid_dim*2]
        # 拼接输入token编码向量和注意力上下文向量
        lstm_input = torch.cat((embedded, weighted), dim=2)  # [1, batch, emb_dim + enc_hid_dim*2]
        # 一次只执行lstm的一个时间步。
        output, (hidden, cell) = self.lstm(lstm_input, (hidden, cell))  # output: [1, batch, hid_dim]

        # 移除第0维(第一个维度)
        output = output.squeeze(0)  # [batch, hid_dim]
        # 计算分类logtis
        prediction = self.fc_out(output)  # [batch, output_dim]

        return prediction, hidden, cell, a.squeeze(1)  # attention weights for visualization

Decoder的设计如下:

  • 定义了中文token的embedding。
  • 单向3层的LSTM作为Decoder,输入为注意力上下文向量拼接当前时刻输入token的embedding。
  • 一个线性层作为分类头,输入为每个时间步的隐状态,输出为字典维度的向量,经过softmax后为字典每个token作为当前中文token输入的概率。
  • Decoder的调用是逐步调用的,每次调用都输入前一个时刻的隐状态,细胞状态。以及注意力上下文向量和当前token的embedding的拼接向量。生成输出隐状态,细胞状态。然后用输出隐状态对字典里每个token进行分类。
  • 这里我们实现的分类头输入只有当前时间步LSTM输出的隐状态,实际项目中有时也会把注意力上下文向量拼接上,一起作为最后分类头的输入,这样会提升模型性能。

14.3.4 Seq2Seq

class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, src_len, trg):
        batch_size = trg.shape[1]
        max_len = trg.shape[0]
        vocab_size = self.decoder.output_dim

        outputs = torch.zeros(max_len, batch_size, vocab_size).to(self.device)
        # 调用encoder
        encoder_outputs, hidden, cell = self.encoder(src, src_len)

        input_token = trg[0]
        mask = (src != PAD_ID).permute(1, 0)
        # 逐步调用Decoder
        for t in range(1, max_len):
            output, hidden, cell, _ = self.decoder(input_token, hidden, cell, encoder_outputs, mask)
            outputs[t] = output
            input_token = trg[t]

        return outputs

Seq2Seq将Encoder和Decoder组装在一起,输入为英文token id序列,一次性调用encoder,输出output、hidden、cell向量,然后逐步调用Decoder。需要注意的是每一步我们都是以Label数据token作为输入,而不是以Decoder上一步生成的token作为输入。

14.3.5 train方法

def train(model, iterator, optimizer, criterion):
    model.train()
    epoch_loss = 0
    step_loss = 0  # 用于累计每个step的loss
    step_count = 0  # 当前step计数器

    for i, (src, trg, src_len, _) in enumerate(iterator):
        src, trg = src.to(model.device), trg.to(model.device)
        optimizer.zero_grad()
        output = model(src, src_len, trg)
        output_dim = output.shape[-1]
        output = output[1:].view(-1, output_dim)
        trg = trg[1:].view(-1)
        loss = criterion(output, trg)
        loss.backward()
        optimizer.step()

        # 更新损失统计
        step_loss += loss.item()
        epoch_loss += loss.item()
        step_count += 1

        # 每100个step打印一次
        if (i + 1) % 100 == 0:
            avg_step_loss = step_loss / step_count
            print(f'Step [{i + 1}/{len(iterator)}] | Loss: {avg_step_loss:.4f}')
            step_loss = 0  # 重置step损失
            step_count = 0  # 重置step计数器

    return epoch_loss / len(iterator)  # 返回整个epoch的平均loss

上边代码是对Seq2Seq模型进行训练的方法,相信你应该可以看懂,我们不做解释。

14.3.6 main方法

if __name__ == '__main__':
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    dataset = TranslationDataset('data\\en2cn\\valid_en.txt', 'data\\en2cn\\valid_zh.txt', tokenize_en, tokenize_cn)
    loader = DataLoader(dataset, batch_size=48, shuffle=True, collate_fn=TranslationDataset.collate_fn)

    INPUT_DIM = sp_en.get_piece_size()
    OUTPUT_DIM = sp_cn.get_piece_size()
    ENC_EMB_DIM = 512
    DEC_EMB_DIM = 512
    HID_DIM = 1024

    attention = Attention(HID_DIM).to(device)
    encoder = Encoder(INPUT_DIM, ENC_EMB_DIM, HID_DIM).to(device)
    decoder = Decoder(OUTPUT_DIM, DEC_EMB_DIM, HID_DIM, attention).to(device)
    model = Seq2Seq(encoder, decoder, device).to(device)

    optimizer = optim.Adam(model.parameters())
    criterion = nn.CrossEntropyLoss(ignore_index=PAD_ID)

    N_EPOCHS = 1

    for epoch in range(N_EPOCHS):
        loss = train(model, loader, optimizer, criterion)
        print(f'Epoch {epoch + 1}/{N_EPOCHS} | Loss: {loss:.4f}')
        torch.save(model.state_dict(), f'seq2seq_bpe_attention_epoch{epoch + 1}.pt')

上边代码是训练模型的入口,创建了模型,定义了优化器和loss函数,指定迭代次数,每个迭代保存一个模型。这里为了实验,可以设置为1个迭代。我在3090,24GB显存的机器上,训练一个迭代大概需要24小时。你根据你的情况可以调整batch_size和N_EPOCHS的设置。

results matching ""

    No results matching ""