72 lines
2.7 KiB
Python
72 lines
2.7 KiB
Python
import numpy as np
|
|
import torch
|
|
import torch.nn as nn
|
|
|
|
class ScaledDotProductAttention(nn.Module):
    """Scaled dot-product attention over batched 3-D tensors.

    Computes ``softmax(q @ k^T / temperature) @ v`` using ``torch.bmm``,
    so ``q``, ``k`` and ``v`` must all be 3-D with a shared leading
    (batch) dimension.
    """

    def __init__(self, temperature, attn_dropout=0.1):
        """Store the scaling factor and build the softmax layer.

        Args:
            temperature: Divisor applied to the raw ``q @ k^T`` scores
                before the softmax.
            attn_dropout: Accepted for interface compatibility; this
                implementation does not apply dropout to the attention
                weights, so the value is ignored.
        """
        super().__init__()
        self.temperature = temperature
        # Normalise over the last axis of the (batch, len_q, len_k) scores,
        # i.e. across key positions.
        self.softmax = nn.Softmax(dim=2)

    def forward(self, q, k, v, mask=None):
        """Apply attention and return ``(output, attn)``.

        Args:
            q: Queries, ``(batch, len_q, d_k)``.
            k: Keys, ``(batch, len_k, d_k)``.
            v: Values, ``(batch, len_k, d_v)``.
            mask: Optional tensor broadcastable to ``(batch, len_q, len_k)``.
                Non-zero / ``True`` entries mark positions that must receive
                zero attention weight. Any dtype is accepted; it is
                converted to ``torch.bool`` before use.

        Returns:
            A tuple ``(output, attn)`` where ``output`` is
            ``(batch, len_q, d_v)`` and ``attn`` holds the softmax weights
            with shape ``(batch, len_q, len_k)``.
        """
        attn = torch.bmm(q, k.transpose(1, 2))
        attn = attn / self.temperature

        if mask is not None:
            # masked_fill only accepts boolean masks in current PyTorch;
            # coerce legacy uint8/int masks instead of failing.
            if mask.dtype != torch.bool:
                mask = mask.bool()
            # NOTE: a query row whose keys are all masked becomes all -inf
            # and therefore NaN after the softmax.
            attn = attn.masked_fill(mask, -np.inf)

        attn = self.softmax(attn)
        output = torch.bmm(attn, v)

        return output, attn
|
|
|
|
|
|
|
|
class MultiHeadAttention(nn.Module):
    """Multi-head attention with a residual connection and layer norm.

    Queries, keys and values are linearly projected into ``n_head``
    sub-spaces, attended independently with scaled dot-product attention,
    concatenated, projected back to ``d_model`` and finally added to the
    original query input before ``LayerNorm``.
    """

    def __init__(self, n_head, d_model, d_k, d_v):
        """Build the projection layers and the attention core.

        Args:
            n_head: Number of attention heads.
            d_model: Feature size of the inputs and of the output.
            d_k: Per-head feature size of queries and keys.
            d_v: Per-head feature size of values.
        """
        super().__init__()

        self.n_head = n_head
        self.d_k = d_k
        self.d_v = d_v

        # Input projections: one wide linear layer per role, later split
        # into n_head chunks of size d_k / d_v.
        self.w_qs = nn.Linear(d_model, n_head * d_k)
        self.w_ks = nn.Linear(d_model, n_head * d_k)
        self.w_vs = nn.Linear(d_model, n_head * d_v)
        # Output projection from the concatenated heads back to d_model.
        self.fc = nn.Linear(n_head * d_v, d_model)

        # Weight initialisation: zero-mean normal with
        # std = sqrt(2 / (d_model + per-head size)) for the input
        # projections, Xavier-normal for the output projection.
        nn.init.normal_(self.w_qs.weight, mean=0, std=np.sqrt(2.0 / (d_model + d_k)))
        nn.init.normal_(self.w_ks.weight, mean=0, std=np.sqrt(2.0 / (d_model + d_k)))
        nn.init.normal_(self.w_vs.weight, mean=0, std=np.sqrt(2.0 / (d_model + d_v)))
        nn.init.xavier_normal_(self.fc.weight)

        self.layer_norm = nn.LayerNorm(d_model)

        # Scores are divided by sqrt(d_k) before the softmax.
        self.attention = ScaledDotProductAttention(temperature=np.power(d_k, 0.5))

    def forward(self, q, k, v, mask=None):
        """Run multi-head attention and return ``(output, attn)``.

        Args:
            q: Query input, ``(batch, len_q, d_model)``. Also used as the
                residual that is added to the projected output.
            k: Key input, ``(batch, len_k, d_model)``.
            v: Value input, ``(batch, len_v, d_model)``; ``len_v`` must
                equal ``len_k`` for the attention product to be defined.
            mask: Optional 3-D tensor with the batch on dim 0, tiled
                ``n_head`` times along that dim and then handed to the
                attention core. Presumably ``(batch, len_q, len_k)`` with
                truthy entries marking positions to suppress — confirm
                against the caller.

        Returns:
            A tuple ``(output, attn)`` where ``output`` is
            ``(batch, len_q, d_model)`` and ``attn`` is whatever the
            attention core returns as weights for the head-major batch
            (``(n_head * batch, len_q, len_k)``).
        """
        d_k, d_v, n_head = self.d_k, self.d_v, self.n_head

        sz_b, len_q, _ = q.size()
        sz_b, len_k, _ = k.size()
        sz_b, len_v, _ = v.size()

        # Keep the untouched query for the residual connection.
        residual = q

        # Project and split the feature axis into (n_head, per-head size).
        q = self.w_qs(q).view(sz_b, len_q, n_head, d_k)
        k = self.w_ks(k).view(sz_b, len_k, n_head, d_k)
        v = self.w_vs(v).view(sz_b, len_v, n_head, d_v)

        # Move heads to the front and fold them into the batch axis so a
        # single bmm handles every head at once.
        q = q.permute(2, 0, 1, 3).contiguous().view(-1, len_q, d_k)  # (n*b) x lq x dk
        k = k.permute(2, 0, 1, 3).contiguous().view(-1, len_k, d_k)  # (n*b) x lk x dk
        v = v.permute(2, 0, 1, 3).contiguous().view(-1, len_v, d_v)  # (n*b) x lv x dv

        # Compare against None explicitly: the truth value of a tensor with
        # more than one element is ambiguous and raises RuntimeError.
        if mask is not None:
            # Tile along dim 0 to line up with the head-major (n*b) layout.
            mask = mask.repeat(n_head, 1, 1)  # (n*b) x .. x ..

        output, attn = self.attention(q, k, v, mask=mask)

        # Undo the head folding and concatenate the heads per position.
        output = output.view(n_head, sz_b, len_q, d_v)
        output = (
            output.permute(1, 2, 0, 3).contiguous().view(sz_b, len_q, -1)
        )  # b x lq x (n*dv)

        output = self.fc(output)
        output = self.layer_norm(output + residual)

        return output, attn