Deep Learning/Transformer

[Transformer] Transformer 코드 리뷰

MongTae 2025. 2. 14. 17:38

 

Positional Encoding

class PositionalEncoding(nn.Module):
    """
    Transformer는 입력 토큰의 순서를 고려하기 위해 위치 정보를 부가한다.
    이를 위해 논문에서는 사인/코사인 함수를 이용한 Positional Encoding을 사용.
    PE(pos, 2i)   = sin( pos / (10000^(2i/d_model)) )
    PE(pos, 2i+1) = cos( pos / (10000^(2i/d_model)) )
    """
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()

        # pos : 0부터 max_len까지
        # i : 0부터 d_model까지
        pe = torch.zeros(max_len, d_model)  # (mex_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)  # (max_len, 1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))  # (d_model / 2)

        pe[:, 0::2] = torch.sin(position * div_term)  # 짝수 채널
        pe[:, 1::2] = torch.cos(position * div_term)  # 홀수 채널

        # (1, max_len, d_model)의 형태로 만들어 register_buffer에 저장
        pe = pe.unsqueeze(0)  # (1, max_len, d_model)
        self.register_buffer('pe', pe)

    def forward(self, x):
        """
        x : (batch_size, seq_len, d_model)
        학습 파라미터로 삼지 않고, 단순히 더해만 주면 됨.
        """
        seq_len = x.size(1)
        # pe[:, :seq_len] => (1, seq_len, d_model)
        x = x + self.pe[:, :seq_len]
        return x

 

매개변수

  • d_model: 토큰 임베딩의 차원. (예를 들면 512, 768 등)
  • max_len: 최대 시퀀스 길이. 기본값은 5000.

pe = torch.zeros(max_len, d_model)

PE(Positional Encoding) 텐서 생성

 

  • pe(positional encoding)는 최대 길이 max_len와 임베딩 차원 d_model을 가지는 2차원 텐서를 0으로 초기화.
  • 여기서 각 행은 한 위치(pos, position)를, 각 열은 임베딩 차원의 특정 인덱스(index)를 의미.

position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)

각 Position(위치) 값과 주파수 인자 준비

position 텐서

  • torch.arange(0, max_len)는 0부터 max_len - 1까지 숫자를 만든다.
  • unsqueeze(1)를 통해 shape이 (max_len, 1)이 되는데, 이후 곱셈 시 브로드캐스팅(broadcasting)이 잘 이루어지게 하기 위함.
  • 각 위치를 나타내는 값이 됨.

div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))

 

div_term 텐서

  • torch.arange(0, d_model, 2)는 0부터 d_model까지 2씩 건너뛰면서 생성해서, 짝수 인덱스만 선택함.
  • 각 짝수 인덱스에 대해 -math.log(10000.0) / d_model을 곱하고, torch.exp를 씌워서 지수적인 감소값을 얻음.
  • 결과적으로, 각 임베딩 차원마다 다른 주파수를 부여하기 위한 스케일 팩터가 됨.
  • 이 값이 아래 수식에 해당하는 역할을 하게 됨.

 

pe[:, 0::2] = torch.sin(position * div_term) 
pe[:, 1::2] = torch.cos(position * div_term) 

PE(Positional Encoding) 텐서에 sin/cos 값을 채우기

  • pe[:, 0::2]: 모든 행에 대해 짝수 인덱스 열들을 선택.
    • 여기에 torch.sin(position * div_term)를 적용하여, 각 위치에 대해 사인 함수를 계산.
  • pe[:, 1::2]: 홀수 인덱스 열들을 선택.
    • 여기에 torch.cos(position * div_term)를 적용.
  • 이 방식은 위치 간의 상대적 관계를 잘 반영할 수 있도록 해줌.
  • 사인과 코사인을 번갈아 사용함으로써, 모델이 각 위치의 패턴을 학습할 때 주파수 성분들을 활용할 수 있게 됨.

pe = pe.unsqueeze(0)
self.register_buffer('pe', pe)

register buffer와 텐서 차원 변경

 

  • unsqueeze(0)의 역할:
    • 원래 pe 텐서는 shape가 (max_len, d_model)임.
    • 여기에 unsqueeze(0)을 적용하면 새로운 차원이 0번 위치(제일 앞)에 추가되어, shape가 (1, max_len, d_model)이 됨.
  • 왜 새로운 차원이 필요함?
    • Transformer의 입력 텐서 x는 보통 shape가 (batch_size, seq_len, d_model)임.
    • 이때, 모든 배치에 동일한 위치 인코딩 값을 더해주고 싶으므로, pe의 shape를 (1, max_len, d_model)으로 만들어두면,
      • 첫 번째 차원(1)은 배치 차원으로 자동 브로드캐스팅되어, x의 배치 크기와 맞춰져 각 배치마다 동일한 위치 인코딩이 더해지게 됨.
  • 브로드캐스팅하기 편해진다는 말의 의미:
    • 예를 들어,
      • pe가 (1, seq_len, d_model)이고,
      • x가 (batch_size, seq_len, d_model)이면,
        PyTorch는 첫 번째 차원(크기 1)을 자동으로 복제해서 (batch_size, seq_len, d_model)로 맞춰주기 때문에,
        명시적으로 반복문 돌리거나 별도의 차원 확장을 하지 않아도 두 텐서를 더할 수 있게 되는 것.
  • register_buffer:
    • self.register_buffer('pe', pe)를 사용하면 pe가 모델의 상수 텐서(학습 파라미터가 아님)로 등록된다.
    • 모델 저장 시 같이 저장되고, GPU로 옮길 때도 함께 옮겨지지만, gradient 계산에는 포함되지 않음.
    • 즉, 위치 인코딩은 학습되지 않고 고정된 값으로 사용됨.
    • 학습 대상이 아닌 상수 텐서를 모델에 등록함으로써, 저장 및 GPU 이동 시 자동으로 포함되게 함.

 

forward 메서드

  • 입력 x
    • x는 Transformer의 입력 임베딩 텐서로, shape가 (batch_size, seq_len, d_model)임.
  • 순서
    • seq_len = x.size(1)로 실제 시퀀스 길이를 구함.
    • self.pe[:, :seq_len]로 생성된 위치 인코딩 중 필요한 부분만 잘라내서, shape은 (1, seq_len, d_model)가 됨.
    • 이를 입력 x에 요소별 덧셈(broadcasting 활용)으로 더해준다.
    • 이렇게 하면, 각 토큰 임베딩에 그 토큰의 위치 정보를 담은 벡터가 더해져 최종 임베딩이 됨.
    • 중요:
      • 위치 인코딩은 학습되는 파라미터가 아니라, 고정된 값이므로 그냥 더해주는 방식으로 사용.

 

 

 

 


Scaled Dot Product Attention

class ScaledDotProductAttention(nn.Module):
    """
    Scaled Dot-Product Attention
    1) Q * K^T를 수행 후, sqrt(d_k)로 나눈다.
    2) 마스크를 적용(Optional)
    3) 소프트맥스 -> V 곱연산
    """
    def __init__(self):
        super(ScaledDotProductAttention, self).__init__()

    def forward(self, Q, K, V, mask=None):
        # Q, K, V shape:
        #   Q : (batch_size, n_heads, seq_len_q, d_k)
        #   K : (batch_size, n_heads, seq_len_k, d_k)
        #   V : (batch_size, n_heads, seq_len_k, d_v)
        
        d_k = K.size(-1)  # 보통 d_k = d_model / n_heads
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(d_k)
        # scores : (batch_size, n_heads, seq_len_q, seq_len_k)

        if mask is not None:
            # mask가 True인 부분을 매우 작은 값으로 채워 softmax 결과를 0이 되도록 만든다.
            scores = scores.masked_fill(mask == 0, -1e9)

        attn = torch.softmax(scores, dim=-1)
        # attn : (batch_size, n_heads, seq_len_q, seq_len_k)

        output = torch.matmul(attn, V)
        # output : (batch_size, n_heads, seq_len_q, d_v)

        return output, attn

 

d_k = K.size(-1)

Key 행렬의 마지막 차원(특징 차원)을 구함. 왜 마지막 차원을 구하는거임?

  • 일반적으로 Key (K) 행렬은 [batch_size, seq_len, d_k] 같은 형태를 가지는데, 여기서 d_k는 임베딩(특징) 차원임.
  • Scaled Dot-Product Attention에서는 내적(dot product) 결과가 너무 커지는 걸 방지하기 위해, sqrt(d_k)로 나눠줌.
  • 따라서, 마지막 차원에 있는 값이 바로 Key의 특징 수이므로 이 값을 구하는 것.

K.transpose(-2, -1)

-2와 -1의 의미는 뭐야?

  • PyTorch에서 음수 인덱스는 뒤에서부터 센다는 의미.
    • -1: 마지막 차원
    • -2: 마지막에서 두 번째 차원
  • 예를 들어, K의 shape가 [batch_size, seq_len, d_k]라면,
    • -1은 d_k 차원,
    • -2는 seq_len 차원.
  • transpose(-2, -1)마지막 두 차원(즉, seq_len과 d_k)의 순서를 바꾼다는 뜻.
  • 이렇게 해야 Q와 K를 내적할 때, 차원 맞춰서 곱셈이 가능해짐.

scores = scores.masked_fill(mask == 0, -1e9)

 

  • masking은 어떤 토큰(혹은 위치)을 무시할 것인지를 지정하는 텐서임.
    • 예를 들어, 문장에 패딩(padding)이 들어갔을 때, 그 패딩 토큰은 계산에서 제외하고 싶을 때 사용함.
  • 보통 1은 "유효함", 0은 "무효(무시)"를 나타냄.
    • 0인 위치: Attention 계산 시 참여하지 않아야 할(즉, 무시할) 부분.
  • 코드에서는 mask == 0인 곳을 -1e9로 채워 넣어,
    • Softmax 계산 시 이 값들이 거의 0 확률이 되도록 만드는 것.

attn = torch.softmax(scores, dim=-1)

  • 텐서의 shape가 [batch_size, num_heads, seq_len_q, seq_len_k]라면,
    • 보통은 dim=-1 (혹은 dim=3)을 사용해서, 각 query에 대해 모든 key의 점수들이 softmax로 정규화되도록 함.

 

 

 

 

 


 

Multi Head Attention

class MultiHeadAttention(nn.Module):
    """
    Multi-Head Attention
    1) 입력을 W_Q, W_K, W_V로 각각 선형 변환
    2) Scaled Dot-Product Attention 수행
    3) 여러 헤드를 이어서 최종 선형 변환
    """
    def __init__(self, d_model, n_heads):
        super(MultiHeadAttention, self).__init__()
        self.d_model = d_model
        self.n_heads = n_heads
        self.d_k = d_model // n_heads
        self.d_v = d_model // n_heads

        # W_Q, W_K, W_V
        self.W_Q = nn.Linear(d_model, d_model)
        self.W_K = nn.Linear(d_model, d_model)
        self.W_V = nn.Linear(d_model, d_model)

        # 출력 투영
        self.fc = nn.Linear(d_model, d_model)

        self.attention = ScaledDotProductAttention()

    def forward(self, Q, K, V, mask=None):
        batch_size = Q.size(0)
        
        # 1) Q, K, V 각각을 선형 변환
        # 이후 (batch_size, seq_len, d_model) -> (batch_size, seq_len, n_heads, d_k) 형태로 변환
        # 그리고 헤드 차원(n_heads)을 앞으로 가져옴
        q = self.W_Q(Q).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)
        k = self.W_K(K).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)
        v = self.W_V(V).view(batch_size, -1, self.n_heads, self.d_v).transpose(1, 2)

        # 2) Scaled Dot-Product Attention
        if mask is not None:
            # mask shape를 (batch_size, 1, seq_len_q, seq_len_k) -> (batch_size, n_heads, seq_len_q, seq_len_k)로 확장
            mask = mask.unsqueeze(1)

        out, attn = self.attention(q, k, v, mask=mask)
        # out: (batch_size, n_heads, seq_len, d_v)

        # 3) 여러 헤드를 다시 concat -> (batch_size, seq_len, n_heads * d_v)
        out = out.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)

        # 4) 최종 선형 변환
        out = self.fc(out)

        return out, attn

 

 

