# The code and the generated images are posted below.
from keras.layers import Input, Dense, Reshape, Flatten, Dropout, Multiply, Embedding, Conv2DTranspose
from keras.layers import BatchNormalization, Activation, ZeroPadding2D, MaxPooling2D
from keras.layers.advanced_activations import LeakyReLU
from keras.layers.convolutional import UpSampling2D, Conv2D
from keras.models import Sequential, Model, load_model
from keras.preprocessing.image import load_img, img_to_array
from keras.optimizers import Adam
import matplotlib.pyplot as plt
import sys, os
import numpy as np
import pandas as pd
import pydicom
from PIL import Image
from keras.utils import Progbar
from sklearn.model_selection import train_test_split
import tensorflow as tf
from keras.layers import BatchNormalization, Activation, ZeroPadding2D, MaxPooling2D
from keras.layers.advanced_activations import LeakyReLU
from keras.layers.convolutional import UpSampling2D, Conv2D
from keras.models import Sequential, Model, load_model
from keras.preprocessing.image import load_img, img_to_array
from keras.optimizers import Adam
import matplotlib.pyplot as plt
import sys, os
import numpy as np
import pandas as pd
import pydicom
from PIL import Image
from keras.utils import Progbar
from sklearn.model_selection import train_test_split
import tensorflow as tf
# Turn on memory growth for every visible GPU and report the resulting
# setting; print a notice instead when no GPU is visible.
physical_devices = tf.config.experimental.list_physical_devices('GPU')
if physical_devices:
    for device in physical_devices:
        tf.config.experimental.set_memory_growth(device, True)
        print('memory growth:', tf.config.experimental.get_memory_growth(device))
else:
    print("Not enough GPU hardware devices available")
import pandas as pd
import pydicom
from PIL import Image
import pydicom
from PIL import Image
# Target geometry: every training image is converted to h x w pixels with
# ch channels.
h = 64
w = 64
ch = 1

# label.csv is read with its first row as the header; this script uses its
# "image" (file path) and "label" columns.
trainlist = pd.read_csv("./label.csv", header=0)
train_data = np.zeros((len(trainlist["image"]), h, w, ch))
for i, image_path in enumerate(trainlist["image"]):
    # Open inside a context manager so each file handle is released as soon
    # as the pixels have been decoded.
    with Image.open(image_path) as img:
        # PIL's resize() takes (width, height), not (height, width).
        img_resize = img.convert("L").resize((w, h), Image.LANCZOS)
    tmp2 = img_to_array(img_resize)
    # Rescale so that 0 maps to -1 and 255 maps to +1, the range of the
    # generator's tanh output.
    train_data[i] = tmp2.reshape((h, w, ch)) / 127.5 - 1.0
x1 = train_data

# The labels come from the same CSV; reuse the frame instead of re-reading it.
trainlist2 = trainlist
train_label = trainlist2["label"].values
y1 = train_label

