Colab으로 하루에 하나씩 딥러닝

생성모델_2.변형 오토인코더 본문

딥러닝_개념

생성모델_2.변형 오토인코더

Elleik 2023. 2. 6. 22:38
728x90

변형 오토인코더(Variational Autoencoder)

  • 표준편차와 평균을 이용하여 확률 분포를 만듦
  • 해당 확률 분포에서 샘플링하여 디코더를 통과시킨 후 새로운 데이터를 만들어 냄
  • z(잠재 백터(latent vector)라는 가우시안 분포를 이용하여 입력 데이터와 조금 다른 출력 데이터를 만듦 

변형 오토인코더 구성요소

1. 인코더

  • x를 입력받아 잠재 벡터 z와 대응되는 평균과 분산을 구하는 네트워크

2. 디코더

  • z를 입력받아 x와 대응되는 평균과 분산을 구하는 네트워크

오토인코더 실습

### 필요한 라이브러리 호출

import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from tqdm.autonotebook import tqdm
%matplotlib inline
from IPython import display
import pandas as pd
import tensorflow_probability as tfp
ds = tfp.distributions

### 초깃값 설정

TRAIN_BUF = 60000
TEST_BUF = 10000
BATCH_SIZE = 512
N_TRAIN_BATCHES = int(TRAIN_BUF/BATCH_SIZE)
N_TEST_BATCHES = int(TEST_BUF/BATCH_SIZE)

### fashion_mnist 데이터셋 로딩 및 데이터셋 분리

(train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.fashion_mnist.load_data()  # 훈련과 테스트 데이터셋으로 분리
train_images = train_images.reshape(train_images.shape[0], 28, 28, 1).astype("float32") / 255.0
test_images = test_images.reshape(test_images.shape[0], 28, 28, 1).astype("float32") / 255.0
train_dataset = (tf.data.Dataset.from_tensor_slices(train_images).shuffle(60000).batch(512))  # 훈련 데이터셋 준비
test_dataset = (tf.data.Dataset.from_tensor_slices(test_images).shuffle(10000).batch(512))	# 테스트 데이터셋 준비

데이터셋 로딩 완료


### 변형 오토인코더 네트워크 생성

class VAE(tf.keras.Model):
    def __init__(self, **kwargs):
        super(VAE, self).__init__()
        self.__dict__.update(kwargs)

        self.enc = tf.keras.Sequential(self.enc)
        self.dec = tf.keras.Sequential(self.dec)

    def encode(self, x):
        mu, sigma = tf.split(self.enc(x), num_or_size_splits=2, axis=1)
        return ds.MultivariateNormalDiag(loc=mu, scale_diag=sigma)  # 인코더에서 평균과 표준편차를 정의하고 z의 평균과 분산을 모수로 하는 정규 분포 생성
    def reparameterize(self, mean, logvar):
        epsilon = tf.random.normal(shape=mean.shape)
        return epsilon * tf.exp(logvar * 0.5) + mean  # 설명 1

    def reconstruct(self, x):
        mu, _ = tf.split(self.enc(x), num_or_size_splits=2, axis=1)
        return self.decode(mu)  # 입력 데이터를 출력 데이터로 재구성

    def decode(self, z):
        return self.dec(z)   # 인코더가 만든 z를 받아 입력 데이터로 재구성

    def loss_function(self, x):
        q_z = self.encode(x)
        z = q_z.sample()
        x_recon = self.decode(z)
        p_z = ds.MultivariateNormalDiag(
          loc=[0.] * z.shape[-1], scale_diag=[1.] * z.shape[-1]
          )
        kl_div = ds.kl_divergence(q_z, p_z)
        latent_loss = tf.reduce_mean(tf.maximum(kl_div, 0))
        recon_loss = tf.reduce_mean(tf.reduce_sum(tf.math.square(x - x_recon), axis=0))
        return recon_loss, latent_loss  # 설명 2

    def gradients(self, x):
        with tf.GradientTape() as tape:
            loss = self.loss_function(x)
        return tape.gradient(loss, self.trainable_variables)  # 설명 3

    @tf.function  # 파이썬 제어문을 텐서플로에서 자동으로 컴파일
    def train(self, train_x):
        gradients = self.gradients(train_x)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))

상세 설명

  • 설명 1
    • reparameterize 함수는 z 분포를 이용하여 z 백터를 샘플링
    • z는 가우시안 분포라고 가정했기 때문에 평균과 표준편차를 이용하여 z에 epsilon(노이즈)를 적용하여 샘플링
    • 샘플링한 z 벡터를 디코더에 다시 통과시켜서 입력과 동일한 데이터를 만들어 내는 작업
  • 설명 2
    • 손실(오차)을 구하는 함수
  • 설명 3
    • GradientTape는 미분을 자동으로 계산
    • 미분을 자동 계산해서 동적으로 기울기 값들을 확인
    • tape.gradient는 self.trainable_variables에 대한 손실의 미분 값