클래스 정의 및 초기화 (__init__)

 

  • d_model: 전체 임베딩 차원. 단어(토큰)를 512차원의 벡터로 표현할 때, d_model은 512임.
  • n_heads: 몇 개의 병렬 attention을 할 것인가. 예를 들어 8개의 헤드를 사용한다면, 한 번에 8가지 서로 다른 관점으로 정보를 뽑아냄.
  • d_k와 d_v: 전체 차원(d_model)을 헤드 수(n_heads)로 나눈 값으로, 각 헤드가 담당하는 차원 수. (예: 512/8 = 64)

입력 변환하는 층 만들기

  • nn.Linear(d_model, d_model):
    • 이건 간단한 선형 변환(즉, 행렬 곱셈)을 수행하는 층인데, 입력과 출력의 차원이 d_model으로 동일.
    • 이 과정을 통해 입력 데이터를 Q, K, V(질의, 키, 값)로 변환.
  • self.fc: 여러 헤드에서 나온 결과를 다시 한 번 결합(합치고) 후 선형 변환하는 역할을 함.
  • self.attention: 실제로 Scaled Dot-Product Attention을 계산하는 함수(또 다른 클래스나 함수).

 

순전파 함수 (forward)

실제 계산이 일어나는 부분.

 

  • Q, K, V
    • 입력 데이터로부터 나온 질의(Query), 키(Key), 값(Value) 벡터들.
    • 보통 이들의 모양(shape)은 (batch_size, seq_len, d_model)인데, 여기서 batch_size는 한 번에 처리하는 문장(또는 데이터)의 개수, seq_len은 문장의 길이(단어 수)를 의미.
  • batch_size = Q.size(0)는 배치의 크기를 가져오는 코드.

1) Q, K, V 각각을 선형 변환 후 여러 헤드로 나누기

 

  • self.W_Q(Q): 입력 Q에 선형 변환을 적용해서 d_model 차원의 새로운 Q 벡터를 얻음.
  • .view(batch_size, -1, self.n_heads, self.d_k):
    • 여기서 view 함수는 텐서의 모양을 바꾸는 함수.
    • 원래 Q의 모양이 (batch_size, seq_len, d_model)였다면, 이를 (batch_size, seq_len, n_heads, d_k)로 바꿔서,
      d_model을 여러 헤드(n_heads)로 나눈다는 의미.
    • -1은 그 자리에 자동으로 맞춰 넣어달라는 뜻(여기서는 seq_len이 됨).
  • .transpose(1, 2):
    • 이 함수는 텐서의 차원 순서를 바꾸는 함수.
    • (batch_size, seq_len, n_heads, d_k)에서 seq_len과 n_heads의 위치를 바꿔서
    • (batch_size, n_heads, seq_len, d_k)로 만듦.
    • 이렇게 하면, 이후에 각 헤드별로 따로 계산하기 편해짐.
  • k와 v도 같은 방식으로 변환됨.

 

 

2) Scaled Dot-Product Attention 수행

 

  • mask:
    • 특정 위치의 값을 무시하고 싶을 때 사용하는 마스크 텐서.
    • mask.unsqueeze(1)은 마스크에 새로운 차원을 추가해서, 각 헤드에 맞게 모양을 확장하는 작업.
  • self.attention(q, k, v, mask=mask):
    • 위에서 만든 q, k, v를 가지고 실제 Scaled Dot-Product Attention을 계산.
    • 이 함수는 각 헤드별로, q와 k의 내적(유사도 계산)을 하고, softmax를 통해 가중치를 만든 후, v와 곱해서 결과를 만듦.
    • 반환 값은 out (최종 결과)와 attn (어텐션 가중치, 나중에 분석에 사용 가능).

 

3) 여러 헤드의 결과를 다시 합치기 (Concatenation)

 

  • out.transpose(1, 2):
    • 현재 out의 모양은 (batch_size, n_heads, seq_len, d_v)인데, 여기서 n_heads와 seq_len의 순서를 바꿔서 (batch_size, seq_len, n_heads, d_v)로 만든다.
  • .contiguous():
    • 메모리 상에 연속된 형태로 만들어 주는 함수로, view 함수를 사용하기 전에 필요할 수 있음.
  • .view(batch_size, -1, self.d_model):
    • 여러 헤드(n_heads)로 나뉜 차원들을 하나로 합쳐서, 최종적으로 (batch_size, seq_len, d_model) 모양으로 만든다.
    • 즉, 여러 헤드에서 나온 결과들을 이어 붙여서 하나의 벡터로 만듦.

4) 최종 선형 변환

 

 

  • self.fc는 앞서 정의한 선형 변환 층.
  • 여러 헤드에서 합쳐진 결과에 대해 한 번 더 선형 변환을 하여 최종 결과를 만들어준다.
  • 이 과정은 모델이 최종적으로 필요한 출력 형태로 변환하는 역할.

5) 결과 반환

 

 

  • out: 최종 계산된 Multi-Head Attention의 출력 (보통 이후 레이어에 전달됨).
  • attn: 각 헤드에서 계산된 어텐션 가중치 (나중에 분석하거나 디버깅할 때 참고할 수 있음).

 


 

Position-wise Feed Forward Network

class PositionwiseFeedForward(nn.Module):
    """
    Position-wise FeedForward
    FFN(x) = max(0, xW1 + b1)W2 + b2
    """
    def __init__(self, d_model, d_ff):
        super(PositionalEncoding, self).__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        # x: (batch_size, seq_len, d_model)
        return self.linear2(self.relu(self.linear1(x)))

 

 

  • 여기서 FFN의 수식은 입력 xx에 대해 첫 번째 선형 계층 (xW1 + b1)을 적용한 후, ReLU 활성화 함수를 통해 음수를 0으로 만들고, 다시 두 번째 선형 계층 (W2 + b2)을 적용하는 과정을 나타냄.
  • 이때, "position-wise"라는 표현은 시퀀스의 각 위치(예: 문장의 각 단어)에 동일한 피드포워드 네트워크가 독립적으로 적용된다는 의미임.

 

1) __init__ 메서드: 모듈의 초기화

 

  • 입력 인자:
    • d_model: 입력 및 출력의 차원임. Transformer에서는 보통 모델의 숨겨진 상태(hidden state) 차원을 의미함.
    • d_ff: 은닉 계층의 차원으로, 보통 d_model보다 큰 값을 사용함. (예: 4 × d_model). 첫 번째 선형 계층에서 차원을 확장한 뒤 두 번째 계층에서 다시 축소함.
  • super 호출:
    • super(PositionalEncoding, self).__init__()
      • 여기서는 super()를 통해 부모 클래스(nn.Module)의 초기화를 호출함.
  • 내부 계층 정의:
    • self.linear1 = nn.Linear(d_model, d_ff)
      • 입력의 각 위치에 대해 차원을 d_model에서 d_ff로 확장하는 선형 계층.
    • self.linear2 = nn.Linear(d_ff, d_model)
      • 확장된 차원을 다시 원래 차원(d_model)으로 축소하는 선형 계층.
    • self.relu = nn.ReLU()
      • 첫 번째 선형 계층의 출력에 적용할 ReLU 활성화 함수.

 

2) forward 메서드: 순전파 연산

 

  • 입력 텐서 x:
    • 모양(shape)은 (batch_size, seq_len, d_model)임.
    • 여기서 batch_size는 한 번에 처리하는 샘플의 수, seq_len은 시퀀스의 길이, d_model은 각 단어의 임베딩 차원임.
  • 연산 순서:
    1. 첫 번째 선형 변환:
      • self.linear1(x)
        • 각 위치의 d_model 차원 벡터를 d_ff 차원으로 선형 변환합니다.
    2. ReLU 활성화 적용:
      • self.relu(self.linear1(x))
        • 선형 변환 결과에 ReLU를 적용하여 음수를 0으로 만든다. 이는 모델에 비선형성을 부여하여 표현력을 높임.
    3. 두 번째 선형 변환:
      • self.linear2(...)
        • ReLU의 출력 벡터를 다시 d_modeld\_model 차원으로 축소함.
    4. 최종 출력:
      • 연산 결과는 입력과 동일한 모양 (batch_size, seq_len, d_model)을 가지게 됨.
  • Position-wise 처리:
    • 이 연산은 시퀀스의 각 위치에 대해 독립적으로 동일한 두 계층의 변환을 적용함. 즉, 시퀀스의 각 단어에 동일한 feed-forward 네트워크가 적용되어, 위치 간의 상호작용 없이 각 위치의 특징을 개별적으로 변환함.

 

 

 

  • Transformer 모델의 인코더와 디코더에는 self-attention 계층 다음에 이와 같은 피드포워드 네트워크가 위치함.
  • Self-attention은 서로 다른 위치 간의 상호작용을 처리하고, FFN은 각 위치의 표현을 개별적으로 더 풍부하게 만드는 역할을 함.

 

 

 


Encoder Layer

class EncoderLayer(nn.Module):
    """
    Encoder Layer:
    1) Self-Attention (Multi-Head)
    2) Layer Normalization + Residual Connection
    3) Feed Forward
    4) Layer Normalization + Residual Connection
    """
    def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
        super(EncoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, n_heads)
        self.ffn = PositionwiseFeedForward(d_model, d_ff)

        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        # x: (batch_size, seq_len, d_model)
        # mask: (batch_size, seq_len) 형태 등
        
        # 1) Self-Attention
        attn_out, _ = self.self_attn(x, x, x, mask=mask)

        # 2) Layer Nrom & Residual
        x = self.norm1(x + self.dropout1(attn_out))

        # 3) Feed Forward
        ffn_out = self.ffn(x)

        # 4) Layer Norm & Residual
        x = self.norm2(x + self.dropout2(ffn_out))

        return x

 

 

  • Self-Attention (Multi-Head): 입력 내부의 모든 위치 간 상호작용을 학습함.
  • Layer Normalization + Residual Connection: self-attention 결과에 잔차 연결(residual connection)을 더한 후 정규화함.
  • Feed Forward: 각 위치별로 독립적인 position-wise feed-forward 네트워크를 적용함.
  • Layer Normalization + Residual Connection: feed-forward 결과에 다시 잔차 연결과 정규화를 수행함.

 

1) __init__ 메서드: 모듈 구성

 

  • 입력 인자:
    • d_model: 모델 히든 레이어의 차원. 입력 텐서의 마지막 차원 크기를 의미하며, self-attention과 feed-forward 네트워크의 입출력 차원으로 사용됨.
    • n_heads: multi-head attention에서 사용할 헤드의 개수.
    • d_ff: feed-forward 네트워크 내부의 은닉 차원. 일반적으로 d_model보다 크며, 두 선형 계층 사이의 차원 확장을 위해 사용됨.
    • dropout: 드롭아웃 확률로, 과적합(overfitting)을 방지하기 위해 사용됨.

