Trying GAN Again | python3X Blog


Unfortunately, when I set the image size to 256 I get a GPU-related error.

So this time I'm generating images at half that size in each dimension, 128 x 128.
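
If the error at 256 is simply the GPU running out of memory, one thing that might be worth trying (a minimal sketch, not verified with this script) is letting TensorFlow allocate GPU memory on demand instead of reserving a fixed 6 GB block the way the code below does:

import tensorflow as tf

# Sketch: allocate GPU memory on demand rather than a fixed block (untested here)
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    try:
        tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as e:
        # memory growth must be set before the GPU has been initialized
        print(e)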

I've posted the images and the code below (it's a bit long).

 

As has always been the case, every time Python, TensorFlow, or some other library changes versions, my existing programs stop working; it has been that cycle over and over.

This time the move to TensorFlow 2.0 again brought major changes.

On top of that, some things that worked under Python 3.6 did not work under Python 3.7, so I ended up taking quite a detour.

 

That said, when I ran the program from the book on Google Colab, the generated images were just as awful as before.

 

Generated images

Loss

Code
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()
tf.enable_eager_execution()
from keras.layers import Input, Dense, Reshape, Flatten, Dropout, Multiply, Embedding, Conv2DTranspose
from keras.layers import BatchNormalization, Activation, ZeroPadding2D, MaxPooling2D
from keras.layers.advanced_activations import LeakyReLU
from keras.layers.convolutional import UpSampling2D, Conv2D
from keras.models import Sequential, Model, load_model
from keras.preprocessing.image import load_img, img_to_array
from keras.optimizers import Adam
import matplotlib.pyplot as plt
import sys, os
import numpy as np
import pandas as pd
import pydicom
from PIL import Image
from keras.utils import Progbar
from keras import backend as K   # needed by sum_of_residual() in the AnoGAN section
from sklearn.model_selection import train_test_split
import math
import cv2
import tensorflow as tf
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    # Restrict TensorFlow to only allocate 6GB of memory on the first GPU
    try:
        tf.config.experimental.set_virtual_device_configuration(
        gpus[0],
        [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=6144)])
        logical_gpus = tf.config.experimental.list_logical_devices('GPU')
        print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
    except RuntimeError as e:
        # Virtual devices must be set before GPUs have been initialized
        print(e)
h = 128
w = 128
ch = 1
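# label.csv is assumed to contain an "image" column (file paths) and a "label" column (0 or 1)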
trainlist = pd.read_csv("./label.csv", header=0)
train_data = np.zeros((len(trainlist["image"]), h, w, ch))
for i in range(0, len(trainlist["image"])):
    img = Image.open(trainlist["image"][i])
    img = img.convert("L")   
    img_resize = img.resize((h,w), Image.LANCZOS)
    tmp2 = img_to_array(img_resize)
    train_data[i] = tmp2.reshape((h,w,ch))/127.5 -1.0
x1 = train_data
x1.shape
trainlist2 = pd.read_csv("./label.csv", header=0)
train_label = trainlist2["label"]
train_label = train_label.values
y1 = train_label
y1.shape
(x_train, x_test, y_train, y_test) = train_test_split(x1, y1, test_size=0.5)
 
# Generator
class Generator(object):
    def __init__(self, input_dim, image_shape):
        INITIAL_CHANNELS = 128
        INITIAL_SIZE = 16
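        # three stride-2 Conv2DTranspose layers upsample 16 -> 32 -> 64 -> 128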
 
        inputs = Input((input_dim,))
        fc1 = Dense(input_dim=input_dim, units=INITIAL_CHANNELS * INITIAL_SIZE * INITIAL_SIZE)(inputs)
        fc1 = BatchNormalization()(fc1)
        fc1 = LeakyReLU(0.2)(fc1)
        fc2 = Reshape((INITIAL_SIZE, INITIAL_SIZE, INITIAL_CHANNELS), input_shape=(INITIAL_CHANNELS * INITIAL_SIZE * INITIAL_SIZE,))(fc1)
        up1 = Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(fc2)
        conv1 = Conv2D(64, (3, 3), padding='same')(up1)
        conv1 = BatchNormalization()(conv1)
        conv1 = LeakyReLU(0.2)(conv1)
        up2 = Conv2DTranspose(32, (2, 2), strides=(2, 2), padding='same')(conv1)
        conv2 = Conv2D(64, (3, 3), padding='same')(up2)
        conv2 = BatchNormalization()(conv2)
        conv2 = LeakyReLU(0.2)(conv2)       
        up3 = Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(conv2)       
        conv3 = Conv2D(image_shape[2], (5, 5), padding='same')(up3)
        outputs = Activation('tanh')(conv3)
 
        self.model = Model(inputs=[inputs], outputs=[outputs])
 
    def get_model(self):
        return self.model
 
# Discriminator
class Discriminator(object):
    def __init__(self, input_shape):
        inputs = Input(input_shape)
        conv1 = Conv2D(128, (5, 5), padding='same')(inputs)
        conv1 = LeakyReLU(0.2)(conv1)
        pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)
        conv2 = Conv2D(256, (5, 5), padding='same')(pool1)
        conv2 = LeakyReLU(0.2)(conv2)
        pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)
        conv3 = Conv2D(512, (5, 5), padding='same')(pool2)
        conv3 = LeakyReLU(0.2)(conv3)
        pool3 = MaxPooling2D(pool_size=(2, 2))(conv3)   
  
        fc1 = Flatten()(pool3)
        fc1 = Dense(1)(fc1)
        outputs = Activation('sigmoid')(fc1)
 
        self.model = Model(inputs=[inputs], outputs=[outputs])
   
    def get_model(self):
        return self.model
 
# build models once just to print the summaries (same 128 x 128 x 1 shape as the training data)
g = Generator(15, (h, w, ch))
generator = g.get_model()
print(generator.summary())
d = Discriminator((h, w, ch))
discriminator = d.get_model()
print(discriminator.summary())

# Build the combined DCGAN (install Progbar first with: pip install keras-progbar)
class DCGAN(object):
    def __init__(self, input_dim, image_shape):
        self.input_dim = input_dim
        self.d = Discriminator(image_shape).get_model()
        self.g = Generator(input_dim, image_shape).get_model()
   
    def compile(self, g_optim, d_optim):
        self.d.trainable = False                # freeze the discriminator inside the combined model (only the generator is updated)
        self.dcgan = Sequential([self.g, self.d])
        self.dcgan.compile(loss='binary_crossentropy', optimizer=g_optim)
        self.d.trainable = True                # make the discriminator trainable again for its own compile below
        self.d.compile(loss='binary_crossentropy', optimizer=d_optim)
   
    def train(self, epochs, batch_size, X_train):
        g_losses = []
        d_losses = []
        for epoch in range(epochs):
            np.random.shuffle(X_train)
            n_iter = X_train.shape[0] // batch_size
            progress_bar = Progbar(target=n_iter)
            for index in range(n_iter):
                # create the noise -> N latent vectors
                noise = np.random.uniform(-1, 1, size=(batch_size, self.input_dim))
 
                # load a batch of real images (fake images are generated below)
                image_batch = X_train[index * batch_size:(index + 1) * batch_size]
                for i in range(batch_size):
                    if np.random.random() > 0.5:
                        image_batch[i] = np.fliplr(image_batch[i])  # fliplr: horizontal flip
                    if np.random.random() <= 0.5:
                        image_batch[i] = np.flipud(image_batch[i])  # flipud: vertical flip
                generated_images = self.g.predict(noise, verbose=0)
                gen_imgs = generated_images * 127.5 + 127.5
       
                fig, ax = plt.subplots(2,5, figsize=(15,6))
                n= 0
                for i in range(0,2):
                    for j in range(0,5):
                        ax[i, j].imshow(gen_imgs[n].reshape(h, w), cmap='gray')
                        ax[i, j].axis('off')
                        n += 1
                fig.savefig('./generated_images/gen_%02d.png' % epoch)
                plt.close(fig)   # close the figure so they don't pile up in memory
                self.g.save('./weights/mnist_gen_ep%02d.h5' % epoch)
                self.d.save('./weights/mnist_disc_ep%02d.h5' % epoch)
                # prepare labels for discriminator training (1 = real, 0 = fake)
                X = np.concatenate((image_batch, generated_images))
                y = np.array([1] * batch_size + [0] * batch_size)
 
                # discriminator loss
                d_loss = self.d.train_on_batch(X, y)
                # generator loss (trained through the combined model, with the discriminator frozen)
                g_loss = self.dcgan.train_on_batch(noise, np.array([1] * batch_size))
 
                progress_bar.update(index, values=[('g', g_loss), ('d', d_loss)])
            g_losses.append(g_loss)
            d_losses.append(d_loss)
            if (epoch+1)%50 == 0:
                image = self.combine_images(generated_images)
                image = (image + 1) / 2.0 * 255.0
                cv2.imwrite('./result/' + str(epoch) + ".png", image)
            print('\nEpoch' + str(epoch) + " end")
 
            # save the weights every 50 epochs
            if (epoch+1)%50 == 0:
                self.g.save_weights('weights/gen_' + str(epoch) + '.h5', True)
                self.d.save_weights('weights/disc_' + str(epoch) + '.h5', True)
        return g_losses, d_losses
 
    def load_weights(self, g_weight, d_weight):
        self.g.load_weights(g_weight)
        self.d.load_weights(d_weight)
 
    def combine_images(self, generated_images):
        num = generated_images.shape[0]
        width = int(math.sqrt(num))
        height = int(math.ceil(float(num) / width))
        shape = generated_images.shape[1:4]
        image = np.zeros((height * shape[0], width * shape[1], shape[2]),
                         dtype=generated_images.dtype)
        for index, img in enumerate(generated_images):
            i = int(index / width)
            j = index % width
            image[i * shape[0]:(i + 1) * shape[0], j * shape[1]:(j + 1) * shape[1], :] = img[:, :, :]
        return image
 
# AnoGAN (see https://qiita.com/NakaokaRei/items/231ec4efe42dfe79d1ff)
def sum_of_residual(y_true, y_pred):
    return K.sum(K.abs(y_true - y_pred))
 
class ANOGAN(object):
    def __init__(self, input_dim, g):
        self.input_dim = input_dim
        self.g = g
        g.trainable = False
        # the Input layer itself is not updated; a trainable layer of the same size and shape is added right after it
        anogan_in = Input(shape=(input_dim,))
        g_in = Dense((input_dim), activation='tanh', trainable=True)(anogan_in)
        g_out = g(g_in)
        self.model = Model(inputs=anogan_in, outputs=g_out)
        self.model_weight = None
 
    def compile(self, optim):
        self.model.compile(loss=sum_of_residual, optimizer=optim)
        K.set_learning_phase(0)
 
    def compute_anomaly_score(self, x, iterations=300):
        z = np.random.uniform(-1, 1, size=(1, self.input_dim))
 
        # optimize the latent vector so that the generated image gets close to x
        loss = self.model.fit(z, x, batch_size=1, epochs=iterations, verbose=0)
        loss = loss.history['loss'][-1]
        similar_data = self.model.predict_on_batch(z)
 
        return loss, similar_data
if __name__ == '__main__':
    batch_size = 32
    epochs = 1000  
    input_dim = 15
    g_optim = Adam(lr=0.0001, beta_1=0.5, beta_2=0.999)
    d_optim = Adam(lr=0.0001, beta_1=0.5, beta_2=0.999)
   
    # build the training data (only images with label == 1)
    x_train_1 = []
    for i in range(len(x_train)):
        if y_train[i] == 1:
            x_train_1.append(x_train[i].reshape((h, w, ch)))
    x_train_1 = np.array(x_train_1)
    print("train data:",len(x_train_1))
 
    # build the evaluation data (n = 100)
    cnt = 0
    x_test_0, y = [], []
    for i in range(len(x_test)):
        if y_test[i] == 0 or y_test[i] == 1:
            x_test_0.append(x_test[i].reshape((h, w, ch)))
            y.append(y_test[i])
            cnt += 1
            if cnt == 100:
                break
                 
    x_test_0 = np.array(x_test_0)
    print("test_data:",len(x_test_0))
   
    input_shape = x_train_1[0].shape 
    X_test_original = x_test_0.copy()   
   
    # train the generator and discriminator
    dcgan = DCGAN(input_dim, input_shape)
    dcgan.compile(g_optim, d_optim)
    g_losses, d_losses = dcgan.train(epochs, batch_size, x_train_1) 
    with open('loss_moi.csv', 'w') as f:
        for g_loss, d_loss in zip(g_losses, d_losses):
            f.write(str(g_loss) + ',' + str(d_loss) + '\n')
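
The ANOGAN class above is defined but never actually called in this script. As a rough sketch of how it could be used once training has finished (my own addition, untested; the optimizer settings and iterations=100 are arbitrary choices), something like the following could be appended to the end of the __main__ block. It freezes the trained generator, fits a fresh latent vector to each test image, and uses the final reconstruction loss as the anomaly score.

    # Sketch (my own addition, untested): anomaly scoring with the trained generator
    anogan_optim = Adam(lr=0.001, beta_1=0.9, beta_2=0.999)
    scores = []
    for idx in range(len(X_test_original)):
        # a new latent vector / mapping layer is optimized for every test image
        anogan = ANOGAN(input_dim, dcgan.g)
        anogan.compile(anogan_optim)
        x = X_test_original[idx].reshape((1, h, w, ch))
        score, similar_img = anogan.compute_anomaly_score(x, iterations=100)
        scores.append(score)
        print('test image %d: anomaly score = %.2f' % (idx, score))
    np.savetxt('anomaly_scores.csv', np.array(scores), delimiter=',')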