본문 바로가기

책/파이토치 트랜스포머를 활용한 자연어 처리와 컴퓨터비전 심층학습

06 임베딩 (2) RNN

RNN, LSTM의 이론적인 내용은 이전에 다룬적이 있기 때문에 여기서는 파이토치에서 RNN과 LSTM을 사용하는 방법만 다룬다.

 

2023.09.26 - [책/밑바닥부터 시작하는 딥러닝 2] - 5장 RNN (1)

2023.10.27 - [책/밑바닥부터 시작하는 딥러닝 2] - 6장 게이트가 추가된 RNN (1)

 

순환 신경망(Recurrent Neural Network, RNN)

 

RNN 클래스

rnn = torch.nn.RNN(
    input_size,
    hidden_size,
    num_layers=1,
    nonlinerity='tanh',
    bias=False,
    batch_first=True,
    dropout=0,
    bidirectional=False,
)
  • num_layers : 순환 신경망의 층수를 의미, 2 이상이면 다중 순환 신경망을 구성
  • nonlinearity : 순황 신경망에서 사용되는 활성화 함수를 결정, tanh와 relu 적용 가능
  • batch_first : 입력 배치 크기를 첫 번째 차원으로 사용할지 여부를 결정

양방향 다중 순환 신경망

import torch
from torch import nn

input_size = 128
output_size = 256
num_layers = 3
bidirectional = True

model = nn.RNN(
    input_size=input_size,
    hidden_size=output_size,
    num_layers=num_layers,
    nonlinearity='tanh',
    bidirectional=bidirectional,
    batch_first=True
)

batch_size = 4
sequence_len = 6

inputs = torch.randn(batch_size, sequence_len, input_size)
h_0 = torch.rand(num_layers * (int(bidirectional) + 1), batch_size, output_size) # 초기 은닉 상태

outputs, hidden = model(inputs, h_0)

print(outputs.shape)
print(hidden.shape)
torch.Size([4, 6, 512])
torch.Size([6, 4, 256])
  • 입력 차원 : [배치 크기, 시퀀스 길이, 입력 특성 크기]
  • 초기 은닉 상태 : [계층 수 * 양방향 여부 + 1, 배치 크기, 은닉 상태 크기]
  • 출력 차원 : [배치 크기, 시퀀스 길이, (양방향 여부 + 1) * 은닉 상태 크기]
  • 최종 은닉 상태 : 초기 은닉 상태와 동일

LSTM

 

LSTM 클래스

lstm = torch.nn.LSTM(
    input_size,
    hidden_size,
    num_layers=1,
    bias=False,
    batch_first=True,
    dropout=0,
    bidirectional=False,
    proj_size=0
)
  • RNN에서 사용하는 nonlinearity 매개변수를 사용하지 않는다.
  • 투사 크기(proj_size)는 hidden state의 출력 크기를 제어하는 매개변수이다. proj_size가 0이라면 프로젝션 없이 기본적인 hidden state를 출력하고 0보다 큰 정수로 설정되면 hidden state를 해당 크기로 프로젝션한다.

양방향 다중 LSTM

input_size = 128
ouput_size = 256
num_layers = 3
bidirectional = True
proj_size = 64

model = nn.LSTM(
    input_size=input_size,
    hidden_size=ouput_size,
    num_layers=num_layers,
    batch_first=True,
    bidirectional=bidirectional,
    proj_size=proj_size,
)

batch_size = 4
sequence_len = 6

inputs = torch.randn(batch_size, sequence_len, input_size)
h_0 = torch.rand(
    num_layers * (int(bidirectional) + 1),
    batch_size,
    proj_size if proj_size > 0 else ouput_size,
)
c_0 = torch.rand(num_layers * (int(bidirectional) + 1), batch_size, ouput_size)

outputs, (h_n, c_n) = model(inputs, (h_0, c_0))

print(outputs.shape)
print(h_n.shape)
print(c_n.shape)
torch.Size([4, 6, 128])
torch.Size([6, 4, 64])
torch.Size([6, 4, 256])
  • proj_size를 64로 설정하여 output의 경우 마지막 차원 값이 (양방향 여부 + 1) * proj_size가 되고 hidden state의 마지막 차원은 proj_size와 동일하다.

모델 실습

RNN과 LSTM을 활용해 문장 긍/부정 분류 모델을 학습해보자

 

문장 분류 모델