내부 구성 요소:

  1. Multi-Head Self-Attention: 입력 텐서의 각 위치가 다른 모든 위치와 상호작용할 수 있도록 하는 self-attention 메커니즘을 여러 헤드를 통해 수행함.
  2. Position-wise Feed Forward Network: 앞서 설명한 것처럼, 각 위치별로 두 개의 선형 변환과 ReLU 활성화를 적용하는 네트워크.
  3. Layer Normalization: 각 서브레이어(self-attention과 feed-forward)의 출력에 대해 정규화(normalization)를 수행함. 정규화는 학습을 안정시키고 수렴 속도를 높이는 데 도움을 줌.
  4. Dropout: self-attention과 feed-forward의 결과에 드롭아웃을 적용하여, 모델이 특정 뉴런에 과도하게 의존하지 않도록 함.

 

2) forward 메서드: 순전파 연산

입력:

  • x: 모양이 (batch_size, seq_len, d_model)인 텐서.
    • batch_size: 한 번에 처리하는 샘플의 수.
    • seq_len: 시퀀스의 길이 (예를 들어, 문장의 단어 수).
    • d_model: 각 단어(또는 위치)의 임베딩 차원.
  • mask: 선택적 입력으로, attention 계산 시 특정 위치(예: 패딩 토큰)를 무시할 수 있도록 하는 마스크임.

 

단계별 연산 과정

  1. Self-Attention 수행

 

  • 연산:
    • self.self_attn는 multi-head self-attention 모듈입니다.
    • 세 개의 입력인자 (x, x, x)는 각각 Query, Key, Value로 사용.
    • mask는 attention 점수를 계산할 때, 특정 위치를 무시하도록 도와줌.
  • 출력:
    • attn_out: self-attention 계층의 출력 텐서 (모양: (batch_size, seq_len, d_model)).
    • 두 번째 반환값은 attention 가중치(weights)이지만, 여기서는 사용하지 않으므로 _로 무시.

 

2. 첫 번째 Residual Connection 및 Layer Normalization

  • Residual Connection:
    • 원래의 입력 x와 self-attention 결과 attn_out (드롭아웃 적용 후)을 더함.
    • 이 연결은 정보의 손실을 줄이고, 깊은 네트워크에서 기울기 소실 문제를 완화하는 역할.
  • Dropout:
    • self.dropout1(attn_out)를 통해 self-attention 출력에 드롭아웃을 적용.
  • Layer Normalization:
    • self.norm1은 더한 결과를 정규화하여 안정적인 학습을 도움.

 

3. Feed Forward Network 적용

 

  • 연산:
    • 업데이트된 x에 대해, position-wise feed-forward 네트워크를 적용.
    • 이 네트워크는 각 위치에 독립적으로 두 개의 선형 변환과 ReLU 활성화를 수행하여 추가적인 비선형 변환을 제공.
  • 출력:
    • ffn_out은 feed-forward 계층의 결과 텐서.

 

 

 

  • 최종적으로, Transformer 인코더의 한 층을 통과한 결과 x를 반환함.
  • 출력 텐서의 모양은 입력과 동일하게 (batch_size, seq_len, d_model).

 

 


Decoder Layer

class DecoderLayer(nn.Module):
    """
    Decoder Layer:
    1) Masked Self-Attention (Look-ahead mask)
    2) Layer Normalization + Residual
    3) Encoder-Decoder Attention
    4) Layer Normalization + Residual
    5) Feed Forward
    6) Layer Normalization + Residual
    """
    def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
        super(DecoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, n_heads)
        self.enc_dec_attn = MultiHeadAttention(d_model, n_heads)
        self.ffn = PositionwiseFeedForward(d_model, d_ff)

        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)

        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)

    def forward(self, x, enc_out, tgt_mask=None, memory_mask=None):
        # x: (batch_size, tgt_seq_len, d_model) - decoder 입력
        # enc_out: (batch_size, src_seq_len, d_model) - encoder 출력
        # tgt_mask: 디코더의 look-ahead mask (자기회귀)
        # memory_mask: 인코더-디코더 어텐션을 위한 mask

        # 1) Masked Self-Attention
        self_attn_out, _ = self.self_attn(x, x, x, mask=tgt_mask)
        x = self.norm1(x + self.dropout1(self_attn_out))

        # 2) Encoder-Decoder Attention
        # Q = 디코더의 현재 상태, K, V = 인코더 출력
        enc_dec_attn_out, _ = self.enc_dec_attn(x, enc_out, enc_out, mask=memory_mask)
        x = self.norm2(x + self.dropout2(enc_dec_attn_out))

        # 3) Feed Forward
        ffn_out = self.ffn(x)
        x = self.norm3(x + self.dropout3(ffn_out))

        return x

 

 

  • 디코더의 한 층은 다음의 세 가지 주요 서브레이어로 구성됨.
    1. Masked Self-Attention: 디코더 입력에 대해 자기자신 내에서 attention을 계산함. 이때 look-ahead mask를 적용하여 미래의 정보를 참조하지 못하도록 함.
    2. Encoder-Decoder Attention: 디코더가 인코더의 출력을 참조할 수 있도록, 디코더의 현재 상태를 Query로, 인코더 출력을 Key와 Value로 하여 attention을 계산.
    3. Feed Forward Network (FFN): 각 위치별로 독립적인 두 개의 선형 변환(및 비선형 활성화)을 적용하여 표현을 더욱 풍부하게 만듦.
  • 각 서브레이어 이후에는 Residual ConnectionLayer Normalization이 적용되어 안정적인 학습을 돕고, 드롭아웃(Dropout)을 통해 과적합을 방지.

 

1) __init__ 메서드: 모듈 초기화 및 구성 요소 생성

  • 입력 인자:
    • d_model: 모델 히든 레이어의 차원으로, 디코더와 인코더의 출력 벡터의 크기를 의미.
    • n_heads: multi-head attention에서 사용할 헤드의 개수로, 여러 개의 attention 서브공간에서 정보를 추출.
    • d_ff: 피드포워드 네트워크 내부의 은닉 차원. 보통 d_model보다 큰 값으로 설정하여 표현력을 높임.
    • dropout: 드롭아웃 확률로, 과적합을 방지하기 위해 사용.
  • 내부 구성 요소:
    1. Masked Self-Attention 계층 (self.self_attn):
      • 디코더 입력 x에 대해 자기 자신끼리 attention을 계산.
      • Look-ahead mask를 적용하여, 현재 시점 이후의 단어(또는 토큰)에 대한 정보를 참조하지 못하게 함.
    2. Encoder-Decoder Attention 계층 (self.enc_dec_attn):
      • 디코더의 현재 상태 x(Query)와 인코더 출력 enc_out(Key, Value)을 이용해, 인코더로부터 필요한 문맥 정보를 가져옴.
    3. Position-wise Feed Forward Network (self.ffn):
      • 앞서 설명한 것처럼, 각 시퀀스 위치마다 독립적으로 두 개의 선형 변환과 비선형(ReLU) 활성화를 적용하여 추가 변환을 수행.
    4. Layer Normalization (self.norm1, self.norm2, self.norm3):
      • 각 서브레이어의 출력에 대해 정규화를 수행하여 학습의 안정성을 높임.
    5. Dropout Layers (self.dropout1, self.dropout2, self.dropout3):
      • 각 서브레이어의 결과에 드롭아웃을 적용하여 특정 뉴런에 대한 의존도를 낮추고 일반화를 도움.

2) forward 메서드: 순전파 연산

(1) Masked Self-Attention

  • Masked Self-Attention 연산:
    • self.self_attn(x, x, x, mask=tgt_mask)에서는 디코더의 입력 x를 Query, Key, Value 모두로 사용.
    • tgt_mask는 look-ahead mask로, 디코더가 미래의 토큰을 참조하지 못하게 합니다. 이를 통해 자기회귀(auto-regressive) 방식의 생성이 가능해짐.
  • Residual Connection 및 Dropout:
    • self-attention 결과 self_attn_out에 드롭아웃을 적용한 후 원래 입력 x와 더함.
  • Layer Normalization:
    • self.norm1을 통해 더한 결과를 정규화하여 안정적인 학습을 유도.

(2) Encoder-Decoder Attention

  • Encoder-Decoder Attention 연산:
    • 여기서 Query는 현재 디코더의 상태 x이고, Key와 Value는 인코더의 출력 enc_out임.
    • 이를 통해 디코더는 인코더가 추출한 입력 문장의 문맥 정보를 참고.
    • memory_mask는 인코더-디코더 어텐션에서 특정 위치를 무시할 필요가 있을 때 사용.
  • Residual Connection, Dropout 및 Layer Normalization:
    • 인코더-디코더 attention 결과에 드롭아웃을 적용한 후 이전의 x와 더하고, self.norm2를 통해 정규화.

(3) Feed Forward Network

  • Feed Forward 연산:
    • 업데이트된 x를 position-wise feed forward 네트워크 (self.ffn)에 통과.
    • 이 네트워크는 각 시퀀스의 위치에 대해 두 개의 선형 변환과 비선형 활성화(ReLU)를 적용하여 추가적인 표현 변환을 수행.
  • Residual Connection, Dropout 및 Layer Normalization:
    • feed forward 네트워크의 결과 ffn_out에 드롭아웃을 적용하고, 원래의 x에 더한 후 self.norm3으로 정규화.

최종 출력

  • 최종적으로, 디코더 한 층을 통과한 결과 x를 반환.
  • 출력 텐서의 모양은 입력과 마찬가지로 (batch_size, tgt_seq_len, d_model).

 

 

  • Masked Self-Attention:
    • 디코더가 자기 자신의 이전 출력을 기반으로 다음 토큰을 예측할 때, 미래 정보를 보지 못하도록 막아 자기회귀적 특성을 유지.
  • Encoder-Decoder Attention:
    • 인코더에서 추출한 문맥 정보를 디코더에 반영하여, 생성 과정에서 원본 문장의 의미를 고려할 수 있게 함.
  • Feed Forward Network:
    • 각 위치별로 추가적인 비선형 변환을 수행하여 표현의 다양성을 높임.
  • Residual Connections & Layer Normalization:
    • 각 서브레이어마다 잔차 연결과 정규화를 적용함으로써, 깊은 네트워크에서의 학습 안정성과 효율을 증대.
  • Dropout:
    • 각 단계에서 드롭아웃을 적용하여 모델이 특정 뉴런에 과도하게 의존하지 않도록 하여, 일반화 성능을 높임.

 

 


Encoder

인코더는 주로 입력 텍스트를 임베딩하고, 위치 정보를 추가한 후 여러 개의 인코더 층(EncoderLayer)을 순차적으로 통과시키면서 입력의 문맥 정보를 추출.

class Encoder(nn.Module):
    """
    Encoder:
    1) 입력 임베딩(Input Embedding) + 포지셔널 인코딩(Positional Encoding)
    2) N개의 Encoder Layer
    """
    def __init__(self, vocab_size, d_model, n_heads, d_ff, num_layers, max_len=5000, dropout=0.1):
        super(Encoder, self).__init__()
        self.d_model = d_model
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len)

        self.layers = nn.ModuleList([
            EncoderLayer(d_model, n_heads, d_ff, dropout)
            for _ in range(num_layers)
        ])
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        x = self.embedding(x) # (batch_size, src_seq_len, d_model)
        x = self.pos_encoding(x)
        x = self.dropout(x)
        
        for layer in self.layers:
            x = layer(x, mask=mask)
        return x

 

 

  • 입력 임베딩 + 포지셔널 인코딩: 단어(또는 토큰)를 고정된 차원의 벡터로 변환하고, 시퀀스 내 각 단어의 순서를 고려할 수 있도록 위치 정보를 더함.
  • N개의 Encoder Layer: 여러 개의 인코더 층(EncoderLayer)을 쌓아, self-attention과 피드포워드 네트워크를 통해 입력 시퀀스의 문맥 정보를 풍부하게 추출.

 

