본문 바로가기

책/파이토치 트랜스포머를 활용한 자연어 처리와 컴퓨터비전 심층학습

07 트랜스포머 (1) Transformer

트랜스포머의 이론적인 내용은 이전 글에서 볼 수 있습니다. 여기서는 파이토치에서 트랜스포머를 사용하는 방법에 대해서만 다룹니다.

2024.01.11 - [Deep Learning/NLP] - 트랜스포머

 

 

1. 포지셔널 인코딩

import math
import torch
from torch import nn
from matplotlib import pyplot as plt

class PositionalEncoding(nn.Module):
  def __init__(self, d_model, max_len, dropout=0.1):
    super().__init__()
    self.dropout = nn.Dropout(p=dropout)

    position = torch.arange(max_len).unsqueeze(1) # [max_len, 1]
    div_term = torch.exp(
        torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model)
    )

    pe = torch.zeros(max_len, 1, d_model)
    pe[:, 0, 0::2] = torch.sin(position * div_term)
    pe[:, 0, 1::2] = torch.cos(position * div_term)
    self.register_buffer('pe', pe)

  def forward(self, x):
    x = x + self.pe[: x.size(0)]
    return self.dropout(x)

 

초기화 메서드에서는 모델의 차원 크기인 d_model, 최대 입력 시퀀스 길이인 max_len, 그리고 드롭아웃 비율인 dropout을 매개변수로 받습니다. 간단한 예시를 들어가며 위의 코드를 이해해보겠습니다.

 

d_model = 32
max_len = 10

position = torch.arange(max_len).unsqueeze(1)
div_term = torch.exp(
    torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model)
)
print(position.shape)
print(div_term.shape)
torch.Size([10, 1])
torch.Size([16])

 

tensor.unsqueeze(dim) 메서드는 dim으로 지정한 차원을 추가합니다. 그러므로 position의 shape은 [10, 1]이 됩니다.

div_term은 포지셔널 인코딩 수식에서 계산된 항으로 이 값과 position을 곱하게 됩니다.

 

 

$PE(pos, 2i) = \sin (pos / {10000}^{2i \over d_{model}})$

$PE(pos, 2i+1) = \cos (pos / {10000}^{2i \over d_{model}})$

 

 

pe = torch.zeros(max_len, 1, d_model)

 

포지셔널 인코딩의 차원을 위와 같이 설정한 이유는 forward 메서드에서 입력 기대값이 [입력 시퀀스의 길이, 배치 크기, 모델의 차원]이기 때문입니다.

even_temp = position * div_term
print(even_temp.shape)

pe[:, 0, 0::2] = torch.sin(position * div_term)
pe[:, 0, 1::2] = torch.cos(position * div_term)
torch.Size([10, 16])

 

position과 div_term을 곱한 값의 크기는 [10, 16]이 되고 이를 각각 sin함수와 cos함수를 적용하여 pe의 d_model 차원의 짝수 인덱스와 홀수 인덱스에 지정해줍니다.

 

self.register_buffer('pe', pe): Positional Encoding을 버퍼로 등록합니다. 이렇게 하면 모델의 학습 가능한 파라미터로 취급되지 않습니다.

 

x = x + self.pe[: x.size(0)]: 입력 텐서에 Positional Encoding을 더합니다. 입력 텐서의 길이만큼의 Positional Encoding이 선택됩니다.

 

포지셔널 인코딩을 시각화 하면 다음과 같습니다.

encoding = PositionalEncoding(d_model=128, max_len=50)

plt.pcolormesh(encoding.pe.numpy().squeeze(), cmap='RdBu')
plt.xlabel("Embedding Dimension")
plt.ylabel("Position")
plt.xlim((0, 128))
plt.colorbar()
plt.show()

 

 

2. 데이터세트 다운로드 및 전처리

파이토치에서 제공하는 트랜스포머 모델을 활용해 독일어를 영어로 번역하는 모델을 구성해보겠습니다. 데이터세트는 자연어 처리를 위한 대규모 다국어 데이터세트 중 하나인 Multi30k 데이터세트를 사용합니다.

 

Multi30k 데이터세트는 영어-독일어 병렬 말뭉치로 약 30,000개의 데이터를 제공하며 torchdata와 torchtext 라이브러리로 해당 데이터세트를 쉽게 다운로드할 수 있습니다.