### 인코더와 디코더 네트워크 생성

encoder = [
    tf.keras.layers.InputLayer(input_shape=(28,28,1)),
    tf.keras.layers.Conv2D(
        filters=32, kernel_size=3, strides=(2, 2), activation="relu"
    ),
    tf.keras.layers.Conv2D(
        filters=64, kernel_size=3, strides=(2, 2), activation="relu"
    ),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(units=2*2),
] # 설명 1

decoder = [
    tf.keras.layers.Dense(units=7 * 7 * 64, activation="relu"),
    tf.keras.layers.Reshape(target_shape=(7, 7, 64)),
    tf.keras.layers.Conv2DTranspose(
        filters=64, kernel_size=3, strides=(2, 2), padding="SAME", activation="relu"
    ),
    tf.keras.layers.Conv2DTranspose(
        filters=32, kernel_size=3, strides=(2, 2), padding="SAME", activation="relu"
    ),
    tf.keras.layers.Conv2DTranspose(
        filters=1, kernel_size=3, strides=(1, 1), padding="SAME", activation="sigmoid"
    ),
] # 설명 2

상세 설명

  • 설명 1
    • 인코더 역할은 데이터가 주어졌을 때 디코더가 원래 데이터롤 잘 복원할 수 있는 z를 샘플링하여 이상적인 확률 분포를 찾음
    • 변형 오토인코더에서는 이상적인 확률 분포를 찾는 데 변분추론을 사용 
  • 설명 2
    • 디코더는 추출한 샘플을 입력으로 받아 다시 원본 데이터와 유사한 데이터를 재구축

### 모델 정의

model = VAE(
    enc = encoder,
    dec = decoder,
    optimizer = tf.keras.optimizers.Adam(1e-3)
)

### 모델 훈련 결과 시각화

example_data = next(iter(test_dataset))
def plot_reconstruction(model, example_data, nex=8, zm=2):

    example_data_reconstructed = model.reconstruct(example_data)
    samples = model.decode(tf.random.normal(shape=(BATCH_SIZE, 2)))
    fig, axs = plt.subplots(ncols=nex, nrows=3, figsize=(zm * nex, zm * 3))
    for axi, (dat, lab) in enumerate(
        zip(
            [example_data, example_data_reconstructed, samples],
            ["data", "data recon", "samples"],
        )
    ):
        for ex in range(nex):
            axs[axi, ex].matshow(
                dat.numpy()[ex].squeeze(), cmap=plt.cm.Greys, vmin=0, vmax=1
            )
            axs[axi, ex].axes.get_xaxis().set_ticks([])
            axs[axi, ex].axes.get_yaxis().set_ticks([])
        axs[axi, 0].set_ylabel(lab)
    plt.show()

### 손실 정보 저장

losses = pd.DataFrame(columns = ['recon_loss', 'latent_loss'])  # 손실/오차 정보를 데이터프레임에 저장

### 모델 훈련

n_epochs = 50

for epoch in range(n_epochs):
    for batch, train_x in tqdm(
        zip(range(N_TRAIN_BATCHES), train_dataset), total=N_TRAIN_BATCHES):
        model.train(train_x)  # 훈련 데이터셋을 사용하여 훈련
        loss = []
        
    for batch, test_x in tqdm(
        zip(range(N_TEST_BATCHES), test_dataset), total=N_TEST_BATCHES
): 
        loss.append(model.loss_function(train_x))
    losses.loc[len(losses)] = np.mean(loss, axis=0) 
    display.clear_output()
    print(
        "Epoch: {} | recon_loss: {} | latent_loss: {}".format(
            epoch, losses.recon_loss.values[-1], losses.latent_loss.values[-1]
        ) # 재구성 오차(reconstruction_loss)와 인코더-디코더 사이의 오차(latent_loss) 출력 
  )
    
    
    plot_reconstruction(model, example_data)

모델 훈련 결과

 

참고: 서지영, 『딥러닝 텐서플로 교과서』, 길벗(2022)

'딥러닝_개념' 카테고리의 다른 글

NLP_허깅페이스 트랜스포머  (0) 2023.02.08
생성모델_3.GAN  (1) 2023.02.07
생성모델_1.오토인코더  (0) 2023.02.03
클러스터링_3.자기 조직화 지도  (0) 2023.01.31
클러스터링_2.가우시안 혼합 모델  (0) 2023.01.30