class SentenceClassifier(nn.Module):
  def __init__(self, n_vocab, hidden_dim, embedding_dim,
               n_layers, dropout=0.5, bidirectional=True, model_type='lstm'):
    super().__init__()

    self.embedding = nn.Embedding(
        num_embeddings=n_vocab,
        embedding_dim=embedding_dim,
        padding_idx=0
    )
    if model_type == "rnn":
      self.model = nn.RNN(
          input_size=embedding_dim,
          hidden_size=hidden_dim,
          num_layers=n_layers,
          bidirectional=bidirectional,
          dropout=dropout,
          batch_first=True,
      )
    elif model_type == "lstm":
      self.model = nn.LSTM(
          input_size=embedding_dim,
          hidden_size=hidden_dim,
          num_layers=n_layers,
          bidirectional=bidirectional,
          dropout=dropout,
          batch_first=True,
      )

    if bidirectional:
      self.classifier = nn.Linear(hidden_dim * 2, 1)
    else:
      self.classifier = nn.Linear(hidden_dim, 1)
    self.dropout = nn.Dropout(dropout)

  def forward(self, inputs):
    embeddings = self.embedding(inputs)
    output, _ = self.model(embeddings)
    last_output = output[:, -1, :]
    last_output = self.dropout(last_output)
    logits = self.classifier(last_output)
    return logits
  • forward 메서드에서 output의 마지막 시점만 활용할 예정이므로 [:, -1, :]으로 마지막 시점의 결괏값만 분리해 분류기 계층에 전달한다.

데이터 세트 불러오기

import pandas as pd
from Korpora import Korpora


corpus = Korpora.load("nsmc")
corpus_df = pd.DataFrame(corpus.test)

train = corpus_df.sample(frac=0.9, random_state=42)
test = corpus_df.drop(train.index)

print(train.head(5).to_markdown())
print("Training Data Size :", len(train))
print("Testing Data Size :", len(test))
|       | text                                                                                     |   label |
|------:|:-----------------------------------------------------------------------------------------|--------:|
| 33553 | 모든 편견을 날려 버리는 가슴 따뜻한 영화. 로버트 드 니로, 필립 세이모어 호프만 영원하라. |       1 |
|  9427 | 무한 리메이크의 소재. 감독의 역량은 항상 그 자리에...                                    |       0 |
|   199 | 신날 것 없는 애니.                                                                       |       0 |
| 12447 | 잔잔 격동                                                                                |       1 |
| 39489 | 오랜만에 찾은 주말의 명화의 보석                                                         |       1 |
Training Data Size : 45000
Testing Data Size : 5000

 

데이터 토큰화 및 단어 사전 구축

from konlpy.tag import Okt
from collections import Counter


def build_vocab(corpus, n_vocab, special_tokens):
    counter = Counter()
    for tokens in corpus:
        counter.update(tokens)
    vocab = special_tokens
    for token, count in counter.most_common(n_vocab):
        vocab.append(token)
    return vocab


tokenizer = Okt()
train_tokens = [tokenizer.morphs(review) for review in train.text]
test_tokens = [tokenizer.morphs(review) for review in test.text]

vocab = build_vocab(corpus=train_tokens, n_vocab=5000, special_tokens=["<pad>", "<unk>"])
token_to_id = {token: idx for idx, token in enumerate(vocab)}
id_to_token = {idx: token for idx, token in enumerate(vocab)}

print(vocab[:10])
print(len(vocab))
['<pad>', '<unk>', '.', '이', '영화', '의', '..', '가', '에', '...']
5002

 

정수 인코딩 및 패딩

import numpy as np

def pad_sequences(sequences, max_length, pad_value):
  result = list()
  for sequence in sequences:
    sequence = sequence[:max_length]
    pad_length = max_length - len(sequence)
    padded_sequence = sequence + [pad_value] * pad_length
    result.append(padded_sequence)
  return np.asarray(result)

unk_id = token_to_id["<unk>"]
train_ids = [
    [token_to_id.get(token, unk_id) for token in review] for review in train_tokens
]
test_ids = [
    [token_to_id.get(token, unk_id) for token in review] for review in test_tokens
]

max_length = 32
pad_id = token_to_id["<pad>"]
train_ids = pad_sequences(train_ids, max_length, pad_id)
test_ids = pad_sequences(test_ids, max_length, pad_id)

print(train_ids[0])
print(test_ids[0])
[ 223 1716   10 4036 2095  193  755    4    2 2330 1031  220   26   13
 4839    1    1    1    2    0    0    0    0    0    0    0    0    0
    0    0    0    0]
[3307    5 1997  456    8    1 1013 3906    5    1    1   13  223   51
    3    1 4684    6    0    0    0    0    0    0    0    0    0    0
    0    0    0    0]

 

데이터 로더 적용

import torch
from torch.utils.data import TensorDataset, DataLoader


train_ids = torch.tensor(train_ids)
test_ids = torch.tensor(test_ids)