!python -m spacy download de_core_news_sm
!python -m spacy download en_core_web_sm
!pip install portalocker
  • !python -m spacy download de_core_news_sm: 이 명령어는 SpaCy에서 독일어 (de) 모델인 de_core_news_sm을 다운로드합니다.
  • !python -m spacy download en_core_web_sm: 이 명령어는 SpaCy에서 영어 (en) 모델인 en_core_web_sm을 다운로드합니다.
  • portalocker 라이브러리는 파이썬에서 파일 락을 관리하기 위한 라이브러리로, 파일 락을 사용해 여러 프로세스 간에 동시에 파일을 수정하거나 읽는 것을 방지합니다. Multi30k 데이터세트를 다운로드하고 압축을 해제하는 과정에서 내부적으로 사용됩니다.
from torchtext.datasets import Multi30k
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator

# 3
def generate_tokens(text_iter, language):
  language_index = {SRC_LANGUAGE:0, TGT_LANGUAGE:1}

  for text in text_iter:
    yield token_transform[language](text[language_index[language]])

# 1
SRC_LANGUAGE = "de"
TGT_LANGUAGE = "en"
UNK_IDX, PAD_IDX, BOS_IDX, EOS_IDX = 0, 1, 2, 3
special_symbols = ["<unk>", "<pad>", "<bos>", "<eos>"]

token_transform = {
    SRC_LANGUAGE: get_tokenizer("spacy", language="de_core_news_sm"),
    TGT_LANGUAGE: get_tokenizer("spacy", language="en_core_web_sm"),
}

# 2
vocab_transform = {}
for language in [SRC_LANGUAGE, TGT_LANGUAGE]:
    train_iter = Multi30k(split="train", language_pair=(SRC_LANGUAGE, TGT_LANGUAGE))
    vocab_transform[language] = build_vocab_from_iterator(
        generate_tokens(train_iter, language),
        min_freq=1,
        specials=special_symbols,
        special_first=True,
    )

# 4
for language in [SRC_LANGUAGE, TGT_LANGUAGE]:
    vocab_transform[language].set_default_index(UNK_IDX)

 

get_tokenizer(tokenizer, language='en') : 

주어진 언어에 대한 토크나이저를 반환합니다.

 

build_vocab_from_iterator(iterator, min_freq=1, specials=None, special_first=True, max_tokens=None) :

 주어진 iterator로 Vocab 객체를 구축하여 반환합니다. Vocab 객체에는 lookup_token(index : int) 메서드가 존재하는데 이는 index를 입력 받아 대응하는 토큰을 반환합니다. 뒷부분에서 번역할 때 사용합니다. 예를 들어 index 10에 해당하는 단어가 'love'이면 10을 입력받아 'love'를 반환하는 식입니다.

 

  • min_freq : 토큰을 사전에 포함시키기 위한 최소 빈도수
  • specials : 추가할 special symbols입니다. 여기서는 트랜스포머에 사용되는 특수 토큰으로 지정합니다.
  • special_first : 특수 토큰을 사전의 맨 앞에 추가합니다.

 

주석 순서대로 이해해보겠습니다.

# 1

소스 언어, 타깃 언어, 특수 토큰의 인덱스, 스페셜 토큰을 지정합니다. token_transform에는 언어를 key로 하여 해당 언어에 해당하는 토크나이저를 저장합니다.

 

#2

Multi30k의 split 매개변수를 통해 데이터셋의 특정 부분을 선택할 수 있습니다. 예를 들어, split="train"은 훈련 데이터를 로드하고, split="valid"는 검증 데이터를 로드합니다. language_pair 매개변수를 통해 원하는 언어 쌍을 선택할 수 있습니다.

 

#3

token_transform을 사용하여 입력받은 언어에 해당하는 토크나이저를 사용해 토큰 iterator를 반환합니다. 그런 다음 이를 #2의 build_vocab_from_iterator의 iterator로 사용하여 vocab_transfrom에 언어를 key로 하여 해당 언어의 Vocab 객체를 저장합니다.

 

#4

set_default_index는 Vocab 객체의 default 인덱스를 설정하는 메서드로, 만약 어휘에 속하지 않는 단어에 대한 인덱스를 요청할 때 반환되는 값이 <unk> 토큰의 인덱스로 설정됩니다.

 

 

3. 트랜스포머 모델 구성

class TokenEmbedding(nn.Module):
  def __init__(self, vocab_size, emb_size):
    super().__Init__()
    self.embedding = nn.Embedding(vocab_size, emb_size)
    self.emb_size = emb_size

  def forward(self, tokens):
    return self.embedding(tokens.long()) * math.sqrt(self.emb_size)

 