1) __init__ 메서드: 초기화 및 모듈 구성

 

  • vocab_size: 어휘 집합의 크기. 임베딩 레이어에서 각 단어를 고유 인덱스에 대응하는 벡터로 변환할 때 사용.
  • d_model: 모델 히든 레이어의 차원. 임베딩 차원, self-attention의 입력/출력 차원 등 여러 곳에서 사용.
  • n_heads: Multi-Head Attention에서 사용되는 헤드 수.
  • d_ff: Position-wise Feed Forward Network 내부의 은닉층 차원. 보통 d_model보다 크며, 표현의 확장을 담당.
  • num_layers: 인코더 층의 개수.
  • max_len: 포지셔널 인코딩에서 사용할 최대 시퀀스 길이. (기본값 5000)
  • dropout: 드롭아웃 확률. 과적합 방지를 위해 사용.

1.1. 임베딩 및 포지셔널 인코딩

 

  • self.embedding:
    • nn.Embedding(vocab_size, d_model)
    • 입력 토큰의 정수 인덱스를 d_model 차원의 벡터로 변환.
  • self.pos_encoding:
    • PositionalEncoding(d_model, max_len)
    • 임베딩에 위치 정보를 추가.
    • Transformer는 순서를 고려하지 않는 구조이므로, 포지셔널 인코딩을 통해 단어의 순서 정보를 보완.

1.2. 여러 개의 인코더 층 구성

 

  • nn.ModuleList:
    • Python의 리스트와 비슷하지만, 내부에 등록된 모든 모듈들이 자동으로 학습 가능한 파라미터 목록에 포함.
  • 리스트 내포(List Comprehension):
    • for _ in range(num_layers)를 사용하여 num_layers 만큼의 EncoderLayer 인스턴스를 생성.
    • 각 인코더 층은 동일한 구조(입력 차원, 헤드 수, 피드포워드 차원, 드롭아웃 확률)를 가짐.

Python의 List comprehension과 PyTorch의 ModuleList를 함께 사용하여, 여러 개의 인코더 레이어(EncoderLayer)를 생성하고 관리

 

 

 

 

    1. 리스트 컴프리헨션으로 EncoderLayer 객체 생성:
      • [EncoderLayer(d_model, n_heads, d_ff, dropout) for _ in range(num_layers)]
      • 이 부분은 num_layers 번 반복하여, 각 반복마다 EncoderLayer 인스턴스를 생성한 후, 이들을 하나의 리스트로 만듦.
    2. nn.ModuleList에 리스트 전달:
      • 생성된 리스트를 nn.ModuleList의 생성자에 전달.
      • 이렇게 하면, self.layers는 일반 Python 리스트와 유사하게 인덱싱이나 반복(iteration)이 가능하지만, 내부의 모든 EncoderLayer 객체가 PyTorch의 서브 모듈로 등록되어, 모델 전체의 파라미터 관리에 포함.
  • 왜 ModuleList를 사용하는가?
    • 단순한 Python 리스트에 모듈을 담아두면, PyTorch가 해당 모듈들을 자동으로 인식하지 못해, 학습 과정에서 파라미터 업데이트가 제대로 이루어지지 않을 수 있음.
    • nn.ModuleList를 사용하면, 리스트에 담긴 모든 모듈이 부모 모듈에 등록되어, model.parameters() 호출 시 포함되며, 옵티마이저에 의해 업데이트.

다시

 

  • 리스트 컴프리헨션: for _ in range(num_layers)를 통해 num_layers 번 반복하면서, 각 반복마다 동일한 인자들을 사용하여 EncoderLayer 객체를 생성하고, 이를 리스트에 담음.
  • nn.ModuleList: 생성된 리스트를 nn.ModuleList에 감싸서, 이 리스트가 단순한 Python 리스트가 아니라, PyTorch의 서브 모듈로 등록되는 컨테이너가 됨.
  • 결과:
    • self.layers에는 num_layers 개의 EncoderLayer 인스턴스가 저장되고, 이들은 Transformer 인코더의 여러 층으로 사용.
    • 각 층은 입력 텐서를 받아 self-attention, 피드 포워드 네트워크 등의 연산을 수행하며, 최종적으로 인코더 전체의 출력에 기여.

 

1.3. 드롭아웃 레이어

  • 입력 임베딩과 포지셔널 인코딩의 결과에 드롭아웃을 적용하여, 학습 시 일부 뉴런을 임의로 비활성화함으로써 모델의 일반화 성능을 높임.

 

 

2) forward 메서드: 순전파 연산

입력 인자:

  • x: 입력 토큰의 인덱스 텐서, 모양은 일반적으로 (batch_size, src_seq_len).
  • mask: 선택적 인자이며, attention 계산 시 불필요하거나 패딩된 부분을 무시하도록 하는 마스크임.

2.1. 임베딩 및 포지셔널 인코딩 적용

  • 임베딩:
    • 입력 토큰 인덱스를 d_model 차원의 벡터로 변환하여, 텐서의 모양이 (batch_size, src_seq_len, d_model).
  • 포지셔널 인코딩:
    • self.pos_encoding(x)를 통해 각 단어 벡터에 위치 정보를 추가.
  • 드롭아웃:
    • 드롭아웃을 적용하여 학습 시 일부 뉴런을 무작위로 비활성화함으로써 과적합을 방지.

2.2. 인코더 층들을 순차적으로 적용

루프를 통한 층 적용:

  • self.layers에 포함된 각 EncoderLayer를 순차적으로 호출.
  • 각 층은 입력 텐서 x와 선택적 마스크 mask를 받아 self-attention, residual 연결, layer normalization, 피드포워드 네트워크 등의 연산을 수행.
  • 층을 통과할 때마다 x가 업데이트되며, 최종적으로 Transformer 인코더 전체의 출력이 됨.

2.3. 최종 출력 반환

  • 인코더의 최종 출력은 각 입력 단어에 대한 풍부한 문맥 정보를 포함하는 텐서.
  • 반환되는 텐서의 모양은 (batch_size, src_seq_len, d_model)이며, 이후 디코더나 다른 후속 모듈로 전달.

 


Decoder

입력 토큰들을 임베딩하고 위치 정보를 추가한 뒤 여러 개의 디코더 층(DecoderLayer)을 순차적으로 통과시켜 최종 디코딩 결과를 생성.

class Decoder(nn.Module):
    """
    Decoder:
    1) 입력 임베딩 + 포지셔널 인코딩
    2) N개의 Decoder Layer
    """
    def __init__(self, vocab_size, d_model, n_heads, d_ff, num_layers, max_len=5000, dropout=0.1):
        super(Decoder, self).__init__()
        self.d_model = d_model
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len)

        self.layers = nn.ModuleList([
            DecoderLayer(d_model, n_heads, d_ff, dropout)
            for _ in range(num_layers)
        ])
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_out, tgt_mask=None, memory_mask=None):
        # x: (batch_size, tgt_seq_len)
        # enc_out: (batch_size, src_seq_len, d_model)

        # 1) 임베딩 + 위치 인코딩
        x = self.embedding(x)
        x = self.pos_encoding(x)
        x = self.dropout(x)

        # 2) N개의 디코더 레이어 통과
        for layer in self.layers:
            x = layer(x, enc_out, tgt_mask=tgt_mask, memory_mask=memory_mask)
        return x

 

1) 클래스 선언 및 초기화 (__init__)

 

  • 클래스 이름 및 상속:
    • Decoder는 PyTorch의 nn.Module을 상속받아 정의되었으며, Transformer 디코더 부분을 담당.
  • 입력 인자:
    • vocab_size: 어휘 집합의 크기. 디코더 입력으로 들어오는 토큰 인덱스들을 임베딩 벡터로 변환할 때 사용.
    • d_model: 모델의 차원으로, 임베딩 벡터의 차원과 디코더 내부의 feature 차원을 의미.
    • n_heads: Multi-Head Attention에서 사용될 헤드의 수.
    • d_ff: Feed Forward 네트워크의 내부 차원. 일반적으로 d_model보다 큰 값으로 설정.
    • num_layers: 디코더 층(DecoderLayer)의 개수로, 전체 디코더가 몇 개의 층으로 구성될지를 결정.
    • max_len: 포지셔널 인코딩에 사용될 최대 시퀀스 길이.
    • dropout: 드롭아웃 확률로, 과적합을 방지하기 위해 사용.
  • 임베딩 및 포지셔널 인코딩:
    • self.embedding:
      • nn.Embedding(vocab_size, d_model)을 사용하여, 입력 토큰 인덱스를 d_model 차원의 임베딩 벡터로 변환.
    • self.pos_encoding:
      • PositionalEncoding(d_model, max_len) 모듈은 Transformer의 특성상 순서를 반영할 수 있도록 각 임베딩에 위치 정보를 추가.

2) Decoder Layer 생성: self.layers

 

  • 리스트 컴프리헨션:
    • [DecoderLayer(d_model, n_heads, d_ff, dropout) for _ in range(num_layers)]
      • range(num_layers)를 통해 num_layers 번 반복하면서, 매 반복마다 DecoderLayer(d_model, n_heads, d_ff, dropout)를 생성.
      • 여기서 _는 반복 변수로, 값은 사용하지 않고 단순히 num_layers 번 반복하겠다는 의미.
  • nn.ModuleList:
    • Python의 일반 리스트와 달리 nn.ModuleList는 내부에 포함된 모든 모듈(여기서는 각 DecoderLayer)들을 부모 모듈에 등록.
    • 이렇게 하면, 디코더의 모든 서브모듈들이 model.parameters() 호출 시 자동으로 포함되어 옵티마이저가 해당 파라미터들을 업데이트할 수 있음.
  • 결과:
    • self.layers는 총 num_layers 개의 DecoderLayer 인스턴스를 담고 있는 컨테이너가 되어, 디코더의 여러 층을 순차적으로 적용할 수 있게 됨.

3) 드롭아웃 레이어 생성

 

  • 드롭아웃은 모델의 학습 시 일부 뉴런을 무작위로 비활성화하여 과적합을 방지하는 역할을 함.
  • 여기서는 임베딩과 포지셔널 인코딩 결과에 드롭아웃을 적용.

 

4) forward 메서드: 순전파 연산

 

  • 입력 인자:
    • x: 디코더에 들어오는 입력 토큰의 인덱스 텐서 (일반적으로 모양은 (batch_size, tgt_seq_len)).
    • enc_out: 인코더(Encoder)의 출력으로, 디코더가 참조할 문맥 정보를 담고 있음. 모양은 (batch_size, src_seq_len, d_model).
    • tgt_mask: 디코더의 Masked Self-Attention에서 사용되는 마스크로, 미래의 정보를 참조하지 못하도록 함.
    • memory_mask: 인코더-디코더 어텐션에서 사용되는 마스크로, 인코더의 특정 부분(예: 패딩 토큰)을 무시할 때 사용.
  • 순차적 처리:
    1. 임베딩:
      • x = self.embedding(x)
        • 입력 토큰 인덱스를 d_model 차원의 벡터로 변환.
    2. 포지셔널 인코딩:
      • x = self.pos_encoding(x)
        • 임베딩 벡터에 각 토큰의 위치 정보를 추가.
    3. 드롭아웃 적용:
      • x = self.dropout(x)
        • 드롭아웃을 통해 임베딩 및 포지셔널 인코딩 결과에 규제를 가합니다.
    4. 디코더 층 통과:
      • for layer in self.layers:
        • 생성된 각 Decoder Layer를 순차적으로 적용.
        • 각 층은 현재의 상태 x와 인코더 출력 enc_out을 받아, 내부의 Masked Self-Attention, Encoder-Decoder Attention, 그리고 Feed Forward 네트워크 연산을 수행.
        • 이때, tgt_mask와 memory_mask가 각 층에 전달되어 적절한 위치의 토큰을 마스킹.
        • 층을 통과할 때마다 x가 업데이트되어, 최종적으로 디코더 전체의 출력으로 사용.
    5. 최종 출력 반환:
      • return x
        • 최종 디코딩 결과를 반환하며, 이 텐서는 후속 작업(예: 소프트맥스, 생성 토큰 예측 등)에 사용.

 