# Random 50/50 split into the train and test halves used below.
(x_train, x_test, y_train, y_test) = train_test_split(x1, y1, test_size=0.5)
# Generator
class Generator(object):
    """Generator network of the DCGAN.

    Maps a latent vector of length ``input_dim`` to an image of shape
    ``image_shape`` (height, width, channels): a dense projection is reshaped
    into a small feature map and then upsampled by three stride-2 transposed
    convolutions (8x overall). The last activation is ``tanh``.
    """

    def __init__(self, input_dim, image_shape):
        """Build the Keras model.

        Args:
            input_dim: Length of the latent input vector.
            image_shape: ``(height, width, channels)`` of the generated image.
                Height and width must be positive multiples of 8.

        Raises:
            ValueError: If height or width is not a positive multiple of 8.
        """
        INITIAL_CHANNELS = 128
        UPSAMPLING_FACTOR = 8  # three stride-2 transposed convolutions: 2 ** 3

        height, width, channels = image_shape[0], image_shape[1], image_shape[2]
        if (height < UPSAMPLING_FACTOR or width < UPSAMPLING_FACTOR
                or height % UPSAMPLING_FACTOR or width % UPSAMPLING_FACTOR):
            raise ValueError(
                'image height and width must be positive multiples of %d, got %r'
                % (UPSAMPLING_FACTOR, tuple(image_shape)))
        # Derive the starting feature-map size from the requested output size
        # (a 64x64 image gives the 8x8 map that used to be hard-coded).
        initial_h = height // UPSAMPLING_FACTOR
        initial_w = width // UPSAMPLING_FACTOR

        inputs = Input((input_dim,))
        fc1 = Dense(units=INITIAL_CHANNELS * initial_h * initial_w)(inputs)
        fc1 = BatchNormalization()(fc1)
        fc1 = LeakyReLU(0.2)(fc1)
        fc2 = Reshape((initial_h, initial_w, INITIAL_CHANNELS))(fc1)

        up1 = Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(fc2)
        conv1 = Conv2D(64, (3, 3), padding='same')(up1)
        conv1 = BatchNormalization()(conv1)
        conv1 = Activation('relu')(conv1)

        up2 = Conv2DTranspose(32, (2, 2), strides=(2, 2), padding='same')(conv1)
        conv2 = Conv2D(64, (3, 3), padding='same')(up2)
        conv2 = BatchNormalization()(conv2)
        conv2 = Activation('relu')(conv2)

        up3 = Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(conv2)
        conv3 = Conv2D(channels, (5, 5), padding='same')(up3)
        outputs = Activation('tanh')(conv3)

        self.model = Model(inputs=[inputs], outputs=[outputs])

    def get_model(self):
        """Return the underlying Keras model."""
        return self.model
# Discriminator
class Discriminator(object):
    """Discriminator network of the DCGAN.

    Three stages of 5x5 convolution, LeakyReLU(0.2) and 2x2 max pooling
    (128, 256 and 512 filters), followed by a single sigmoid unit.
    """

    def __init__(self, input_shape):
        """Build the Keras model for images of shape ``input_shape``."""
        inputs = Input(input_shape)

        features = inputs
        for n_filters in (128, 256, 512):
            features = Conv2D(n_filters, (5, 5), padding='same')(features)
            features = LeakyReLU(0.2)(features)
            features = MaxPooling2D(pool_size=(2, 2))(features)

        flat = Flatten()(features)
        score = Dense(1)(flat)
        outputs = Activation('sigmoid')(score)

        self.model = Model(inputs=[inputs], outputs=[outputs])

    def get_model(self):
        """Return the underlying Keras model."""
        return self.model
class Generator(object):
    """Generator network of the DCGAN.

    Maps a latent vector of length ``input_dim`` to an image of shape
    ``image_shape`` (height, width, channels): a dense projection is reshaped
    into a small feature map and then upsampled by three stride-2 transposed
    convolutions (8x overall). The last activation is ``tanh``.
    """

    def __init__(self, input_dim, image_shape):
        """Build the Keras model.

        Args:
            input_dim: Length of the latent input vector.
            image_shape: ``(height, width, channels)`` of the generated image.
                Height and width must be positive multiples of 8.

        Raises:
            ValueError: If height or width is not a positive multiple of 8.
        """
        INITIAL_CHANNELS = 128
        UPSAMPLING_FACTOR = 8  # three stride-2 transposed convolutions: 2 ** 3

        height, width, channels = image_shape[0], image_shape[1], image_shape[2]
        if (height < UPSAMPLING_FACTOR or width < UPSAMPLING_FACTOR
                or height % UPSAMPLING_FACTOR or width % UPSAMPLING_FACTOR):
            raise ValueError(
                'image height and width must be positive multiples of %d, got %r'
                % (UPSAMPLING_FACTOR, tuple(image_shape)))
        # Derive the starting feature-map size from the requested output size
        # (a 64x64 image gives the 8x8 map that used to be hard-coded).
        initial_h = height // UPSAMPLING_FACTOR
        initial_w = width // UPSAMPLING_FACTOR

        inputs = Input((input_dim,))
        fc1 = Dense(units=INITIAL_CHANNELS * initial_h * initial_w)(inputs)
        fc1 = BatchNormalization()(fc1)
        fc1 = LeakyReLU(0.2)(fc1)
        fc2 = Reshape((initial_h, initial_w, INITIAL_CHANNELS))(fc1)

        up1 = Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(fc2)
        conv1 = Conv2D(64, (3, 3), padding='same')(up1)
        conv1 = BatchNormalization()(conv1)
        conv1 = Activation('relu')(conv1)

        up2 = Conv2DTranspose(32, (2, 2), strides=(2, 2), padding='same')(conv1)
        conv2 = Conv2D(64, (3, 3), padding='same')(up2)
        conv2 = BatchNormalization()(conv2)
        conv2 = Activation('relu')(conv2)

        up3 = Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(conv2)
        conv3 = Conv2D(channels, (5, 5), padding='same')(up3)
        outputs = Activation('tanh')(conv3)

        self.model = Model(inputs=[inputs], outputs=[outputs])

    def get_model(self):
        """Return the underlying Keras model."""
        return self.model