tokens.long()은 토큰을 정수로 변환합니다. 이후에 임베딩된 결과에는 math.sqrt(self.emb_size)를 곱하여 임베딩 차원 크기에 루트를 취한 값을 곱합니다. 이 부분은 Transformer 모델에서의 특별한 스케일링을 나타냅니다.

 

Seq2SeqTransformer 클래스를 보기전에 해당 클래스에서 사용되는 마스킹을 만드는 함수를 먼저 보겠습니다.

def generate_square_subsequent_mask(s):
  mask = (torch.triu(torch.ones((s, s), device=DEVICE)) == 1).transpose(0, 1)
  mask = (
      mask.float()
      .masked_fill(mask == 0, float("-inf"))
      .masked_fill(mask == 1, float(0.0))
  )
  return mask

 

이 함수는 디코더의 셀프 어텐션에서 사용되는 마스크를 만듭니다.

 

예시를 들어 이해해보겠습니다.

mask = torch.triu(torch.ones(5, 5)) == 1
mask
tensor([[ True,  True,  True,  True,  True],
        [False,  True,  True,  True,  True],
        [False, False,  True,  True,  True],
        [False, False, False,  True,  True],
        [False, False, False, False,  True]])

 

torch.triu(input) : input 텐서의 대각선을 기준으로 아래는 0, 위쪽은 input 텐서의 값을 그대로 반환합니다.

 

mask = mask.transpose(0, 1)
mask
tensor([[ True, False, False, False, False],
        [ True,  True, False, False, False],
        [ True,  True,  True, False, False],
        [ True,  True,  True,  True, False],
        [ True,  True,  True,  True,  True]])

 

transpose를 사용하여 마스크를 정상적인 형태로 변환합니다. False에 해당하는 부분이 디코더의 셀프 어텐션에서 마스킹이됩니다.

 

mask = (
    mask.float()
    .masked_fill(mask == 0, float("-inf"))
    .masked_fill(mask == 1, float(0.0))
)
mask
tensor([[0., -inf, -inf, -inf, -inf],
        [0., 0., -inf, -inf, -inf],
        [0., 0., 0., -inf, -inf],
        [0., 0., 0., 0., -inf],
        [0., 0., 0., 0., 0.]])

 

그런 다음 bool형 mask를 float() 메서드로 True = 1.0, False = 0.0으로 바꿔줍니다. tensor의 masked_fill 메서드를 통해 mask가 0인 부분은 -inf, 1인 부분은 0.0으로 바꿔줍니다.

 

다음은 전체 마스크를 만드는 함수입니다.

def create_mask(src, tgt):
  src_seq_len = src.shape[0]
  tgt_seq_len = tgt.shape[0]

  tgt_mask = generate_square_subsequent_mask(tgt_seq_len)
  src_mask = torch.zeros((src_seq_len, src_seq_len), device=DEVICE).type(torch.bool)

  src_padding_mask = (src == PAD_IDX).transpose(0, 1) # [batch_size, src_seq_len]
  tgt_padding_mask = (tgt == PAD_IDX).transpose(0, 1) # [batch_size, tgt_seq_len]
  return src_mask, tgt_mask, src_padding_mask, tgt_padding_mask

 

이제 Seq2SeqTransformer 클래스를 보겠습니다.

class Seq2SeqTransformer(nn.Module):
  def __init__(self, num_encoder_layers, num_decoder_layers, emb_size, max_len, nhead,
               src_vocab_size, tgt_vocab_size, dim_feedforward, dropout=0.1):
    super().__init__()
    self.src_tok_emb = TokenEmbedding(src_vocab_size, emb_size)
    self.tgt_tok_emb = TokenEmbedding(tgt_vocab_size, emb_size)
    self.positional_encoding = PositionalEncoding(d_model=emb_size, max_len=max_len, dropout=dropout)
    self.transformer = nn.Transformer(
        d_model=emb_size,
        nhead=nhead,
        num_encoder_layers=num_encoder_layers,
        num_decoder_layers=num_decoder_layers,
        dim_feedforward=dim_feedforward,
        dropout=dropout,
    )
    self.generator = nn.Linear(emb_size, tgt_vocab_size)

  def forward(self, src, trg, src_mask, tgt_mask, src_padding_mask, tgt_padding_mask, memory_key_padding_mask):
    src_emb = self.positional_encoding(self.src_tok_emb(src))
    tgt_emb = self.positional_encoding(self.tgt_tok_emb(trg))
    outs = self.transformer(
        src=src_emb,
        tgt=tgt_emb,
        src_mask=src_mask,
        tgt_mask=tgt_mask,
        memory_mask=None,
        src_key_padding_mask=src_padding_mask,
        tgt_key_padding_mask=tgt_padding_mask,
        memory_key_padding_mask=memory_key_padding_mask,
    )
    return self.generator(outs)
  
  def encode(self, src, src_mask):
    return self.transformer.encoder(
        self.positional_encoding(self.src_tok_emb(src)), src_mask
    )
  
  def decode(self, tgt, memory, tgt_mask):
    return self.transformer.decoder(
        self.positional_encoding(self.tgt_tok_emb(tgt)), memory, tgt_mask
    )

 