tgt?

 

  • Transformer 모델에서는 보통 소스(source)타깃(target) 두 가지 시퀀스를 다룸.
    • 소스(source, src): 예를 들어, 번역 시스템에서 번역 전의 문장(원문)임.
    • 타깃(target, tgt): 번역 후의 문장(번역 결과)이거나, 모델이 생성해야 하는 목표 시퀀스임.
  • 디코더에서의 역할:
    디코더는 타깃 시퀀스를 입력으로 받아, 이전 단어들을 기반으로 다음 단어를 예측하는 역할을 함.
    • tgt_mask는 디코더의 Masked Self-Attention에서 사용되는데, 이는 디코더가 현재 단어를 예측할 때 미래의 단어 정보를 참조하지 못하도록 미래 정보 차단(Look-Ahead Mask) 역할을 함.

 

 


Transformer

Transformer 모델 전체 구조를 구현한 클래스. Transformer는 인코더(Encoder)와 디코더(Decoder)로 구성되며, 최종적으로 디코더의 출력을 선형 변환(Linear Projection)하여 각 단어(또는 토큰)의 확률 분포(logits)를 얻음.

class Transformer(nn.Module):
    """
    Transformer 전체 구조 : Encoder + Decoder
    최종적으로 Decoder의 출력을 Linear로 projection 하여 각 단어의 확률을 얻는다.
    """
    def __init__(
        self,
        src_vocab_size,
        tgt_vocab_size,
        d_model=512,
        n_heads=8,
        d_ff=2048,
        num_layers=6,
        max_len=5000,
        dropout=0.1
        ):
        super(Transformer, self).__init__()
        self.encoder = Encoder(src_vocab_size, d_model, n_heads, d_ff, num_layers, max_len, dropout)
        self.decoder = Decoder(tgt_vocab_size, d_model, n_heads, d_ff, num_layers, max_len, dropout)

        self.fc = nn.Linear(d_model, tgt_vocab_size, bias=False)

    def forward(self, src, tgt, src_mask=None, tgt_mask=None, memory_mask=None):
        # src: (batch_size, src_seq_len)
        # tgt: (batch_size, tgt_seq_len)
        # src_mask, tgt_mask, memory_mask: 마스킹 텐서

        enc_out = self.encoder(src, mask=src_mask) # (batch_size, src_seq_len, d_model)
        dec_out = self.decoder(tgt, enc_out, tgt_mask=tgt_mask, memory_mask=memory_mask) # (batch_size, tgt_seq_len, d_model)

        # 최종 logits
        out = self.fc(dec_out) # (batch_size, tgt_seq_len, tgt_vocab_size)
        return out

1) __init__ 메서드

 

  • 입력 인자:
    • src_vocab_size: 소스(입력) 어휘 집합의 크기 (Encoder에서 사용).
    • tgt_vocab_size: 타깃(출력) 어휘 집합의 크기 (Decoder와 최종 출력 선형 변환에서 사용).
    • d_model: 모델의 차원(임베딩 및 내부 표현의 크기), 기본값 512.
    • n_heads: Multi-Head Attention에서 사용할 헤드 수, 기본값 8.
    • d_ff: 피드포워드 네트워크 내부의 은닉 차원, 기본값 2048.
    • num_layers: 인코더와 디코더에 쌓일 층의 수, 기본값 6.
    • max_len: 포지셔널 인코딩에서 사용할 최대 시퀀스 길이, 기본값 5000.
    • dropout: 드롭아웃 확률, 기본값 0.1.
  • 내부 구성 요소:
    • Encoder 생성:
      • 소스 시퀀스를 처리하여 문맥 정보를 추출.
    • Decoder 생성:
      • 타깃 시퀀스를 처리하며, 인코더의 출력 정보를 활용하여 다음 토큰을 예측.
    • 최종 Linear 계층:
      • 디코더의 출력을 타깃 어휘 크기에 맞게 투영(projection)하여 각 토큰에 대한 점수를 생성.

2) forward 메서드

 

 

  • 입력:
    • src: 소스 시퀀스 텐서, 모양은 (batch_size, src_seq_len).
    • tgt: 타깃 시퀀스 텐서, 모양은 (batch_size, tgt_seq_len).
    • src_mask, tgt_mask, memory_mask: 각각 소스, 타깃, 인코더-디코더 attention 시에 적용할 마스크 텐서들. 이들은 패딩 토큰 무시, look-ahead 마스킹 등 다양한 역할을 수행합니다.
  • 연산 과정:
    1. 인코더 통과:
      • 소스 시퀀스가 인코더를 통과하여 (batch_size, src_seq_len, d_model) 크기의 출력을 생성.
    2. 디코더 통과:
      • 타깃 시퀀스와 인코더의 출력이 디코더에 입력되어, (batch_size, tgt_seq_len, d_model) 크기의 디코더 출력을 만듦.
    3. 최종 선형 투영:
      • 디코더 출력이 Linear 계층을 통과하여, 각 시퀀스 위치별로 tgt_vocab_size 크기의 점수 벡터(로그잇)를 생성.
  • 출력:
    • 최종 출력은 (batch_size, tgt_seq_len, tgt_vocab_size) 모양의 텐서이며, 각 토큰의 예측 점수를 포함

 


generate square subsequent mask

Transformer 디코더에서 look-ahead mask를 생성하기 위한 함수. look-ahead mask는 디코더의 각 시점에서 미래의 토큰을 참조하지 못하도록 만들어, 현재 시점 이전(또는 현재 포함) 토큰들만 볼 수 있도록 함.

def generate_square_subsequent_mask(sz):
    """
    디코더의 look-ahead mask를 생성하기 위한 함수.
    앞으로 등장할 토큰을 보지 못하도록 (i > j) 위치를 0으로 만든다.
    """
    mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
    return mask  # True/False tensor

 

1. torch.ones(sz, sz)

  • 역할:
    크기가 (sz, sz)인 모든 원소가 1인 텐서를 생성.
  • 예시:
    만약 sz=3이면,
     
    tensor([[1., 1., 1.],
                  [1., 1., 1.],
                  [1., 1., 1.]])

2. torch.triu(...)

  • 역할:
    텐서의 upper triangular (상삼각형) 부분만 남김.
    즉, 행 인덱스 i가 열 인덱스 j보다 작거나 같은 (i <= j) 위치의 원소들은 그대로 두고, 그 외의 부분은 0으로 만듦.
  • 예시:
    위의 3×3 텐서에 torch.triu를 적용하면,
     
    tensor([[1., 1., 1.],
                  [0., 1., 1.],
                  [0., 0., 1.]])

3. ( ... == 1 )

  • 역할:
    숫자형 텐서를 불리언(Boolean) 텐서로 변환.
    각 원소가 1이면 True, 0이면 False로 바뀜.
  • 결과:
    tensor([[True, True, True],
                  [False, True, True],
                  [False, False, True]])

4. .transpose(0, 1)

  • 역할:
    텐서를 전치(transpose)하여 행과 열의 위치를 바꿈.
  • 결과:
    위의 예시를 전치하면,
     
    tensor([[True, False, False],
                  [True, True, False],
                  [True, True, True]])

최종 결과 및 의미

  • 최종 마스크:
    최종적으로 생성된 마스크는 하삼각(lower triangular) 형태의 불리언 텐서.
  • 예시 (sz=3):
     
    [[True, False, False],
     [True, True, False], [True, True, True]]
  • 의미:
    • 행렬의 행(i): 디코더의 현재 시점을 나타냄.
    • 행렬의 열(j): 참조 가능한 토큰의 위치를 나타냄.
    • mask[i, j]가 True이면, 시점 i에서 위치 j의 토큰을 참조할 수 있음을 의미.
    • 결과적으로, 각 시점 i에서는 j가 i보다 큰(미래의) 토큰은 False로 처리되어 참조할 수 없음.

왜 Look-Ahead Mask가 필요한가?

  • 목적:
    Transformer 디코더는 훈련 시 전체 타깃 시퀀스를 입력으로 받지만, 실제 생성(예측) 과정에서는 이전 토큰들만 이용해야 함.
    look-ahead mask는 현재 시점 이후의 토큰들을 마스킹(참조 불가)하여, 모델이 미래 정보를 참조하지 못하게 함.
  • 결과:
    이를 통해 디코더는 자기회귀(autoregressive) 방식으로 작동하여, 한 시점에서 다음 토큰을 예측할 때 이미 생성된 이전 토큰들만 참고하게 됨.

참고

generate_square_subsequent_mask 함수는 디코더의 look-ahead mask를 만들기 위한 간단한 유틸리티 함수에 가깝기 때문에, 굳이 하나의 클래스 내부 메서드로 넣기보다는 별도의 함수로 정의해둔 것.

  1. 역할이 명확한 유틸 함수
    • 이 함수는 단순히 torch.triu() 등을 이용해 특정 패턴(True/False)으로 마스크 텐서를 생성하는 역할만 함.
    • 내부에 파라미터나 학습 로직이 없으며, 모델의 구조(nn.Module)와는 직접적으로 연관되지 않음.
  2. 재사용성/가독성
    • 여러 곳에서 동일한 형태의 마스크가 필요할 수 있으므로, 별도의 전역 함수로 두어 재사용하기 편리.
    • 별도의 클래스 메서드로 구현할 경우, 해당 클래스로부터 인스턴스를 만들거나 클래스를 임포트해야만 함수를 사용할 수 있게 되는데, 이러한 번거로움을 피할 수 있음.
  3. 클래스 설계 원칙
    • Transformer나 Attention 모듈 등은 보통 파라미터를 갖는 학습 모듈을 의미함.
    • 반면 generate_square_subsequent_mask는 어떤 내부 상태(파라미터)도 필요 없고, 입력(sz)만 주어지면 즉시 결과를 반환하는 순수 함수(pure function)에 가까움.
    • 따라서 유틸성 함수를 별도로 두는 것이 클래스 설계 원칙(단일 책임의 원칙, SRP)에 더 적합함.

“반드시 클래스 내부 메서드여야 할 만큼 모델 구조와 밀접하게 결합된 기능”이 아니므로, 간단히 전역 함수로 정의해둔 것.

 

 

 


Main!