# Discriminator
class Discriminator(object):
    """Discriminator network of the DCGAN.

    Three stages of 5x5 convolution, LeakyReLU(0.2) and 2x2 max pooling
    (128, 256 and 512 filters), followed by a single sigmoid unit.
    """

    def __init__(self, input_shape):
        """Build the Keras model for images of shape ``input_shape``."""
        inputs = Input(input_shape)

        features = inputs
        for n_filters in (128, 256, 512):
            features = Conv2D(n_filters, (5, 5), padding='same')(features)
            features = LeakyReLU(0.2)(features)
            features = MaxPooling2D(pool_size=(2, 2))(features)

        flat = Flatten()(features)
        score = Dense(1)(flat)
        outputs = Activation('sigmoid')(score)

        self.model = Model(inputs=[inputs], outputs=[outputs])

    def get_model(self):
        """Return the underlying Keras model."""
        return self.model
# (連結した)DCGANの作成(ここで pip install keras-progbar でProgbarをインストールしておく)
class DCGAN(object):
    """Generator/discriminator pair trained adversarially.

    ``self.g`` turns latent vectors into images, ``self.d`` scores images
    (label 1 for real, 0 for generated), and ``self.dcgan`` (created in
    ``compile``) stacks the two so the generator is trained through the
    discriminator's output.
    """

    def __init__(self, input_dim, image_shape):
        """Create the two networks.

        Args:
            input_dim: Length of the latent vector fed to the generator.
            image_shape: ``(height, width, channels)`` of the images.
        """
        self.input_dim = input_dim
        self.d = Discriminator(image_shape).get_model()
        self.g = Generator(input_dim, image_shape).get_model()

    def compile(self, g_optim, d_optim):
        """Compile the stacked model with ``g_optim`` and the discriminator with ``d_optim``."""
        # The discriminator is marked non-trainable while the stacked model
        # is compiled, so that generator updates are meant to leave it as-is.
        self.d.trainable = False
        self.dcgan = Sequential([self.g, self.d])
        self.dcgan.compile(loss='binary_crossentropy', optimizer=g_optim)
        # Re-enable training before compiling the stand-alone discriminator.
        self.d.trainable = True
        self.d.compile(loss='binary_crossentropy', optimizer=d_optim)

    def train(self, epochs, batch_size, X_train):
        """Train both networks and return the per-batch losses.

        Every epoch a grid of sample images and both full models are saved;
        every 10th epoch a combined image is written to ``./result``; every
        50th epoch the weights are saved to ``weights``.

        Args:
            epochs: Number of passes over ``X_train``.
            batch_size: Number of real images (and latent vectors) per step.
            X_train: Array of real images, first axis is the sample axis.
                It is not modified.

        Returns:
            ``(g_losses, d_losses)``: two lists with one entry per batch.

        Raises:
            ValueError: If ``X_train`` holds fewer than ``batch_size`` images.
        """
        n_samples = X_train.shape[0]
        n_iter = n_samples // batch_size
        if n_iter == 0:
            raise ValueError(
                'X_train has %d samples, fewer than batch_size=%d'
                % (n_samples, batch_size))

        # Make sure every output location exists before anything is written.
        for directory in ('./generated_images', './weights', './result'):
            os.makedirs(directory, exist_ok=True)

        # Labels are the same for every batch: real images first, then fakes.
        d_labels = np.array([1] * batch_size + [0] * batch_size)
        g_labels = np.array([1] * batch_size)

        g_losses = []
        d_losses = []
        for epoch in range(epochs):
            # Shuffle through an index permutation so the caller's array
            # keeps its order.
            order = np.random.permutation(n_samples)
            progress_bar = Progbar(target=n_iter)
            for index in range(n_iter):
                # Latent vectors for this step.
                noise = np.random.uniform(-1, 1, size=(batch_size, self.input_dim))

                # Real images; index-array selection yields a copy, so the
                # flips below do not leak back into X_train.
                batch_idx = order[index * batch_size:(index + 1) * batch_size]
                image_batch = X_train[batch_idx]
                for i in range(batch_size):
                    if np.random.random() > 0.5:
                        image_batch[i] = np.fliplr(image_batch[i])  # left-right flip
                    if np.random.random() <= 0.5:
                        image_batch[i] = np.flipud(image_batch[i])  # up-down flip

                generated_images = self.g.predict(noise, verbose=0)

                # Discriminator step on real + generated images.
                X = np.concatenate((image_batch, generated_images))
                d_loss = self.d.train_on_batch(X, d_labels)

                # Generator step: try to make the discriminator answer "real".
                g_loss = self.dcgan.train_on_batch(noise, g_labels)

                # index + 1 so the bar reaches its target on the last batch.
                progress_bar.update(index + 1, values=[('g', g_loss), ('d', d_loss)])
                g_losses.append(g_loss)
                d_losses.append(d_loss)

            self._save_epoch_samples(generated_images, epoch)

            if (epoch + 1) % 10 == 0:
                image = self.combine_images(generated_images)
                image = (image + 1) / 2.0 * 255.0
                # Hand cv2 an explicit 8-bit image.
                cv2.imwrite('./result/' + str(epoch) + ".png",
                            np.clip(image, 0, 255).astype(np.uint8))
            print('\nEpoch' + str(epoch) + " end")

            # Periodic weight checkpoint.
            if (epoch + 1) % 50 == 0:
                self.g.save_weights('weights/gen_' + str(epoch) + '.h5', True)
                self.d.save_weights('weights/disc_' + str(epoch) + '.h5', True)
        return g_losses, d_losses

    def _save_epoch_samples(self, generated_images, epoch):
        """Save a 2x5 grid of generated images and both models for ``epoch``."""
        gen_imgs = generated_images * 127.5 + 127.5
        fig, ax = plt.subplots(2, 5, figsize=(15, 6))
        for n, axis in enumerate(ax.ravel()):
            axis.axis('off')
            # Leave the cell empty when the batch has fewer than 10 images.
            if n < len(gen_imgs):
                # Only the first channel is shown (grayscale colormap).
                axis.imshow(gen_imgs[n][:, :, 0], cmap='gray')
        fig.savefig('./generated_images/gen_%02d.png' % epoch)
        # Close the figure; otherwise one figure per call stays open.
        plt.close(fig)
        self.g.save('./weights/mnist_gen_ep%02d.h5' % epoch)
        self.d.save('./weights/mnist_disc_ep%02d.h5' % epoch)

    def load_weights(self, g_weight, d_weight):
        """Load generator weights from ``g_weight`` and discriminator weights from ``d_weight``."""
        self.g.load_weights(g_weight)
        self.d.load_weights(d_weight)

    def combine_images(self, generated_images):
        """Tile a batch of images into one grid image.

        The grid is ``int(sqrt(num))`` images wide and as many rows high as
        needed; images are placed row by row and unused cells stay zero.

        Args:
            generated_images: Array of shape ``(num, height, width, channels)``.

        Returns:
            Array of shape ``(rows * height, cols * width, channels)`` with the
            dtype of ``generated_images``.

        Raises:
            ValueError: If the batch is empty.
        """
        num = generated_images.shape[0]
        if num == 0:
            raise ValueError('generated_images must contain at least one image')
        width = int(math.sqrt(num))
        height = int(math.ceil(float(num) / width))
        shape = generated_images.shape[1:4]
        image = np.zeros((height * shape[0], width * shape[1], shape[2]),
                         dtype=generated_images.dtype)
        for index, img in enumerate(generated_images):
            i, j = divmod(index, width)
            image[i * shape[0]:(i + 1) * shape[0],
                  j * shape[1]:(j + 1) * shape[1], :] = img[:, :, :]
        return image