transformer의 forward 메서드에서 memory_mask는 인코더 출력의 마스크로 [타깃 시퀀스 길이, 소스 시퀀스 길이]의 형태를 가집니다. 나중에 학습할 때 보겠지만 이번 실습에서는 사용하지 않는 매개변수입니다.

 

nn.Transformer 클래스에는 nn.TransformerEncoder와 nn.TransformerDecoder 클래스가 내장되어 있습니다. 각 클래스의 forward 메서드를 간단하게 살펴보면 다음과 같습니다.

 

nn.TransformerEncoder.forward(src, mask=None)

  • src : 인코더에 입력할 시퀀스

 

nn.TransformerDecoder.forward(tgt, memory, tgt_mask=None)

  • tgt: 디코더에 입력할 시퀀스
  • memory : 인코더의 마지막 층의 시퀀스

이를 사용해 Seq2SeqTransformer 클래스는 encode와 decode 메서드를 구현하여 이후 번역할 때 사용합니다.

 

 

4. 트랜스포머 모델 구조

from torch import optim


BATCH_SIZE = 128
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

model = Seq2SeqTransformer(
    num_encoder_layers=3,
    num_decoder_layers=3,
    emb_size=512,
    max_len=512,
    nhead=8,
    src_vocab_size=len(vocab_transform[SRC_LANGUAGE]),
    tgt_vocab_size=len(vocab_transform[TGT_LANGUAGE]),
    dim_feedforward=512,
).to(DEVICE)
criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX).to(DEVICE)
optimizer = optim.Adam(model.parameters())

for main_name, main_module in model.named_children():
    print(main_name)
    for sub_name, sub_module in main_module.named_children():
        print("└", sub_name)
        for ssub_name, ssub_module in sub_module.named_children():
            print("│  └", ssub_name)
            for sssub_name, sssub_module in ssub_module.named_children():
                print("│  │  └", sssub_name)
src_tok_emb
└ embedding
tgt_tok_emb
└ embedding
positional_encoding
└ dropout
transformer
└ encoder
│  └ layers
│  │  └ 0
│  │  └ 1
│  │  └ 2
│  └ norm
└ decoder
│  └ layers
│  │  └ 0
│  │  └ 1
│  │  └ 2
│  └ norm
generator

 

 

5. 배치 데이터 생성

from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence

def sequential_transforms(*transforms):
  def func(txt_input):
    for transform in transforms:
      txt_input = transform(txt_input)
    return txt_input
  return func

def input_transform(token_ids):
  return torch.cat(
      (torch.tensor([BOS_IDX]), torch.tensor(token_ids), torch.tensor([EOS_IDX]))
  )

def collator(batch):
  src_batch, tgt_batch = [], []
  for src_sample, tgt_sample in batch:
    src_batch.append(text_transform[SRC_LANGUAGE](src_sample.rstrip("\n")))
    tgt_batch.append(text_transform[TGT_LANGUAGE](tgt_sample.rstrip("\n")))
  
  src_batch = pad_sequence(src_batch, padding_value=PAD_IDX)
  tgt_batch = pad_sequence(tgt_batch, padding_value=PAD_IDX)
  return src_batch, tgt_batch

text_transform = {}
for language in [SRC_LANGUAGE, TGT_LANGUAGE]:
  text_transform[language] = sequential_transforms(
      token_transform[language], # Tokenization
      vocab_transform[language], # Numericalization
      input_transform # Add BOS/EOS and create tensor
  )