train_labels = torch.tensor(train.label.values, dtype=torch.float32)
test_labels = torch.tensor(test.label.values, dtype=torch.float32)

train_dataset = TensorDataset(train_ids, train_labels)
test_dataset = TensorDataset(test_ids, test_labels)

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)

 

손실 함수와 최적화 함수 정의

from torch import optim


n_vocab = len(token_to_id)
hidden_dim = 64
embedding_dim = 128
n_layers = 2

device = "cuda" if torch.cuda.is_available() else "cpu"
classifier = SentenceClassifier(
    n_vocab=n_vocab, hidden_dim=hidden_dim, embedding_dim=embedding_dim, n_layers=n_layers
).to(device)
criterion = nn.BCEWithLogitsLoss().to(device)
optimizer = optim.RMSprop(classifier.parameters(), lr=0.001)
  • BCEWithLogitsLoss는 파이토치에서 제공하는 이진 교차 엔트로피 손실(Binary Cross Entropy Loss)을 계산하는 함수 중 하나이다.
  • BCEWithLogitsLoss는 주로 신경망의 출력값에 로짓(logits)을 사용하는 경우에 적용된다. 로짓은 확률을 나타내는 값으로 변환되기 전의 네트워크의 출력값이다.

모델 학습 및 테스트

def train(model, datasets, criterion, optimizer, device, interval):
    model.train()
    losses = list()

    for step, (input_ids, labels) in enumerate(datasets):
        input_ids = input_ids.to(device)
        labels = labels.to(device).unsqueeze(1)

        logits = model(input_ids)
        loss = criterion(logits, labels)
        losses.append(loss.item())

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if step % interval == 0:
            print(f"Train Loss {step} : {np.mean(losses)}")


def test(model, datasets, criterion, device):
    model.eval()
    losses = list()
    corrects = list()

    for step, (input_ids, labels) in enumerate(datasets):
        input_ids = input_ids.to(device)
        labels = labels.to(device).unsqueeze(1)

        logits = model(input_ids)
        loss = criterion(logits, labels)
        losses.append(loss.item())
        yhat = torch.sigmoid(logits)>.5
        corrects.extend(
            torch.eq(yhat, labels).cpu().tolist()
        )

    print(f"Val Loss : {np.mean(losses)}, Val Accuracy : {np.mean(corrects)}")


epochs = 5
interval = 500

for epoch in range(epochs):
    train(classifier, train_loader, criterion, optimizer, device, interval)
    test(classifier, test_loader, criterion, device)
Train Loss 0 : 0.6867725849151611
Train Loss 500 : 0.6936247455859612
Train Loss 1000 : 0.6907391950086161
Train Loss 1500 : 0.682171862058684
Train Loss 2000 : 0.6720567100021853
Train Loss 2500 : 0.6646402294828337
Val Loss : 0.6168034291876772, Val Accuracy : 0.6782
Train Loss 0 : 0.5940654873847961
Train Loss 500 : 0.5844168995549817
Train Loss 1000 : 0.5647882052830288
Train Loss 1500 : 0.5538767893103105
Train Loss 2000 : 0.5439690747882413
Train Loss 2500 : 0.5374484636970922
Val Loss : 0.5012446052540606, Val Accuracy : 0.7622
Train Loss 0 : 0.554517388343811
Train Loss 500 : 0.5009422014632863
Train Loss 1000 : 0.4862860381513923
Train Loss 1500 : 0.4737898110847009
Train Loss 2000 : 0.46813520336377507
Train Loss 2500 : 0.4621158795046215
Val Loss : 0.44711719243861614, Val Accuracy : 0.8004
Train Loss 0 : 0.46445703506469727
Train Loss 500 : 0.410575522753055
Train Loss 1000 : 0.40422590316592394
Train Loss 1500 : 0.39952670239274457
Train Loss 2000 : 0.3960885977794205
Train Loss 2500 : 0.39488419776604394
Val Loss : 0.4035346594195777, Val Accuracy : 0.8172
Train Loss 0 : 0.40387678146362305
Train Loss 500 : 0.3576682861187977
Train Loss 1000 : 0.3543693164249996
Train Loss 1500 : 0.35621099595940964
Train Loss 2000 : 0.35406598514658044
Train Loss 2500 : 0.35390355664103185
Val Loss : 0.3949069145578927, Val Accuracy : 0.8198

 

학습된 모델로부터 임베딩 추출

token_to_embedding = dict()
embedding_matrix = classifier.embedding.weight.detach().cpu().numpy()

for word, emb in zip(vocab, embedding_matrix):
  token_to_embedding[word] = emb