# AnoGAN(URL:https://qiita.com/NakaokaRei/items/231ec4efe42dfe79d1ff参照)
def sum_of_residual(y_true, y_pred):
    """Return the sum of absolute differences between ``y_true`` and ``y_pred``."""
    # Imported here because the module never binds the Keras backend to K.
    from keras import backend as K
    return K.sum(K.abs(y_true - y_pred))
class ANOGAN(object):
    """Anomaly scorer built on a frozen generator.

    A trainable dense layer of size ``input_dim`` is placed in front of the
    generator ``g``; fitting that layer for one query image searches for the
    generator output closest to the image under ``sum_of_residual``.
    """

    def __init__(self, input_dim, g):
        """Wrap generator ``g`` (latent size ``input_dim``) with a trainable input layer."""
        self.input_dim = input_dim
        self.g = g
        g.trainable = False
        # An Input layer has no weights to update, so a dense layer of the
        # same size is added right after it and trained instead.
        anogan_in = Input(shape=(input_dim,))
        g_in = Dense((input_dim), activation='tanh', trainable=True)(anogan_in)
        g_out = g(g_in)
        self.model = Model(inputs=anogan_in, outputs=g_out)
        # Snapshot of the freshly compiled weights; set in compile().
        self.model_weight = None

    def compile(self, optim):
        """Compile the wrapped model with ``optim`` and the residual loss."""
        # Imported here because the module never binds the Keras backend to K.
        from keras import backend as K
        self.model.compile(loss=sum_of_residual, optimizer=optim)
        # Older Keras backends expose set_learning_phase; skip the call on
        # versions that no longer provide it.
        if hasattr(K, 'set_learning_phase'):
            K.set_learning_phase(0)
        self.model_weight = self.model.get_weights()

    def compute_anomaly_score(self, x, iterations=300):
        """Fit the input layer to ``x`` and return ``(loss, similar_data)``.

        Args:
            x: Query image batch of size 1, shaped like the generator output.
            iterations: Number of fit epochs.

        Returns:
            The loss of the last epoch and the generator output for the
            fitted latent vector.
        """
        # Start every query from the weights captured in compile(), so a
        # score does not depend on the samples scored before it.
        if self.model_weight is not None:
            self.model.set_weights(self.model_weight)
        z = np.random.uniform(-1, 1, size=(1, self.input_dim))
        # Optimize the mapping of z towards the query image.
        history = self.model.fit(z, x, batch_size=1, epochs=iterations, verbose=0)
        loss = history.history['loss'][-1]
        similar_data = self.model.predict_on_batch(z)
        return loss, similar_data