if __name__ == "__main__":
    # 하이퍼파라미터 설정
    src_vocab_size = 1000
    tgt_vocab_size = 1200
    d_model = 32
    n_heads = 4
    d_ff = 64
    num_layers = 2

    PAD_IDX = 0  # PAD 토큰 인덱스
    num_epochs = 5
    batch_size = 2
    src_seq_len = 10
    tgt_seq_len = 9

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    # 모델 생성
    model = Transformer(
        src_vocab_size=src_vocab_size,
        tgt_vocab_size=tgt_vocab_size,
        d_model=d_model,
        n_heads=n_heads,
        d_ff=d_ff,
        num_layers=num_layers
    ).to(device)

    # 옵티마이저 & 손실 함수(CrossEntropyLoss)
    # PAD 토큰 무시는 ignore_index=PAD_IDX로 설정
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX)

    # 가짜 데이터 생성 (실제로는 Dataset, DataLoader 사용)
    # src_input, tgt_input, tgt_output
    # 예: tgt는 디코더 입력 / tgt_output은 한 토큰 뒤 (shifted by 1)인 정답
    # 여기서는 간단히 같은 길이로 맞춰주고 랜덤 인덱스 생성 (PAD_IDX 제외하려면 최소 1로 한다고 가정)
    for epoch in range(num_epochs):
        # 미니배치 예시: 그냥 1배치씩 학습 (for문으로 여러 batch 반복 가능)
        src = torch.randint(low=1, high=src_vocab_size, size=(batch_size, src_seq_len)).to(device)
        tgt_input = torch.randint(low=1, high=tgt_vocab_size, size=(batch_size, tgt_seq_len)).to(device)

        # 보통 실제 학습에서는 디코더의 정답(tgt_output)이 tgt_input보다 한 토큰 뒤이지만,
        # 여기서는 단순화해서 tgt_input과 동일하게 두고 예시만 보여줌.
        # (실제로는 shift해서 "Teacher Forcing" 용도로 만듦)
        tgt_output = tgt_input.clone()

        # Look-ahead mask 생성
        tgt_mask = generate_square_subsequent_mask(tgt_seq_len).unsqueeze(0).to(device)

        # 마스킹 텐서(src_mask, memory_mask 등)는 여기서는 None으로
        optimizer.zero_grad()

        # 순전파(forward)
        logits = model(src, tgt_input, src_mask=None, tgt_mask=tgt_mask, memory_mask=None)
        # logits shape: (batch_size, tgt_seq_len, tgt_vocab_size)

        # CrossEntropyLoss를 위해 차원 변환
        # (batch_size * tgt_seq_len, tgt_vocab_size)
        logits = logits.view(-1, logits.size(-1))  # (batch_size * seq_len, vocab_size)
        tgt_output = tgt_output.view(-1)  # (batch_size * seq_len, )

        # 손실 계산
        loss = criterion(logits, tgt_output)

        # 역전파 & 파라미터 업데이트
        loss.backward()
        optimizer.step()

        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")

 

 

1. if __name__ == "__main__":

  • 역할:
    • 이 코드는 해당 파일이 직접 실행될 때만 아래의 코드를 실행하도록 함.
    • 만약 다른 파일에서 이 파일의 클래스나 함수를 가져다 쓴다면, 이 부분은 실행되지 않음.
  • 파라미터:
    • __name__은 Python에서 내장된 변수로, 파일이 직접 실행될 경우 "__main__" 값을 가지며, 모듈로 임포트될 경우 파일의 이름을 가짐.

2. 하이퍼파라미터 설정

  • 역할:
    • 모델 구성과 학습에 필요한 여러 중요한 값들을 미리 설정함.
    • 각 변수는 모델의 구조나 학습 방식에 영향을 줌.
  • 파라미터 설명:
    • src_vocab_size와 tgt_vocab_size:
      • 원본(source)과 목표(target) 문장의 단어 사전 크기를 의미함.
      • 예를 들어, 원본 문장에 1000개의 서로 다른 단어가 있다고 가정함.
    • d_model:
      • 각 단어를 임베딩할 때의 차원 수를 의미함.
      • 여기서는 각 단어를 32차원 벡터로 표현함.
    • n_heads:
      • Multi-Head Attention에서 몇 개의 헤드를 사용할지를 의미함.
      • 여러 개의 헤드를 사용하면 다양한 관점에서 단어 간의 관계를 파악할 수 있음.
    • d_ff:
      • Transformer의 Feed Forward Neural Network(FFNN) 내부의 차원 수를 의미함.
      • 이 값은 중간 계산 과정에서의 차원을 결정함.
    • num_layers:
      • Transformer 모델의 레이어(층) 수를 의미함.
    • PAD_IDX:
      • 패딩(padding) 토큰의 인덱스를 의미함.
      • 패딩은 문장 길이를 맞추기 위해 사용되며, 손실 계산 시 무시함.
    • num_epochs:
      • 전체 데이터셋을 몇 번 반복하여 학습할지를 의미함.
    • batch_size:
      • 한 번에 모델에 넣어 학습할 데이터의 묶음 크기를 의미함.
    • src_seq_len과 tgt_seq_len:
      • 각각 원본 문장과 목표 문장의 길이(단어 수)를 의미함.

3. 장치(Device) 설정

  • 역할:
    • 모델과 데이터를 GPU 또는 CPU 중 어느 곳에서 처리할지 결정함.
    • GPU가 사용 가능하면 GPU를 사용하여 계산 속도를 높일 수 있음.
  • 파라미터 설명:
    • "cuda":
      • NVIDIA GPU를 의미하며, PyTorch에서 GPU 사용을 나타냄.
    • "cpu":
      • 중앙 처리 장치(CPU)를 의미함.
    • torch.cuda.is_available():
      • 현재 시스템에 CUDA(GPU 사용 가능 여부)가 있는지 확인하는 함수임.

4. 모델 생성

  • 역할:
    • Transformer 모델 객체를 생성함.
    • 모델의 구성 요소들이 하이퍼파라미터에 맞게 초기화됨.
  • 파라미터 설명:
    • src_vocab_size, tgt_vocab_size, d_model, n_heads, d_ff, num_layers:
      • 앞서 정의한 하이퍼파라미터들이며, 모델 내부의 임베딩 크기, 어텐션 헤드 수, 레이어 수 등을 결정함.
    • .to(device):
      • 모델을 지정된 장치(예: GPU 또는 CPU)로 옮겨서 연산이 해당 장치에서 실행되도록 함임.

5. 옵티마이저와 손실 함수 설정

  • 역할:
    • 모델의 학습 과정을 관리하기 위해 옵티마이저(모델의 가중치를 업데이트하는 역할)와 손실 함수(예측과 정답의 차이를 계산하는 역할)를 설정함.
  • 파라미터 설명:
    • 옵티마이저(Adam):
      • torch.optim.Adam:
        • Adam 알고리즘은 가중치 업데이트 시 각 파라미터의 학습률을 자동으로 조절함.
      • model.parameters():
        • 모델의 학습 가능한 모든 파라미터(가중치)를 옵티마이저에 전달함.
      • lr=1e-3:
        • 학습률(Learning Rate)을 0.001로 설정함.
    • 손실 함수(CrossEntropyLoss):
      • nn.CrossEntropyLoss:
        • 다중 분류 문제에서 주로 사용되는 손실 함수임.
        • 모델의 출력과 정답 사이의 차이를 계산하여 학습에 활용함.
      • ignore_index=PAD_IDX:
        • PAD 토큰(패딩)으로 설정된 인덱스는 손실 계산에서 제외함.

6. 가짜 데이터 생성

  • 역할:
    • 학습을 테스트하기 위해 실제 데이터 대신 임의의 숫자로 이루어진 가짜 데이터를 생성함.
    • 보통 실제 학습에서는 데이터셋과 DataLoader를 사용함.
  • 파라미터 설명:
    • torch.randint(low=1, high=src_vocab_size, size=(batch_size, src_seq_len)):
      • 1 이상 src_vocab_size 미만의 임의의 정수를 batch_size x src_seq_len 모양의 텐서로 생성함.
      • low와 high는 생성할 정수의 범위를 지정함.
    • .to(device):
      • 생성된 데이터를 지정된 장치로 옮김.
    • tgt_input:
      • 목표 문장의 입력 데이터를 의미하며, 랜덤 정수로 구성됨.
    • tgt_output = tgt_input.clone():
      • 디코더의 정답 데이터를 생성함.
      • 보통은 tgt_input보다 한 토큰 뒤가 정답임. 여기서는 단순화를 위해 동일하게 사용함.

7. Look-ahead Mask 생성

  • 역할:
    • 디코더에서는 미래의 단어 정보를 보지 못하도록 마스킹(masking)을 해야 함.
    • Look-ahead Mask는 현재 단어 이후의 정보를 가리지 않도록 만들어 주는 역할임.
  • 파라미터 설명:
    • generate_square_subsequent_mask(tgt_seq_len):
      • 정사각형 모양의 마스크를 생성함.
      • 각 행은 해당 위치 이후의 단어들을 가리기 위해 사용됨임.
    • .unsqueeze(0):
      • 텐서에 새로운 차원을 추가함으로써, 배치 차원을 맞추어 줌.
    • .to(device):
      • 마스크 텐서를 지정된 장치로 옮김.

8. 옵티마이저 기울기 초기화

  • 역할:
    • 역전파(Backpropagation) 전에 이전 단계에서 계산된 기울기를 모두 0으로 초기화함.
    • 그래야 매 학습 단계마다 새로운 기울기만 계산됨.
  • 파라미터 설명:
    • optimizer.zero_grad():
      • 옵티마이저에 등록된 모든 파라미터의 기울기를 0으로 만듦.

9. 순전파(Forward Pass)

  • 역할:
    • 모델에 입력 데이터를 전달하여 예측 결과를 얻는 과정임.
    • 여기서 logits는 모델이 예측한 각 단어의 점수(아직 확률로 변환되지 않음)를 의미함.
  • 파라미터 설명:
    • src:
      • 원본 문장의 임베딩 데이터임.
    • tgt_input:
      • 디코더에 들어갈 목표 문장의 입력 데이터임.
    • src_mask, memory_mask:
      • 원본 문장과 인코더-디코더 사이의 마스킹 정보를 의미함.
      • 여기서는 단순화를 위해 None으로 설정됨.
    • tgt_mask:
      • 앞서 생성한 Look-ahead Mask를 의미함.

10. 차원 변환(Reshape)

  • 역할:
    • 손실 함수에 넣기 전에 텐서의 모양을 변경함으로써, 데이터를 평탄화(reshape)함.
    • CrossEntropyLoss는 특정한 형태의 텐서를 요구함.
  • 파라미터 설명:
    • logits.view(-1, logits.size(-1)):
      • -1은 자동으로 나머지 차원을 계산하도록 하는 파라미터임.
      • 결과적으로, (batch_size * tgt_seq_len, tgt_vocab_size) 형태로 변경됨.
    • tgt_output.view(-1):
      • 정답 텐서를 (batch_size * tgt_seq_len,) 형태의 1차원 텐서로 평탄화함.

11. 손실 계산(Loss Calculation)

  • 역할:
    • 모델의 예측 값과 실제 정답 간의 차이를 계산함으로써, 모델의 성능을 평가함.
    • 손실 값이 낮을수록 예측이 정확함.
  • 파라미터 설명:
    • criterion은 미리 정의한 nn.CrossEntropyLoss(ignore_index=PAD_IDX)임.
    • logits와 tgt_output을 비교하여, 각 단어마다의 예측 오차를 계산함.

12. 역전파(Backpropagation)와 파라미터 업데이트

  • 역할:
    • 역전파:
      • 손실 함수를 기준으로 각 파라미터가 얼마만큼 기여했는지 기울기(gradient)를 계산함.
    • 파라미터 업데이트:
      • 옵티마이저가 계산된 기울기를 사용하여 모델의 파라미터를 업데이트함으로써, 예측 오차를 줄여감.
  • 파라미터 설명:
    • loss.backward():
      • 손실 함수로부터 각 파라미터에 대한 미분(기울기)을 계산함.
    • optimizer.step():
      • 계산된 기울기를 바탕으로 모델의 파라미터를 실제로 조정함.