data_iter = Multi30k(split="valid", language_pair=(SRC_LANGUAGE, TGT_LANGUAGE))
dataloader = DataLoader(data_iter, batch_size=BATCH_SIZE, collate_fn=collator)

 

DataLoader의 collate_fn은 각각의 미니배치를 구성하는 과정에서 호출되는 함수로, 사용자가 정의한 방식대로 데이터를 처리합니다.

 

 

6. 모델 학습 및 평가

def run(model, optimizer, criterion, split):
  model.train() if split == "train" else model.eval()
  data_iter = Multi30k(split=split, language_pair=(SRC_LANGUAGE, TGT_LANGUAGE))
  dataloader = DataLoader(data_iter, batch_size=BATCH_SIZE, collate_fn=collator)

  losses = 0
  dataloader_len = 0
  for source_batch, target_batch in dataloader:
    dataloader_len += 1
    source_batch = source_batch.to(DEVICE)
    target_batch = target_batch.to(DEVICE)

    target_input = target_batch[:-1, :]
    target_output = target_batch[1:, :]

    src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(
        source_batch, target_input
    )

    logits = model(
        src=source_batch,
        tgt=target_input,
        src_mask=src_mask,
        tgt_mask=tgt_mask,
        src_padding_mask=src_padding_mask,
        tgt_padding_mask=tgt_padding_mask,
        memory_key_padding_mask=src_padding_mask,
    )

    optimizer.zero_grad()
    loss = criterion(logits.reshape(-1, logits.shape[-1]), target_output.reshape(-1))
    if split == "train":
      loss.backward()
      optimizer.step()
    losses += loss.item()

  return losses / dataloader_len

for epoch in range(5):
  train_loss = run(model, optimizer, criterion, "train")
  valid_loss = run(model, optimizer, criterion, "valid")
  print(f"Epoch: {epoch+1}, Train loss: {train_loss:.3f}, Valid loss: {valid_loss:.3f}")
Epoch: 1, Train loss: 3.697, Valid loss: 3.609
Epoch: 2, Train loss: 3.442, Valid loss: 3.515
Epoch: 3, Train loss: 3.290, Valid loss: 3.491
Epoch: 4, Train loss: 3.170, Valid loss: 3.461
Epoch: 5, Train loss: 3.098, Valid loss: 3.502

 

 

트랜스포머 모델 번역 결과

def greedy_decode(model, source_tensor, source_mask, max_len, start_symbol):
  source_tensor = source_tensor.to(DEVICE)
  source_mask = source_mask.to(DEVICE)

  memory = model.encode(source_tensor, source_mask)
  ys = torch.ones(1, 1).fill_(start_symbol).type(torch.long).to(DEVICE)
  for i in range(max_len - 1):
    memory = memory.to(DEVICE)
    target_mask = generate_square_subsequent_mask(ys.size(0)).type(torch.bool).to(DEVICE)
    
    out = model.decode(ys, memory, target_mask) # [i + 1, 1, emb_size]
    out = out.transpose(0, 1) # [1, i + 1, emb_size]
    prob = model.generator(out[:, -1])
    _, next_word = torch.max(prob, dim=1)
    next_word = next_word.item()

    ys = torch.cat(
        [ys, torch.ones(1, 1).type_as(source_tensor.data).fill_(next_word)], dim=0
    )
    if next_word == EOS_IDX:
      break
  return ys

def translate(model, source_sentence):
  model.eval()
  source_tensor = text_transform[SRC_LANGUAGE](source_sentence).view(-1, 1)
  num_tokens = source_tensor.shape[0]
  source_mask = (torch.zeros(num_tokens, num_tokens)).type(torch.bool)
  tgt_tokens = greedy_decode(
      model, source_tensor, source_mask, max_len=num_tokens + 5, start_symbol=BOS_IDX
  ).flatten()
  output = vocab_transform[TGT_LANGUAGE].lookup_tokens(list(tgt_tokens.cpu().numpy()))[1:-1]
  return " ".join(output)
output_oov = translate(model, "Eine Gruppe von Menschen steht vor einem Iglu .")
output = translate(model, "Eine Gruppe von Menschen steht vor einem Gebäude .")
print(output_oov)
print(output)
A group of people are playing a game .
A group of people are playing a game .

 

구글 번역 결과는 다음과 같습니다.

seed 고정을 하지 않고 학습하여서 그런지 책과 비교하여 모델이 다르게 학습된 부분도 있고, 그리디 디코딩의 단점을 보여주기도 합니다.