def sum_of_residual(y_true, y_pred):
    """Return the sum of absolute differences between ``y_true`` and ``y_pred``."""
    # Imported here because the module never binds the Keras backend to K.
    from keras import backend as K
    return K.sum(K.abs(y_true - y_pred))
class ANOGAN(object):
    """Anomaly scorer built on a frozen generator.

    A trainable dense layer of size ``input_dim`` is placed in front of the
    generator ``g``; fitting that layer for one query image searches for the
    generator output closest to the image under ``sum_of_residual``.
    """

    def __init__(self, input_dim, g):
        """Wrap generator ``g`` (latent size ``input_dim``) with a trainable input layer."""
        self.input_dim = input_dim
        self.g = g
        g.trainable = False
        # An Input layer has no weights to update, so a dense layer of the
        # same size is added right after it and trained instead.
        anogan_in = Input(shape=(input_dim,))
        g_in = Dense((input_dim), activation='tanh', trainable=True)(anogan_in)
        g_out = g(g_in)
        self.model = Model(inputs=anogan_in, outputs=g_out)
        # Snapshot of the freshly compiled weights; set in compile().
        self.model_weight = None

    def compile(self, optim):
        """Compile the wrapped model with ``optim`` and the residual loss."""
        # Imported here because the module never binds the Keras backend to K.
        from keras import backend as K
        self.model.compile(loss=sum_of_residual, optimizer=optim)
        # Older Keras backends expose set_learning_phase; skip the call on
        # versions that no longer provide it.
        if hasattr(K, 'set_learning_phase'):
            K.set_learning_phase(0)
        self.model_weight = self.model.get_weights()

    def compute_anomaly_score(self, x, iterations=300):
        """Fit the input layer to ``x`` and return ``(loss, similar_data)``.

        Args:
            x: Query image batch of size 1, shaped like the generator output.
            iterations: Number of fit epochs.

        Returns:
            The loss of the last epoch and the generator output for the
            fitted latent vector.
        """
        # Start every query from the weights captured in compile(), so a
        # score does not depend on the samples scored before it.
        if self.model_weight is not None:
            self.model.set_weights(self.model_weight)
        z = np.random.uniform(-1, 1, size=(1, self.input_dim))
        # Optimize the mapping of z towards the query image.
        history = self.model.fit(z, x, batch_size=1, epochs=iterations, verbose=0)
        loss = history.history['loss'][-1]
        similar_data = self.model.predict_on_batch(z)
        return loss, similar_data
