Loading... ## 前言 哈佛NLP团队实现的Pytorch版代码, 项目[链接](https://github.com/harvardnlp/annotated-transformer). 论文[链接](https://arxiv.org/abs/1706.03762) 以下内容按照自己的理解过程一步步记录, 与源码中可能不一致. ![Refer to caption](https://zoe.red/usr/uploads/2024/03/3146678007.png) --- ## 网络 按照数据流向以及先左侧后右侧顺序依次理解. 为了便于直观理解, 部分需要映射真实场景的内容结合翻译场景进行解释. ### Embedding 首先是`Embedding`层, 在Encoder与Decoder中最下方都使用到. 在翻译场景中, input embedding与output Embedding 接受的输入是被映射过的句子/序列, 句子/序列中元素都是一个个被离散化的数值. 映射示例: ```plaintext # 原始语料中的句子 [ "I love you." ... ] # 分词后 [ ["I", "love", "you", "."] ... ] # 词汇表构建 # 将全部的语料中分词后结果进行汇总统计, 确认要包含的词汇 # * 词汇表还会过滤出现频次特别少的词, 以及增加部分特殊标记 [ ..., "I", "love", "you", ".", ... ] # 词映射: 将句子中的每个词分别通过构建的词汇表映射到一个离散化的值(可以简单理解为在词汇表中的索引) # 映射后的结果, 在经过句子/序列长度对齐后,就可以作为训练输入送入到Embedding层了 # 为了方便表示, 这里假定映射值对应关系如下 "I": 5, "love": 6, 'you': 7, ".": 8 [ [5,6,7,8], ... ] ``` 代码如下: * 接受2个参数, 一个vocab表示词汇表长度, 一个d_model表示嵌入维度, 表示将一个离散数值映射到长度为d_model的向量, 在知名的llm中, 如gtp/llama等, d_model非常大, 几千甚至上万. ```python class Embeddings(nn.Module): def __init__(self, vocab, d_model): """词嵌入层 将数值化/索引话后的词(词汇)表示映射到高维空间, 便于更好捕获词(词汇)间的关联 Args: vocab (int): 字典(词汇表)的大小/长度 d_model (int): dim of model, 词汇索引(离散值)转换向量后维度大小 """ super().__init__() self.d_model = d_model # nn.Embedding的权重矩阵为vocab*d_model的矩阵, # 使用过程类似于tensor, 本质为通过词(词汇)索引查找输出为对应位置的向量值 self.lut = nn.Embedding(num_embeddings=vocab, embedding_dim=d_model) def forward(self, x): # x维度: batch_size*sentence_length*d_model return self.lut(x) * math.sqrt(self.d_model) ``` --- ### PositionalEncoding 一个词出现在句子/序列中的相对或者绝对位置, 可能代表着不同的意义. 因此需要给向量化话后的句子/序列中的每个词加入位置信息. 文中作者使用的为固定位置编码, 公式如下(作者也对比过可学习位置编码, 对整体模型性能无显著差异) * 每个词的词嵌入向量, 偶数位置(0,2,4...)使用sin * 每个词的词嵌入向量, 奇数位置(1,3,5...)使用cos $$ PE_{pos, 2i} = sin(pos/{10000^{2i/d_{model}}}) $$ $$ PE_{pos, 2i+1} = cos(pos/{10000^{2i/d_{model}}}) $$ 代码实现 ```python class PositionalEncoding(nn.Module): def __init__(self, d_model, max_len=5000, dropout=0.1): """3.5节: PE 位置编码层 对句子中的词汇的位置(绝度/相对)信息进行编码, 附加到词嵌入层后的向量中 Args: max_len (int): 可能出现的最大句子长度, 默认设置5000, 根据实际可灵活调整 d_model (int): 词嵌入维度 """ super().__init__() self.dropout = nn.Dropout(p=dropout) # PE初始化, 只需要进行一次即可 # 注意下面代码的计算方式与公式中给出的是不同的,但是是等价的,可以尝试简单推导证明一下。 position = torch.arange(0, max_len).unsqueeze(dim=1) # N => N*1 # !!! 此处实现与公式给出形式不同, 但等价于 1 / (10000. ** (torch.arange(0, d_model, 2) / d_model)) # !!! 如此处理是为了避免中间的数值计算结果超出float32的范围 # !!! 简单验证: 可以直接使用float64存储中间变量, d_model=30, eps=1e-6 div_term = torch.exp( -math.log(10000.) * torch.arange(0, d_model, 2) / d_model ) # [d_model/2] #!!! 此处会更新pe第二维度的d_model个元素 # * 上半偶数位置: 使用sin变换后的 # * 下半奇数位置: 使用cos变换后的 # position * div_term 计算: [N, 1] x [d_model/2] => [N, d_model/2] pe = torch.zeros(max_len, d_model) pe[:, ::2] = torch.sin(position * div_term) pe[:, 1::2] = torch.cos(position * div_term) pe = pe.unsqueeze(dim=0) # [N, d_model] => [1, N, d_model], 方便与词嵌入输出张量相加 self.register_buffer('pe', pe) def forward(self, x): # 由于实际句子的长度不会等于初始化设置的最大上限 # 因此只需要截取实际句子长度部分的位置编码结果进行后续运算即可 sentence_length = x.size(1) x = x + self.pe[:, :sentence_length].requires_grad_(False) return self.dropout(x) ``` --- ### SubLayerConnection 在每个DecoderLayer与EncoderLayer中, 都用到了残差连接, 代码实现如下. ```python class SubLayerConnection(nn.Module): """子层的残差链接结构 """ def __init__(self, size, dropout=0.1): super().__init__() self.norm = nn.LayerNorm(normalized_shape=size) self.dropout = nn.Dropout(p=dropout) def forward(self, x, sub_layer): #!!! 此处看到了几种实现方案 # 方案1: 示例代码实现 # return x + self.dropout(sub_layer(self.norm(x))) # 方案2: 感觉更符合原paper图示, 但实测收敛更慢 # return self.norm(x + self.dropout(sub_layer(x))) # 方案3: 在方案2基础上把x从norm中拿出开,保证有一条"高速公路", # 可以是的模型更快收敛, 需要进一步验证!!! return x + self.norm(self.dropout(sub_layer(x))) ``` --- ### Scaled Dot-Product Attention ![Refer to caption](https://zoe.red/usr/uploads/2024/03/1350825705.png) 模块如图所示, 相比于标准的Dot-Product Attention, 这里scale对应$1/d_{k}$对QK的点积结果进行缩放, 避免在$d_k$较大时造成点积幅度变大,从而避免可能造成softmax的梯度可能变得极小的问题. $$ Attention(Q,K,V) = softmax({\frac{QK^T}{d_k}})V $$ 代码实现 ```python def attention(query, key, value, mask=None, dropout=None): """计算 Scaled Dot Product Attention Args: query (矩阵): 查询矩阵 [b,h,_,d_k], 即[batch_size, head_count, sentence_length, , d_model/head_count] key (_type_): 矩阵 [b,h,_,d_k] value (_type_): 矩阵[b,h,_,d_k] mask (_type_, optional): 取值0,1的矩阵, 用于排除无需计算的区域(QK相关度). Defaults to None. dropout (_type_, optional): dropout层. Defaults to None. """ # 计算Q K的相关度 d_k = query.size(-1) # 词嵌入维度/head scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k) # [b,h,_,d_k] * [b,h,d_k,_] = [b,h,_,_] if mask is not None: # 给非目标区域设置为一个接近-inf的数字, 可以保证其在softmax里值接近0 # 即: e^(-inf) ~= 0 scores = scores.masked_fill(mask == 0, -1e9) # mask: [B,1,?,_] p_atten = F.softmax(scores, dim=-1) if dropout is not None: p_atten = dropout(p_atten) # [b,h,_,_] * [b,h,_,d_k] = [b,h,_,d_k] return torch.matmul(p_atten, value), p_atten ``` --- ### Multi-Head Attention ![Refer to caption](https://zoe.red/usr/uploads/2024/03/3590337598.png) 网络结构如图所示, 默认此处使用head=8, 在llm中, 此处head可能高达数百, 用于更好聚焦/捕捉不同性质. ``` class MultiHeadAttention(nn.Module): """3.2.2节: 多头注意力子层 不同的头来关注不同的语义 """ def __init__(self, head, d_model, dropout=0.1): """_summary_ Args: head (_type_): 多头注意力的头数 d_model (_type_): 词嵌入维度 dropout (float, optional): dropout层概率. Defaults to 0.1. """ super().__init__() assert d_model % head == 0, '多头分组异常' self.d_k = d_model // head self.h = head self.dropout = nn.Dropout(p=dropout) self.atten = None # 用于记录forward阶段attention计算结果 self.linears = clones(nn.Linear(d_model, d_model), 4) # 用于q/k/v 以及atten后 def forward(self, query, key, value, mask=None): """ 实现Figure 2的右图结构""" if mask is not None: mask = mask.unsqueeze(dim=1) # [B,?,?] => [B,1,?,?] batch_size = query.size(0) # 1. 线性投影与变化 # q/k/v 分别显性变化后再多头化, [B,_,d_model] => [B,_, head, d_model/head], 即[B,_,h, d_k] # 之后再交换1,2维度, 最终结果[B,h,_,d_k] query, key, value = [ linear(_).view(batch_size, -1, self.h, self.d_k).transpose(1, 2) for linear, _ in zip(self.linears, (query, key, value)) ] # 2. 应用Attention # x: [b,h,_,d_k], atten: [b,h,_,_] x, self.atten = attention(query, key, value, mask, self.dropout) # 3. concat # [b,h,_,d_k] => [b,_,h,d_k] => [b,_, d_model] x = x.transpose(1, 2).contiguous().view(batch_size, -1, self.h * self.d_k) return self.linears[-1](x) ``` --- ### PositionwiseFeedForward 前馈网络子层, 包含2层全连接层, 中间使用了`ReLU`激活, 用于增加模型的拟合能力. * 公式中max对应`ReLU` $$ FFN(x)=max(0, xW_1+b_1)W_2 + b_2 $$ 代码实现 ``` class PositionwiseFeedForward(nn.Module): """ 3.3节: 前馈网络子层 """ def __init__(self, d_model, d_ff=2048, dropout=0.1): super().__init__() self.fc1 = nn.Linear(d_model, d_ff) self.fc2 = nn.Linear(d_ff, d_model) self.dropout = nn.Dropout(p=dropout) def forward(self, x): return self.fc2(self.dropout(self.fc1(x).relu())) ``` --- ### Decoder & DecoderLayer 以上部分已经完成DecoderLayer必要组件, 完整DecoderLayer实现代码如下 ```python class EncoderLayer(nn.Module): """ 第三节, 图一, 编码层 """ def __init__(self, size, self_atten, feed_forward, dropout=0.1): super().__init__() self.size = size self.self_atten = self_atten self.feed_forward = feed_forward self.sub_layers = clones(SubLayerConnection(size, dropout), 2) def forward(self, x, mask): """ 根据图一左侧结构实现 """ x = self.sub_layers[0](x, sub_layer=lambda x: self.self_atten(x, x, x, mask)) # x = self.sub_layers[1](x, sub_layer=lambda x: self.feed_forward(x)) # eq x = self.sub_layers[1](x, sub_layer=self.feed_forward) return x ``` 一个Decoder包含N组DecoderLayer,在文章中`N=6`, 实现代码如下 ```python class Encoder(nn.Module): """ 第三节, 图一, 完整编码器, N默认为6 """ def __init__(self, layer, N=6): super().__init__() self.layers = clones(layer, N) self.norm = nn.LayerNorm(layer.size) def forward(self, x, mask): """ 分别传递x与mask到每个子层, 并返回最终结果 """ for layer in self.layers: x = layer(x, mask) # 对照连接层的实现, 残差连接子层forword部分模式2的话, 此处可以注释norm x = self.norm(x) return x ``` --- ### Decoder & DecoderLayer DecoderLayer包含3个子层, 比EncoderLayer子层多了一组MultiHeadAttention, 接收来自Decoder部分的最后输出用于其中的KV的表示. 完整实现如下 ```python class DecoderLayer(nn.Module): """ 第三节, 图一, 编码层 """ def __init__(self, size, self_atten, src_atten, feed_forward, dropout=0.1): super().__init__() self.size = size self.self_atten = self_atten # 第一子层 self.src_atten = src_atten # 第二子层 self.feed_forward = feed_forward # 第三子层 self.sub_layers = clones(SubLayerConnection(size, dropout), 3) def forward(self, x, memory, src_mask, tgt_mask): """根据图一右侧 结构实现 Args: x (_type_): _description_ memory (_type_): 左侧Encoder部分最后输出 src_mask (_type_): tgt_mask (_type_): 下三角矩阵, 表示句子当前行可见其它内容的范围 """ m = memory x = self.sub_layers[0](x, sub_layer=lambda x: self.self_atten(x, x, x, tgt_mask)) x = self.sub_layers[1](x, sub_layer=lambda x: self.src_atten(x, m, m, src_mask)) x = self.sub_layers[2](x, sub_layer=self.feed_forward) return x ``` 特别注意一点, 训练过程此部分用到了下三角矩阵MASK, 每一行对应表示在计算第n词的时候, 能够看到范围 ```python def subsequent_mask(size): "获取下三角矩阵: 在Mask每一行位置处, 标记了当前可见的位置 " attn_shape = (1, size, size) subsequent_mask = torch.triu(torch.ones(attn_shape), diagonal=1).type(torch.uint8) return subsequent_mask == 0 # test def mask_show(vis_mask): import matplotlib.pyplot as plt from matplotlib.pyplot import MultipleLocator # 支持中文 # UBuntu22.04系统手动安装指定字体示例教程: # 链接: https://zoe.red/2024/364.html plt.rcParams['font.sans-serif'] = ['Smiley Sans'] # 用来正常显示中文标签 plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号 # plt.figure(figsize=(6, 6), dpi=120) plt.title("mask in decoder_layer's attention") plt.xlabel('可见范围') # 可见范围 plt.ylabel('当前位置') # 当前位置 # 设置x轴和y轴的坐标间隔为1 ax=plt.gca() ax.xaxis.set_major_locator(MultipleLocator(1)) ax.yaxis.set_major_locator(MultipleLocator(1)) # plt.xticks(rotation=0) # 将x轴刻度标签旋转90度 # plt.yticks(rotation=0) plt.imshow(vis_mask[0]) plt.show() mask_show(subsequent_mask(20)) ``` tgt_mask可视化 ![DecoderLayer部分第一层Attention输入mask](https://zoe.red/usr/uploads/2024/03/413423737.png) Decoder部分同样有N组DecoderLayer, 在文章中`N=6`, 完整实现如下 ``` class Decoder(nn.Module): """ 第三节, 图一, 完整解码器, N默认为6 """ def __init__(self, layer, N=6): super().__init__() self.layers = clones(layer, N) self.norm = nn.LayerNorm(layer.size) def forward(self, x, memory, src_mask, tgt_mask): """ 分别传递x与...到每个子层, 并返回最终结果 """ for layer in self.layers: x = layer(x, memory, src_mask, tgt_mask) # 对照连接层的实现, 此处可以不用做 x = self.norm(x) return x ``` --- ### Generator 为了产生最后的预测结果, 还需要在Decoder的输出上实现输出层, 代码实现如下 * 一层全连接, 输出长度为目标词汇表长度 * 输出为log_softmax, 非概率值, 但是用于计算最大值所在位置是可以的 ```python class Generator(nn.Module): " 尾部输出层: linear + softmax" def __init__(self, d_model, vocab): """网络末尾生成预测分数 Args: d_model (_type_): 嵌入维度 vocab (_type_): 目标字典长度 """ super().__init__() self.proj = nn.Linear(d_model, vocab) def forward(self, x): return F.log_softmax(self.proj(x), dim=-1) ``` --- ### EncoderDecoder 已经实现了单独的Encoder与Decoder模块, 现在构建一个完整的EncoderDecoder, 代码如下 ```python class EncoderDecoder(nn.Module): """ 完整的编码/解码器 """ def __init__(self, encoder, decoder, embed_src, embed_tgt, generator): """完整编码器+解码器+2组输入+1输出 Args: encoder (_type_): _description_ decoder (_type_): _description_ embed_src (_type_): embed+pe embed_tgt (_type_): embed+pe generator (_type_): _description_ """ super().__init__() self.encoder = encoder self.decoder = decoder self.embed_src = embed_src self.embed_tgt = embed_tgt self.generator = generator # 输出log_softmax概率最大值 def forward(self, src, tgt, src_mask, tgt_mask): memory = self.encode(src, src_mask) return self.decode(memory, src_mask, tgt, tgt_mask) def encode(self, src, src_mask): return self.encoder(self.embed_src(src), src_mask) def decode(self, memory, src_mask, tgt, tgt_mask): return self.decoder(self.embed_tgt(tgt), memory, src_mask, tgt_mask) ``` --- ### 模型创建 对以上进行封装, 直接返回model, 代码实现 ```python def make_model(src_vocab, tgt_vocab, N=6, d_model=512, d_ff=2048, h=8, dropout=0.1): """创建一个完整的tranformer网络 Args: src_vocab (_type_): 输入 词汇表(字典)大小/长度 tgt_vocab (_type_): 输出 词汇表(字典)大小/长度 N (int, optional): 编码/解码子模块堆叠层数. Defaults to 6. d_model (int, optional): 词嵌入维度. Defaults to 512. d_ff (int, optional): 前馈网络FC子层变换参数. Defaults to 2048. h (int, optional): 多头注意力的头数(分组组数), 要求一定可以被词嵌入维度整除. Defaults to 8. dropout (float, optional): dropout层概率. Defaults to 0.1. """ pe = PositionalEncoding(d_model, max_len=5000, dropout=dropout) atten = MultiHeadAttention(head=h, d_model=d_model, dropout=dropout) ff = PositionwiseFeedForward(d_model, d_ff, dropout=dropout) encoder_layer = EncoderLayer(d_model, deepcopy(atten), deepcopy(ff), dropout=dropout) decoder_layer = DecoderLayer(d_model, deepcopy(atten), deepcopy(atten), deepcopy(ff), dropout=dropout) model = EncoderDecoder( encoder=Encoder(encoder_layer, N), decoder=Decoder(decoder_layer, N), embed_src=nn.Sequential(Embeddings(vocab=src_vocab, d_model=d_model), deepcopy(pe)), embed_tgt=nn.Sequential(Embeddings(vocab=tgt_vocab, d_model=d_model), deepcopy(pe)), generator=Generator(d_model=d_model, vocab=tgt_vocab), ) # This was important from their code. # Initialize parameters with Glorot / fan_avg. for p in model.parameters(): if p.dim() > 1: nn.init.xavier_uniform_(p) return model ``` ## 实战 对于序列预测任务, 在模型数据处理的过程需要引入一些特殊字符, 这是简单的说明 * `<s>`或`<sos>`, 表示句子/序列开始 * `</s`或`<eos>`, 表示句子/序列结束 * `<pad>`或`<blank>`, 空白字符, 用于句子长度对齐(方便训练), 添加在句子末尾 * `<unk>`, 未知字符, 一般用于表示因为出现频率特别少被过滤掉的词 损失: * 基于LabelSmoothing, 设置标签平滑, 不会使得预测分数更准, 但是对于[BLEU]()指标提升有帮助 ```python class LabelSmoothing(nn.Module): "Implement label smoothing." def __init__(self, size, padding_idx, smoothing=0.0): super(LabelSmoothing, self).__init__() self.criterion = nn.KLDivLoss(reduction="sum") self.padding_idx = padding_idx self.confidence = 1.0 - smoothing self.smoothing = smoothing self.size = size self.true_dist = None def forward(self, x, target): assert x.size(1) == self.size true_dist = x.data.clone() true_dist.fill_(self.smoothing / (self.size - 2)) true_dist.scatter_(1, target.data.unsqueeze(1), self.confidence) true_dist[:, self.padding_idx] = 0 mask = torch.nonzero(target.data == self.padding_idx) if mask.dim() > 0: true_dist.index_fill_(0, mask.squeeze(), 0.0) self.true_dist = true_dist return self.criterion(x, true_dist.clone().detach()) ``` 推理: * 序列预测产生最终结果策略有多种,这里使用贪心解码, 即依次生成下一个预测字符; 对于最初的output输入,使用起始字符 * 批量推理实现 ```python def greedy_decode(model, src, src_mask, max_len, start_symbol=0, pad_id=2): """ 若GPU可用, 使用GPU前需转换下先 """ if isinstance(model, nn.DataParallel): model = model.module memory = model.encode(src, src_mask) batch_size = src.size(0) # tgt 代表目前已生成的序列,最初为仅包含一个起始符start_symbol的序列,不断将预测结果追加到序列最后 tgt = torch.ones(batch_size, 1).fill_(start_symbol).type_as(src.data) for i in range(max_len-1): tgt_mask = (tgt != pad_id).unsqueeze(-2) tgt_mask = tgt_mask & subsequent_mask(tgt.size(1)).type_as(src.data) out = model.decode(memory, src_mask, tgt, tgt_mask) prob = model.generator(out[:, -1]) # out: [B,_,d_model], _取值[1,n-1], 取最后一个词对应特征, 进行预测 _, next_word = torch.max(prob, dim=1) tgt = torch.cat([tgt, next_word.unsqueeze(dim=1)], dim=1) return tgt ``` 示例1: 序列copy, 输入一个序列, 输出序列的[1:] <div class="preview"> <div class="post-inser post box-shadow-wrap-normal"> <a href="https://zoe.red/2024/373.html" target="_blank" class="post_inser_a no-external-link no-underline-link"> <div class="inner-image bg" style="background-image: url(https://zoe.red/usr/uploads/2024/03/4056394574.png);background-size: cover;"></div> <div class="inner-content" > <p class="inser-title">Transformer实战-1序列拷贝</p> <div class="inster-summary text-muted"> 前言Transformer模型源码解析过程参见前文, 这里是后续的实战示例.序列拷贝任务: 即对于任何输入序列se... </div> </div> </a> <!-- .inner-content #####--> </div> <!-- .post-inser ####--> </div> 示例2: 翻译任务, 德语->英语 <div class="preview"> <div class="post-inser post box-shadow-wrap-normal"> <a href="https://zoe.red/2024/376.html" target="_blank" class="post_inser_a no-external-link no-underline-link"> <div class="inner-image bg" style="background-image: url(https://zoe.red/usr/uploads/2024/03/945521704.png);background-size: cover;"></div> <div class="inner-content" > <p class="inser-title">Transformer实战-2德译英</p> <div class="inster-summary text-muted"> 前言Transformer模型源码解析过程参见前文, 这里是后续的实战示例.数据处理语料这里使用对齐后的语料, 来... </div> </div> </a> <!-- .inner-content #####--> </div> <!-- .post-inser ####--> </div> 示例3: 翻译任务, 英语->中文 <div class="preview"> <div class="post-inser post box-shadow-wrap-normal"> <a href="https://zoe.red/2024/379.html" target="_blank" class="post_inser_a no-external-link no-underline-link"> <div class="inner-image bg" style="background-image: url(https://zoe.red/usr/themes/handsome/assets/img/sj/5.jpg);background-size: cover;"></div> <div class="inner-content" > <p class="inser-title">Transformer实战-3英译中</p> <div class="inster-summary text-muted"> 前言Transformer模型源码解析过程参见前文, 这里是后续的实战示例.数据处理语料这里使用对齐后的语料, 以... </div> </div> </a> <!-- .inner-content #####--> </div> <!-- .post-inser ####--> </div> <div class="hideContent">该部分仅登录用户可见</div> THE END 本文作者:将夜 本文链接:https://zoe.red/2024/367.html 版权声明:本博客所有文章除特别声明外,均默认采用 CC BY-NC-SA 4.0 许可协议。 最后修改:2024 年 03 月 30 日 © 允许规范转载 赞 如果觉得我的文章对你有用,请随意赞赏