13. 학습 진행 상황 출력

  • 역할:
    • 현재 학습 단계(epoch)와 손실 값을 콘솔에 출력하여, 학습 과정이 어떻게 진행되고 있는지 확인할 수 있음.
  • 파라미터 설명:
    • epoch+1과 num_epochs:
      • 현재 진행 중인 epoch과 전체 epoch 수를 표시함.
    • loss.item():
      • 손실 텐서에서 실제 스칼라 값을 추출함.
    • :.4f:
      • 소수점 네 자리까지 출력함.

전체 요약

이 코드는 Transformer 모델의 간단한 학습 예시임.

  1. 설정 단계에서 모델의 구성 요소와 학습에 필요한 여러 하이퍼파라미터들을 정의함.
  2. 장치 설정을 통해 GPU 또는 CPU에서 학습할 수 있도록 함.
  3. 모델 생성 후, 옵티마이저와 손실 함수를 정의하여 학습 준비를 마침.
  4. 가짜 데이터를 생성하여 모델에 입력하고, Look-ahead Mask를 생성하여 디코더가 미래 단어를 보지 못하도록 조치함.
  5. 순전파로 예측 값을 얻고, 이를 기반으로 손실을 계산한 후,
  6. 역전파와 파라미터 업데이트를 수행하여 모델을 학습시킴.
  7. 각 epoch마다 손실 값을 출력하여 학습 과정을 모니터링함.

 

 

 


전체 코드 with comments

 

"""
PyTorch로 Vanilla Transformer (Attention is All You Need, 2017)을 간단히 구현한 코드.
Transformer의 구조는 크게 Encoder와 Decoder로 나눌 수 있으며, 각각이 여러 개의 동일한 층(layer)으로 반복되어 쌓여 있음.

Encoder
1. self-attention (멀티헤드)
2. feed-forward network
이 2단계를 층으로 하여 N개 층을 반복 구성

Decoder
1. self-attention (멀티헤드)
2. Encoder에서 전달받은 context와의 attention
3. feed-forward network
이 3단계를 층으로 하여 N개 층을 반복 구성

최종적으로 Decoder 출력에 Linear + Softmax 등을 적용하여 다음 단어를 예측하거나 sequence를 생성할 수 있음.

아래 코드는 다음 순서로 모듈을 구성.
1. Positional Encoding
2. ScaledDotProdectAttention
3. MultiHeadAttention
4. PositionwiseFeedForward
5. EncoderLayer / DecoderLayer
6. Encoder / Decoder
7. Transformer (Encoder + Decoder 전체 구성)

참고
The Illustrated Transformer, https://nlpinkorean.github.io/illustrated-transformer/
"""


import torch
import torch.nn as nn
import math

class PositionalEncoding(nn.Module):
    """
    Transformer는 입력 토큰의 순서를 고려하기 위해 위치 정보를 부가한다.
    이를 위해 논문에서는 사인/코사인 함수를 이용한 Positional Encoding을 사용.
    PE(pos, 2i)   = sin( pos / (10000^(2i/d_model)) )
    PE(pos, 2i+1) = cos( pos / (10000^(2i/d_model)) )
    """
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()

        # pos : 0부터 max_len까지
        # i : 0부터 d_model까지
        pe = torch.zeros(max_len, d_model)  # (mex_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)  # (max_len, 1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))  # (d_model / 2)

        pe[:, 0::2] = torch.sin(position * div_term)  # 짝수 채널
        pe[:, 1::2] = torch.cos(position * div_term)  # 홀수 채널

        # (1, max_len, d_model)의 형태로 만들어 register_buffer에 저장
        pe = pe.unsqueeze(0)  # (1, max_len, d_model)
        self.register_buffer('pe', pe)

    def forward(self, x):
        """
        x : (batch_size, seq_len, d_model)
        학습 파라미터로 삼지 않고, 단순히 더해만 주면 됨.
        """
        seq_len = x.size(1)
        # pe[:, :seq_len] => (1, seq_len, d_model)
        x = x + self.pe[:, :seq_len]
        return x
    

class ScaledDotProductAttention(nn.Module):
    """
    Scaled Dot-Product Attention
    1) Q * K^T를 수행 후, sqrt(d_k)로 나눈다.
    2) 마스크를 적용(Optional)
    3) 소프트맥스 -> V 곱연산
    """
    def __init__(self):
        super(ScaledDotProductAttention, self).__init__()

    def forward(self, Q, K, V, mask=None):
        # Q, K, V shape:
        #   Q : (batch_size, n_heads, seq_len_q, d_k)
        #   K : (batch_size, n_heads, seq_len_k, d_k)
        #   V : (batch_size, n_heads, seq_len_k, d_v)
        
        d_k = K.size(-1)  # 보통 d_k = d_model / n_heads
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(d_k)
        # scores : (batch_size, n_heads, seq_len_q, seq_len_k)

        if mask is not None:
            # mask가 True인 부분을 매우 작은 값으로 채워 softmax 결과를 0이 되도록 만든다.
            scores = scores.masked_fill(mask == 0, -1e9)

        attn = torch.softmax(scores, dim=-1)
        # attn : (batch_size, n_heads, seq_len_q, seq_len_k)

        output = torch.matmul(attn, V)
        # output : (batch_size, n_heads, seq_len_q, d_v)

        return output, attn
    

class MultiHeadAttention(nn.Module):
    """
    Multi-Head Attention
    1) 입력을 W_Q, W_K, W_V로 각각 선형 변환
    2) Scaled Dot-Product Attention 수행
    3) 여러 헤드를 이어서 최종 선형 변환
    """
    def __init__(self, d_model, n_heads):
        super(MultiHeadAttention, self).__init__()
        self.d_model = d_model
        self.n_heads = n_heads
        self.d_k = d_model // n_heads
        self.d_v = d_model // n_heads

        # W_Q, W_K, W_V
        self.W_Q = nn.Linear(d_model, d_model)
        self.W_K = nn.Linear(d_model, d_model)
        self.W_V = nn.Linear(d_model, d_model)

        # 출력 투영
        self.fc = nn.Linear(d_model, d_model)

        self.attention = ScaledDotProductAttention()

    def forward(self, Q, K, V, mask=None):
        batch_size = Q.size(0)
        
        # 1) Q, K, V 각각을 선형 변환
        # 이후 (batch_size, seq_len, d_model) -> (batch_size, seq_len, n_heads, d_k) 형태로 변환
        # 그리고 헤드 차원(n_heads)을 앞으로 가져옴
        q = self.W_Q(Q).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)
        k = self.W_K(K).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)
        v = self.W_V(V).view(batch_size, -1, self.n_heads, self.d_v).transpose(1, 2)

        # 2) Scaled Dot-Product Attention
        if mask is not None:
            # mask shape를 (batch_size, 1, seq_len_q, seq_len_k) -> (batch_size, n_heads, seq_len_q, seq_len_k)로 확장
            mask = mask.unsqueeze(1)

        out, attn = self.attention(q, k, v, mask=mask)
        # out: (batch_size, n_heads, seq_len, d_v)

        # 3) 여러 헤드를 다시 concat -> (batch_size, seq_len, n_heads * d_v)
        out = out.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)

        # 4) 최종 선형 변환
        out = self.fc(out)

        return out, attn
    

class PositionwiseFeedForward(nn.Module):
    """
    Position-wise FeedForward
    FFN(x) = max(0, xW1 + b1)W2 + b2
    """
    def __init__(self, d_model, d_ff):
        super(PositionwiseFeedForward, self).__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        # x: (batch_size, seq_len, d_model)
        return self.linear2(self.relu(self.linear1(x)))



class EncoderLayer(nn.Module):
    """
    Encoder Layer:
    1) Self-Attention (Multi-Head)
    2) Layer Normalization + Residual Connection
    3) Feed Forward
    4) Layer Normalization + Residual Connection
    """
    def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
        super(EncoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, n_heads)
        self.ffn = PositionwiseFeedForward(d_model, d_ff)

        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        # x: (batch_size, seq_len, d_model)
        # mask: (batch_size, seq_len) 형태 등
        
        # 1) Self-Attention
        attn_out, _ = self.self_attn(x, x, x, mask=mask)

        # 2) Layer Nrom & Residual
        x = self.norm1(x + self.dropout1(attn_out))

        # 3) Feed Forward
        ffn_out = self.ffn(x)

        # 4) Layer Norm & Residual
        x = self.norm2(x + self.dropout2(ffn_out))

        return x


class DecoderLayer(nn.Module):
    """
    Decoder Layer:
    1) Masked Self-Attention (Look-ahead mask)
    2) Layer Normalization + Residual
    3) Encoder-Decoder Attention
    4) Layer Normalization + Residual
    5) Feed Forward
    6) Layer Normalization + Residual
    """
    def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
        super(DecoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, n_heads)
        self.enc_dec_attn = MultiHeadAttention(d_model, n_heads)
        self.ffn = PositionwiseFeedForward(d_model, d_ff)

        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)

        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)

    def forward(self, x, enc_out, tgt_mask=None, memory_mask=None):
        # x: (batch_size, tgt_seq_len, d_model) - decoder 입력
        # enc_out: (batch_size, src_seq_len, d_model) - encoder 출력
        # tgt_mask: 디코더의 look-ahead mask (자기회귀)
        # memory_mask: 인코더-디코더 어텐션을 위한 mask

        # 1) Masked Self-Attention
        self_attn_out, _ = self.self_attn(x, x, x, mask=tgt_mask)
        x = self.norm1(x + self.dropout1(self_attn_out))

        # 2) Encoder-Decoder Attention
        # Q = 디코더의 현재 상태, K, V = 인코더 출력
        enc_dec_attn_out, _ = self.enc_dec_attn(x, enc_out, enc_out, mask=memory_mask)
        x = self.norm2(x + self.dropout2(enc_dec_attn_out))

        # 3) Feed Forward
        ffn_out = self.ffn(x)
        x = self.norm3(x + self.dropout3(ffn_out))

        return x


class Encoder(nn.Module):
    """
    Encoder:
    1) 입력 임베딩(Input Embedding) + 포지셔널 인코딩(Positional Encoding)
    2) N개의 Encoder Layer
    """
    def __init__(self, vocab_size, d_model, n_heads, d_ff, num_layers, max_len=5000, dropout=0.1):
        super(Encoder, self).__init__()
        self.d_model = d_model
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len)

        self.layers = nn.ModuleList([
            EncoderLayer(d_model, n_heads, d_ff, dropout)
            for _ in range(num_layers)
        ])
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        # x: (batch_size, src_seq_len)
        # mask: (batch_size, src_seq_len,) True/False 형태 (패딩 등에 대한 마스킹) 등

        # 1) 임베딩 + 위치 인코딩
        x = self.embedding(x) # (batch_size, src_seq_len, d_model)
        x = self.pos_encoding(x)
        x = self.dropout(x)
        
        # 2) N개의 인코더 레이어 통과
        for layer in self.layers:
            x = layer(x, mask=mask)
        return x


class Decoder(nn.Module):
    """
    Decoder:
    1) 입력 임베딩 + 포지셔널 인코딩
    2) N개의 Decoder Layer
    """
    def __init__(self, vocab_size, d_model, n_heads, d_ff, num_layers, max_len=5000, dropout=0.1):
        super(Decoder, self).__init__()
        self.d_model = d_model
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len)

        self.layers = nn.ModuleList([
            DecoderLayer(d_model, n_heads, d_ff, dropout)
            for _ in range(num_layers)
        ])
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_out, tgt_mask=None, memory_mask=None):
        # x: (batch_size, tgt_seq_len)
        # enc_out: (batch_size, src_seq_len, d_model)

        # 1) 임베딩 + 위치 인코딩
        x = self.embedding(x)
        x = self.pos_encoding(x)
        x = self.dropout(x)

        # 2) N개의 디코더 레이어 통과
        for layer in self.layers:
            x = layer(x, enc_out, tgt_mask=tgt_mask, memory_mask=memory_mask)
        return x