token = vocab[1000]
print(token, token_to_embedding[token])
보고싶다 [ 0.9156976  -0.55401576  1.8168696   1.2887877   0.02241418 -0.61677414
  0.09421384  1.2348244  -1.3410246   0.09005291  0.83459055  1.5599653
  0.09008738 -0.7722967  -1.0689512   0.40242195  0.5993975  -1.0171436
  0.76490223  1.1648107  -0.10444339  0.18582134  0.48415342  0.47307476
 -0.07117919  0.50694156 -0.02449991  0.8860589  -2.1261668   0.37267584
  1.395763   -0.22399338 -0.98970884  2.0817513   0.49492034  0.5672728
  0.80570793 -0.51452816 -0.2423634  -0.5295623  -2.767572   -0.60893476
 -0.50125694  1.2644466  -1.325586   -0.35699975  0.77019966  0.27758837
  1.6750653   0.88076276 -2.6860402   3.0295355   0.21739434  1.2575622
 -0.32314125 -0.18484998  0.9903218  -0.9300321  -0.75998497 -1.3938714
 -1.0250475  -0.4878564   1.892235    2.6940422  -0.02856987  0.06278264
  0.77608645 -0.6453776   0.01995367 -0.60925746 -0.9569977  -0.6779056
  1.0350417   0.9626615  -0.7517421   0.19573943 -1.3109112  -0.42941976
  0.5551294   1.6199611  -0.49342176  0.94042236  0.34073305  0.17974217
 -0.6445961   1.9881126  -1.9267832   0.48872843  0.8920549   0.5017308
 -1.432101    0.5430365  -1.6260115  -0.02909879  1.4827608   0.7210093
 -0.959514    1.3264253  -0.04167606 -0.32375333 -0.3421971  -0.5973011
 -1.0834634   0.19038841  0.06404561  0.5784112   1.4253327  -0.60482126
 -1.2246796   1.2418135  -1.0895737   0.6756441  -0.97556746 -0.52009875
 -0.95636415  0.41835323  0.6283442   0.5424502  -1.3228528  -0.11871856
 -0.3197381  -0.48346856 -2.1461508   0.8526704  -1.9427136   2.429411
 -1.5519657  -0.44550398]

 

다음은 사전 학습된 모델로 임베딩 계층을 초기화하는 방법을 보여준다.

from gensim.models import Word2Vec

tokens = [tokenizer.morphs(review) for review in corpus_df.text]

word2vec = Word2Vec(
    sentences=tokens,
    vector_size=128,
    window=5,
    min_count=1,
    sg=1,
    epochs=3,
    max_final_vocab=10000
)
init_embedding = np.zeros((n_vocab, embedding_dim))

for index, token in id_to_token.items():
  if token not in ['<pad>', '<unk>']:
    init_embedding[index] = word2vec.wv[token]

embedding_layer = nn.Embedding.from_pretrained(
    torch.tensor(init_embedding, dtype=torch.float32)
)
class SentenceClassifier(nn.Module):
    def __init__(
        self,
        n_vocab,
        hidden_dim,
        embedding_dim,
        n_layers,
        dropout=0.5,
        bidirectional=True,
        model_type="lstm",
        pretrained_embedding=None
    ):
        super().__init__()
        if pretrained_embedding is not None:
            self.embedding = nn.Embedding.from_pretrained(
                torch.tensor(pretrained_embedding, dtype=torch.float32)
            )
        else:
            self.embedding = nn.Embedding(
                num_embeddings=n_vocab,
                embedding_dim=embedding_dim,
                padding_idx=0
            )

        if model_type == "rnn":
            self.model = nn.RNN(
                input_size=embedding_dim,
                hidden_size=hidden_dim,
                num_layers=n_layers,
                bidirectional=bidirectional,
                dropout=dropout,
                batch_first=True,
            )
        elif model_type == "lstm":
            self.model = nn.LSTM(
                input_size=embedding_dim,
                hidden_size=hidden_dim,
                num_layers=n_layers,
                bidirectional=bidirectional,
                dropout=dropout,
                batch_first=True,
            )

        if bidirectional:
            self.classifier = nn.Linear(hidden_dim * 2, 1)
        else:
            self.classifier = nn.Linear(hidden_dim, 1)
        self.dropout = nn.Dropout(dropout)

    def forward(self, inputs):
        embeddings = self.embedding(inputs)
        output, _ = self.model(embeddings)
        last_output = output[:, -1, :]
        last_output = self.dropout(last_output)
        logits = self.classifier(last_output)
        return logits
  • 임베딩 클래스의 from_pretrained 메서드로 초기 임베딩 값을 초기화할 수 있다.