import tensorflow as tf
import math
import cv2
if __name__ == '__main__':
    # Script entry point: build the train/evaluation subsets, train the DCGAN
    # and dump the per-batch losses to loss_moi.csv.
    batch_size = 10
    epochs = 3000
    input_dim = 15
    g_optim = Adam(lr=0.0001, beta_1=0.5, beta_2=0.999)
    d_optim = Adam(lr=0.0001, beta_1=0.5, beta_2=0.999)

    # Training data: only the samples of the train split labelled 1.
    x_train_1 = np.array([
        image.reshape((h, w, ch))
        for image, label in zip(x_train, y_train)
        if label == 1
    ])
    print("train data:", len(x_train_1))

    # Evaluation data: up to 100 samples of the test split labelled 0 or 1.
    # Both the filter and the samples come from the test split, so each
    # image is paired with its own label.
    x_test_0, y = [], []
    for image, label in zip(x_test, y_test):
        if label == 0 or label == 1:
            x_test_0.append(image.reshape((h, w, ch)))
            y.append(label)
            if len(x_test_0) == 100:
                break
    x_test_0 = np.array(x_test_0)
    print("test_data:", len(x_test_0))

    # Same shape every training sample was reshaped to above; does not
    # depend on the training subset being non-empty.
    input_shape = (h, w, ch)
    X_test_original = x_test_0.copy()

    # Train the generator and the discriminator.
    dcgan = DCGAN(input_dim, input_shape)
    dcgan.compile(g_optim, d_optim)
    g_losses, d_losses = dcgan.train(epochs, batch_size, x_train_1)

    # One "g_loss,d_loss" line per training batch.
    with open('loss_moi.csv', 'w') as f:
        for g_loss, d_loss in zip(g_losses, d_losses):
            f.write(str(g_loss) + ',' + str(d_loss) + '\n')