まだ(学習の)途中ですが | python3Xのブログ

python3Xのブログ

ここでは40代、50代の方が日々の生活で役に立つ情報や私の趣味であるプログラム、Excelや科学に関する内容で投稿する予定です。

コードと生成画像をアップします

import os
import sys

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pydicom
import tensorflow as tf
from PIL import Image
from sklearn.model_selection import train_test_split

from keras import backend as K
from keras.layers import Input, Dense, Reshape, Flatten, Dropout, Multiply, Embedding, Conv2DTranspose
from keras.layers import BatchNormalization, Activation, ZeroPadding2D, MaxPooling2D
from keras.layers.advanced_activations import LeakyReLU
from keras.layers.convolutional import UpSampling2D, Conv2D
from keras.models import Sequential, Model, load_model
from keras.optimizers import Adam
from keras.preprocessing.image import load_img, img_to_array
from keras.utils import Progbar
# Enable memory growth on every visible GPU so TensorFlow allocates VRAM
# on demand instead of grabbing the whole device up front.
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
        print('memory growth:', tf.config.experimental.get_memory_growth(gpu))
else:
    print("Not enough GPU hardware devices available")
 
import pandas as pd
import pydicom
from PIL import Image

# Target image geometry: 64x64, single channel (grayscale).
h = 64
w = 64
ch = 1

# Read the image list, load each file as grayscale, resize it, and scale
# pixel values from [0, 255] into [-1, 1] (the generator's tanh range).
trainlist = pd.read_csv("./label.csv", header=0)
n_images = len(trainlist["image"])
train_data = np.zeros((n_images, h, w, ch))
for idx in range(n_images):
    pic = Image.open(trainlist["image"][idx]).convert("L")
    pic = pic.resize((h, w), Image.LANCZOS)
    arr = img_to_array(pic)
    train_data[idx] = arr.reshape((h, w, ch)) / 127.5 - 1.0

x1 = train_data

# Labels come from the same CSV.
trainlist2 = pd.read_csv("./label.csv", header=0)
train_label = trainlist2["label"]
train_label = train_label.values
y1 = train_label

# Hold out half of the data for evaluation.
(x_train, x_test, y_train, y_test) = train_test_split(x1, y1, test_size=0.5)
 
# Generator
class Generator(object):
    """DCGAN generator: maps a latent vector to a tanh-activated image.

    Upsamples an 8x8x128 seed three times (8 -> 16 -> 32 -> 64) via
    Conv2DTranspose, with a Conv2D + BatchNorm + ReLU block after each of
    the first two upsampling steps.
    """

    def __init__(self, input_dim, image_shape):
        seed_channels = 128
        seed_size = 8

        z = Input((input_dim,))
        x = Dense(input_dim=input_dim, units=seed_channels * seed_size * seed_size)(z)
        x = BatchNormalization()(x)
        x = LeakyReLU(0.2)(x)
        x = Reshape((seed_size, seed_size, seed_channels), input_shape=(seed_channels * seed_size * seed_size,))(x)
        # 8 -> 16
        x = Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(x)
        x = Conv2D(64, (3, 3), padding='same')(x)
        x = BatchNormalization()(x)
        x = Activation('relu')(x)
        # 16 -> 32
        x = Conv2DTranspose(32, (2, 2), strides=(2, 2), padding='same')(x)
        x = Conv2D(64, (3, 3), padding='same')(x)
        x = BatchNormalization()(x)
        x = Activation('relu')(x)
        # 32 -> 64; final conv emits image_shape[2] channels in [-1, 1]
        x = Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(x)
        x = Conv2D(image_shape[2], (5, 5), padding='same')(x)
        out = Activation('tanh')(x)

        self.model = Model(inputs=[z], outputs=[out])

    def get_model(self):
        """Return the underlying Keras Model."""
        return self.model
# Discriminator
class Discriminator(object):
    """DCGAN discriminator: three conv/pool stages ending in a sigmoid score."""

    def __init__(self, input_shape):
        img = Input(input_shape)
        x = img
        # Conv2D(5x5) -> LeakyReLU -> 2x2 max-pool, widening 128 -> 256 -> 512.
        for n_filters in (128, 256, 512):
            x = Conv2D(n_filters, (5, 5), padding='same')(x)
            x = LeakyReLU(0.2)(x)
            x = MaxPooling2D(pool_size=(2, 2))(x)

        x = Flatten()(x)
        x = Dense(1)(x)
        prob = Activation('sigmoid')(x)

        self.model = Model(inputs=[img], outputs=[prob])

    def get_model(self):
        """Return the underlying Keras Model."""
        return self.model
 