class Transformer(nn.Module):
    """
    Transformer 전체 구조 : Encoder + Decoder
    최종적으로 Decoder의 출력을 Linear로 projection 하여 각 단어의 확률을 얻는다.
    """
    def __init__(
        self,
        src_vocab_size,
        tgt_vocab_size,
        d_model=512,
        n_heads=8,
        d_ff=2048,
        num_layers=6,
        max_len=5000,
        dropout=0.1
        ):
        super(Transformer, self).__init__()
        self.encoder = Encoder(src_vocab_size, d_model, n_heads, d_ff, num_layers, max_len, dropout)
        self.decoder = Decoder(tgt_vocab_size, d_model, n_heads, d_ff, num_layers, max_len, dropout)

        self.fc = nn.Linear(d_model, tgt_vocab_size, bias=False)

    def forward(self, src, tgt, src_mask=None, tgt_mask=None, memory_mask=None):
        # src: (batch_size, src_seq_len)
        # tgt: (batch_size, tgt_seq_len)
        # src_mask, tgt_mask, memory_mask: 마스킹 텐서

        enc_out = self.encoder(src, mask=src_mask) # (batch_size, src_seq_len, d_model)
        dec_out = self.decoder(tgt, enc_out, tgt_mask=tgt_mask, memory_mask=memory_mask) # (batch_size, tgt_seq_len, d_model)

        # 최종 logits
        out = self.fc(dec_out) # (batch_size, tgt_seq_len, tgt_vocab_size)
        return out


def generate_square_subsequent_mask(sz):
    """
    디코더의 look-ahead mask를 생성하기 위한 함수.
    앞으로 등장할 토큰을 보지 못하도록 (i > j) 위치를 0으로 만든다.
    """
    mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
    return mask  # True/False tensor


"""
간단한 테스트를 위한 예시
"""
if __name__ == "__main__":
    # 하이퍼파라미터 설정
    src_vocab_size = 1000
    tgt_vocab_size = 1200
    d_model = 32
    n_heads = 4
    d_ff = 64
    num_layers = 2

    PAD_IDX = 0  # PAD 토큰 인덱스
    num_epochs = 5
    batch_size = 2
    src_seq_len = 10
    tgt_seq_len = 9

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    # 모델 생성
    model = Transformer(
        src_vocab_size=src_vocab_size,
        tgt_vocab_size=tgt_vocab_size,
        d_model=d_model,
        n_heads=n_heads,
        d_ff=d_ff,
        num_layers=num_layers
    ).to(device)

    # 옵티마이저 & 손실 함수(CrossEntropyLoss)
    # PAD 토큰 무시는 ignore_index=PAD_IDX로 설정
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX)

    # 가짜 데이터 생성 (실제로는 Dataset, DataLoader 사용)
    # src_input, tgt_input, tgt_output
    # 예: tgt는 디코더 입력 / tgt_output은 한 토큰 뒤 (shifted by 1)인 정답
    # 여기서는 간단히 같은 길이로 맞춰주고 랜덤 인덱스 생성 (PAD_IDX 제외하려면 최소 1로 한다고 가정)
    for epoch in range(num_epochs):
        # 미니배치 예시: 그냥 1배치씩 학습 (for문으로 여러 batch 반복 가능)
        src = torch.randint(low=1, high=src_vocab_size, size=(batch_size, src_seq_len)).to(device)
        tgt_input = torch.randint(low=1, high=tgt_vocab_size, size=(batch_size, tgt_seq_len)).to(device)

        # 보통 실제 학습에서는 디코더의 정답(tgt_output)이 tgt_input보다 한 토큰 뒤이지만,
        # 여기서는 단순화해서 tgt_input과 동일하게 두고 예시만 보여줌.
        # (실제로는 shift해서 "Teacher Forcing" 용도로 만듦)
        tgt_output = tgt_input.clone()

        # Look-ahead mask 생성
        tgt_mask = generate_square_subsequent_mask(tgt_seq_len).unsqueeze(0).to(device)

        # 마스킹 텐서(src_mask, memory_mask 등)는 여기서는 None으로
        optimizer.zero_grad()

        # 순전파(forward)
        logits = model(src, tgt_input, src_mask=None, tgt_mask=tgt_mask, memory_mask=None)
        # logits shape: (batch_size, tgt_seq_len, tgt_vocab_size)

        # CrossEntropyLoss를 위해 차원 변환
        # (batch_size * tgt_seq_len, tgt_vocab_size)
        logits = logits.view(-1, logits.size(-1))  # (batch_size * seq_len, vocab_size)
        tgt_output = tgt_output.view(-1)  # (batch_size * seq_len, )

        # 손실 계산
        loss = criterion(logits, tgt_output)

        # 역전파 & 파라미터 업데이트
        loss.backward()
        optimizer.step()

        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")

 

 

 


 

전체 코드

 

import torch
import torch.nn as nn
import math

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp( torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model) )
        
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        seq_len = x.size(1)
        x = x + self.pe[:, :seq_len]
        return x
    

class ScaledDotProductAttention(nn.Module):
    def __init__(self):
        super(ScaledDotProductAttention, self).__init__()

    def forward(self, Q, K, V, mask=None):
        d_k = K.size(-1)
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(d_k)
        
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        
        attn = torch.softmax(scores, dim=1)
        output = torch.matmul(attn, V)
        return output, attn
    

class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, n_heads):
        super(MultiHeadAttention, self).__init__()
        self.d_model = d_model
        self.n_heads = n_heads
        self.d_k = d_model // n_heads
        self.d_v = d_model // n_heads

        self.W_Q = nn.Linear(d_model, d_model)
        self.W_K = nn.Linear(d_model, d_model)
        self.W_V = nn.Linear(d_model, d_model)
        self.fc = nn.Linear(d_model, d_model)

        self.attention = ScaledDotProductAttention()

    def forward(self, Q, K, V, mask=None):
        batch_size = Q.size(0)

        q = self.W_Q(Q).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)
        k = self.W_K(K).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)
        v = self.W_V(V).view(batch_size, -1, self.n_heads, self.d_v).transpose(1, 2)

        if mask is not None:
            mask = mask.unsqueeze(1)

        out, attn = self.attention(q, k, v, mask=mask)
        out = out.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
        out = self.fc(out)
        return out, attn
    

class PositionwiseFeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super(PositionwiseFeedForward, self).__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        return self.linear2(self.relu(self.linear1(x)))
    

class EncoderLayer(nn.Module):
    def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
        super(EncoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, n_heads)
        self.ffn = PositionwiseFeedForward(d_model, d_ff)

        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        attn_out, _ = self.self_attn(x, x, x, mask=mask)
        x = self.norm1(x + self.dropout1(attn_out))

        ffn_out = self.ffn(x)
        x = self.norm2(x + self.dropout2(ffn_out))
    
        return x
    

class DecoderLayer(nn.Module):
    def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
        super(DecoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, n_heads)
        self.enc_dec_attn = MultiHeadAttention(d_model, n_heads)
        self.ffn = PositionwiseFeedForward(d_model, d_ff)

        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)

        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)

    def forward(self, x, enc_out, tgt_mask=None, memory_mask=None):
        self_attn_out, _ = self.self_attn(x, x, x, mask=tgt_mask)
        x = self.norm1(x + self.dropout1(self_attn_out))

        enc_dec_attn_out, _ = self.enc_dec_attn(x, enc_out, enc_out, mask=memory_mask)
        x = self.norm2(x + self.dropout2(enc_dec_attn_out))

        ffn_out = self.ffn(x)
        x = self.norm3(x + self.dropout3(ffn_out))

        return x
    

class Encoder(nn.Module):
    def __init__(self, vocab_size, d_model, n_heads, d_ff, num_layers, max_len=5000, dropout=0.1):
        super(Encoder, self).__init__()
        self.d_model = d_model
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len)

        self.layers = nn.Module([
            EncoderLayer(d_model, n_heads, d_ff, dropout)
            for _ in range(num_layers)
        ])
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        x = self.embedding(x)
        x = self.pos_encoding(X)
        x = self.dropout(x)

        for layer in self.layers:
            x = layer(x, mask=mask)
        return x
    

class Decoder(nn.Module):
    def __init__(self, vocab_size, d_model, n_heads, d_ff, num_layers, max_len=5000, dropout=0.1):
        super(Decoder, self).__init__()
        self.d_model = d_model
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len)

        self.layers = nn.Module([
            DecoderLayer(d_model, n_heads, d_ff, dropout)
            for _ in range(num_layers)
        ])
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_out, tgt_mask=None, memory_mask=None):
        x = self.embedding(x)
        x = self.pos_encoding(x)
        x = self.dropout(x)

        for layer in self.layers:
            x = layer(x, enc_out, tgt_mask=tgt_mask, memory_mask=memory_mask)
        return x
    
class Transformer(nn.Module):
    def __init__(
            self,
            src_vocab_size,
            tgt_vocab_size,
            d_model=512,
            n_heads=8,
            d_ff=2048,
            num_layers=6,
            max_len=5000,
            dropout=0.1
    ):
        super(Transformer, self).__init__()
        self.encoder = Encoder(src_vocab_size, d_model, n_heads, d_ff, num_layers, max_len, dropout)
        self.decoder = Decoder(tgt_vocab_size, d_model, n_heads, d_ff, num_layers, max_len, dropout)

        self.fc = nn.Linear(d_model, tgt_vocab_size, bias=False)

    def forward(self, src, tgt, src_mask=None, tgt_mask=None, memory_mask=None):
        enc_out = self.encoder(src, mask=src_mask)
        dec_out = self.decoder(tgt, enc_out, tgt_mask=tgt_mask, memory_mask=memory_mask)
        out = self.fc(dec_out)
        return out
    

def generate_square_subsequent_mask(sz):
    mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
    return mask


if __name__ == "__main__":
    src_vocab_size = 1000
    tgt_vocab_size = 1200
    d_model = 32
    n_heads = 4
    d_ff = 64
    num_layers = 2

    PAD_IDX = 0
    num_epochs = 5
    batch_size = 2
    src_seq_len = 10
    tgt_seq_len = 9

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    model = Transformer(
        src_vocab_size=src_vocab_size,
        tgt_vocab_size=tgt_vocab_size,
        d_model=d_model,
        n_heads=n_heads,
        d_ff=d_ff,
        num_layers=num_layers
    ).to(device)

    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX)

    for epoch in range(num_epochs):
        src = torch.randint(low=1, high=src_vocab_size, size=(batch_size, src_seq_len)).to(device)
        tgt_input = torch.randint(low=1, high=tgt_vocab_size, size=(batch_size, tgt_seq_len)).to(device)

        tgt_output = tgt_input.clone()

        tgt_mask = generate_square_subsequent_mask(tgt_seq_len).unsqueeze(0).to(device)

        optimizer.zero_grad()

        logits = model(src, tgt_input, src_mask=None, tgt_mask=tgt_mask, memory_mask=None)

        logits = logits.view(-1, logits.size(-1))
        tgt_output = tgt_output.view(-1)

        loss = criterion(logits, tgt_output)

        loss.backward()
        optimizer.step()

        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")