# Combined DCGAN (install Progbar beforehand with `pip install keras-progbar`)
class DCGAN(object):
    """Wire a Generator and Discriminator together and train them adversarially."""

    def __init__(self, input_dim, image_shape):
        self.input_dim = input_dim
        self.d = Discriminator(image_shape).get_model()
        self.g = Generator(input_dim, image_shape).get_model()

    def compile(self, g_optim, d_optim):
        """Compile the stacked G->D model (with D frozen) and D standalone."""
        self.d.trainable = False                # D is frozen while training G through the stack
        self.dcgan = Sequential([self.g, self.d])
        self.dcgan.compile(loss='binary_crossentropy', optimizer=g_optim)
        self.d.trainable = True                 # D trains normally on its own batches
        self.d.compile(loss='binary_crossentropy', optimizer=d_optim)

    def train(self, epochs, batch_size, X_train):
        """Run the adversarial training loop.

        Returns (g_losses, d_losses): the last generator/discriminator loss
        of each epoch.
        """
        g_losses = []
        d_losses = []
        for epoch in range(epochs):
            np.random.shuffle(X_train)
            n_iter = X_train.shape[0] // batch_size
            progress_bar = Progbar(target=n_iter)
            for index in range(n_iter):
                # Latent vectors ~ U(-1, 1)
                noise = np.random.uniform(-1, 1, size=(batch_size, self.input_dim))

                # Real batch with random flip augmentation. Copy first: a bare
                # slice is a view, so flipping it would mutate X_train in place
                # and the flips would accumulate across epochs.
                image_batch = X_train[index * batch_size:(index + 1) * batch_size].copy()
                for i in range(batch_size):
                    if np.random.random() > 0.5:
                        image_batch[i] = np.fliplr(image_batch[i])  # horizontal flip
                    if np.random.random() <= 0.5:
                        image_batch[i] = np.flipud(image_batch[i])  # vertical flip
                generated_images = self.g.predict(noise, verbose=0)

                # Discriminator step: real -> 1, fake -> 0.
                X = np.concatenate((image_batch, generated_images))
                y = np.array([1] * batch_size + [0] * batch_size)
                d_loss = self.d.train_on_batch(X, y)

                # Generator step: push D toward calling the fakes real.
                g_loss = self.dcgan.train_on_batch(noise, np.array([1] * batch_size))

                progress_bar.update(index, values=[('g', g_loss), ('d', d_loss)])
            g_losses.append(g_loss)
            d_losses.append(d_loss)

            # Epoch-end snapshot. This used to run on EVERY batch, rewriting
            # the same per-epoch file each time, and it referenced the
            # undefined names `generator`/`discriminator` (NameError on the
            # first iteration). It also never closed the figure, leaking one
            # matplotlib figure per call.
            gen_imgs = generated_images * 127.5 + 127.5
            n_show = min(10, len(gen_imgs))  # guard against batch_size < 10
            fig, ax = plt.subplots(2, 5, figsize=(15, 6))
            for n in range(10):
                ax[n // 5, n % 5].axis('off')
                if n < n_show:
                    ax[n // 5, n % 5].imshow(gen_imgs[n].reshape(h, w), cmap='gray')
            fig.savefig('./generated_images/gen_%02d.png' % epoch)
            plt.close(fig)
            self.g.save('./weights/mnist_gen_ep%02d.h5' % epoch)
            self.d.save('./weights/mnist_disc_ep%02d.h5' % epoch)

            if (epoch+1) % 10 == 0:
                image = self.combine_images(generated_images)
                image = (image + 1) / 2.0 * 255.0
                cv2.imwrite('./result/' + str(epoch) + ".png", image)
            print('\nEpoch' + str(epoch) + " end")

            # Periodic weight-only checkpoints.
            if (epoch+1) % 50 == 0:
                self.g.save_weights('weights/gen_' + str(epoch) + '.h5', True)
                self.d.save_weights('weights/disc_' + str(epoch) + '.h5', True)
        return g_losses, d_losses

    def load_weights(self, g_weight, d_weight):
        """Load previously saved generator/discriminator weights."""
        self.g.load_weights(g_weight)
        self.d.load_weights(d_weight)

    def combine_images(self, generated_images):
        """Tile a batch of (H, W, C) images into one near-square mosaic array."""
        num = generated_images.shape[0]
        width = int(math.sqrt(num))
        height = int(math.ceil(float(num) / width))
        shape = generated_images.shape[1:4]
        image = np.zeros((height * shape[0], width * shape[1], shape[2]),
                         dtype=generated_images.dtype)
        for index, img in enumerate(generated_images):
            i = int(index / width)
            j = index % width
            image[i * shape[0]:(i + 1) * shape[0], j * shape[1]:(j + 1) * shape[1], :] = img[:, :, :]
        return image
 
# AnoGAN (see https://qiita.com/NakaokaRei/items/231ec4efe42dfe79d1ff)
def sum_of_residual(y_true, y_pred):
    """AnoGAN residual loss: sum of absolute differences (L1) between tensors.

    Requires `K` (keras.backend) to be imported at module level; the original
    file used `K` without ever importing it, so evaluating this loss raised
    NameError.
    """
    return K.sum(K.abs(y_true - y_pred))
 
class ANOGAN(object):
    """AnoGAN: search latent space so a frozen generator reproduces an input."""

    def __init__(self, input_dim, g):
        self.input_dim = input_dim
        self.g = g
        g.trainable = False
        # The generator stays fixed; a single trainable Dense layer placed in
        # front of it is the only part updated during the latent search.
        anogan_in = Input(shape=(input_dim,))
        g_in = Dense((input_dim), activation='tanh', trainable=True)(anogan_in)
        g_out = g(g_in)
        self.model = Model(inputs=anogan_in, outputs=g_out)
        self.model_weight = None

    def compile(self, optim):
        """Compile with the L1 residual loss and switch to inference phase."""
        self.model.compile(loss=sum_of_residual, optimizer=optim)
        K.set_learning_phase(0)

    def compute_anomaly_score(self, x, iterations=300):
        """Fit a latent code to x and return (final loss, reconstruction)."""
        z = np.random.uniform(-1, 1, size=(1, self.input_dim))

        # Iteratively pull the generated image toward x.
        history = self.model.fit(z, x, batch_size=1, epochs=iterations, verbose=0)
        final_loss = history.history['loss'][-1]
        reconstruction = self.model.predict_on_batch(z)

        return final_loss, reconstruction
 
import tensorflow as tf

import math
import cv2
if __name__ == '__main__':
    batch_size = 10
    epochs = 3000
    input_dim = 15
    g_optim = Adam(lr=0.0001, beta_1=0.5, beta_2=0.999)
    d_optim = Adam(lr=0.0001, beta_1=0.5, beta_2=0.999)

    # Training data: only the normal class (label == 1) trains the GAN.
    x_train_1 = []
    for i in range(len(x_train)):
        if y_train[i] == 1:
            x_train_1.append(x_train[i].reshape((h, w, ch)))
    x_train_1 = np.array(x_train_1)
    print("train data:", len(x_train_1))

    # Evaluation data (up to n = 100). BUG FIX: the original iterated over
    # train-set indices and filtered on y_train while appending x_test[i] /
    # y_test[i] -- wrong samples were selected (and it could raise IndexError
    # if the split sizes differed). Iterate and filter on the test set.
    cnt = 0
    x_test_0, y = [], []
    for i in range(len(x_test)):
        if y_test[i] == 0 or y_test[i] == 1:
            x_test_0.append(x_test[i].reshape((h, w, ch)))
            y.append(y_test[i])
            cnt += 1
            if cnt == 100:
                break

    x_test_0 = np.array(x_test_0)
    print("test_data:", len(x_test_0))

    input_shape = x_train_1[0].shape
    X_test_original = x_test_0.copy()

    # Train the generator and discriminator, then dump the per-epoch losses.
    dcgan = DCGAN(input_dim, input_shape)
    dcgan.compile(g_optim, d_optim)
    g_losses, d_losses = dcgan.train(epochs, batch_size, x_train_1)
    with open('loss_moi.csv', 'w') as f:
        for g_loss, d_loss in zip(g_losses, d_losses):
            f.write(str(g_loss) + ',' + str(d_loss) + '\n')