Note that this GAN generates one-dimensional (tabular) data; for image data the code would need some changes (for example, convolutional layers).
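As a rough illustration of what those changes might look like, here is a hypothetical convolutional generator/discriminator pair for 28x28 single-channel images; every layer size and the input shape are illustrative assumptions, not part of the script below. For images, the gradient penalty should also flatten all non-batch dimensions before taking the norm (e.g. gradients.view(gradients.size(0), -1).norm(2, dim=1)).

# Hypothetical sketch: convolutional G/D for 28x28 single-channel images.
# All layer sizes are assumptions for illustration only.
import torch
import torch.nn as nn

class ConvGenerator(nn.Module):
    def __init__(self, z_dim=64):
        super(ConvGenerator, self).__init__()
        self.net = nn.Sequential(
            nn.ConvTranspose2d(z_dim, 128, 7, 1, 0),  # z -> 128 x 7 x 7
            nn.ReLU(True),
            nn.ConvTranspose2d(128, 64, 4, 2, 1),     # -> 64 x 14 x 14
            nn.ReLU(True),
            nn.ConvTranspose2d(64, 1, 4, 2, 1),       # -> 1 x 28 x 28
            nn.Tanh(),
        )
    def forward(self, z):
        return self.net(z.view(z.size(0), -1, 1, 1))

class ConvDiscriminator(nn.Module):
    def __init__(self):
        super(ConvDiscriminator, self).__init__()
        self.net = nn.Sequential(
            nn.Conv2d(1, 64, 4, 2, 1),    # 28 -> 14
            nn.LeakyReLU(0.2),
            nn.Conv2d(64, 128, 4, 2, 1),  # 14 -> 7
            nn.LeakyReLU(0.2),
        )
        self.fc = nn.Linear(128 * 7 * 7, 1)
    def forward(self, x):
        h = self.net(x)
        return self.fc(h.view(h.size(0), -1)).view(-1)

The original one-dimensional script follows.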
import numpy as np
import pandas as pd
import torch
import torch.autograd as autograd
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import StratifiedKFold
torch.manual_seed(1)
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score
import warnings
warnings.filterwarnings("ignore")
import os,sys
def train_model_save_gen(data, ITERS=600, iter_ctrl=200, use_cuda=False, name_save='', file_name='./save_gen/'):
if not os.path.exists(file_name):
os.mkdir(file_name)
if not os.path.exists('./model_file/'):
os.mkdir('./model_file/')
FIXED_GENERATOR = False
LAMBDA = .1
CRITIC_ITERS = 5
CRITIC_ITERG = 1
BATCH_SIZE = len(data)
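    # WGAN-GP hyperparameters:
    #   LAMBDA       - weight of the gradient-penalty term
    #   CRITIC_ITERS - critic (discriminator) updates per iteration
    #   CRITIC_ITERG - generator updates per iteration
    #   BATCH_SIZE   - full batch: every row of `data` is used in each step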
class Generator(nn.Module):
def __init__(self, shape1):
super(Generator, self).__init__()
main = nn.Sequential(
nn.Linear(shape1, 1024),
nn.ReLU(True),
nn.Linear(1024, 512),
nn.ReLU(True),
nn.Linear(512, 256),
nn.ReLU(True),
nn.Linear(256, 512),
nn.ReLU(True),
nn.Linear(512, 1024),
nn.Tanh(),
nn.Linear(1024, shape1),
)
self.main = main
def forward(self, noise, real_data):
if FIXED_GENERATOR:
return noise + real_data
else:
output = self.main(noise)
return output
class Discriminator(nn.Module):
def __init__(self, shape1):
super(Discriminator, self).__init__()
self.fc1 = nn.Linear(shape1, 512)
self.relu1 = nn.LeakyReLU(0.2)
self.fc2 = nn.Linear(512, 256)
self.relu2 = nn.LeakyReLU(0.2)
self.fc3 = nn.Linear(256, 256)
self.relu3 = nn.LeakyReLU(0.2)
self.fc4 = nn.Linear(256, 128)
self.relu4 = nn.LeakyReLU(0.2)
self.fc5 = nn.Linear(128, 1)
def forward(self, inputs):
out = self.fc1(inputs)
out = self.relu1(out)
out = self.fc2(out)
out = self.relu2(out)
out = self.fc3(out)
out = self.relu3(out)
out = self.fc4(out)
out = self.relu4(out)
out = self.fc5(out)
return out.view(-1)
def weights_init(m):
classname = m.__class__.__name__
if classname.find('Linear') != -1:
m.weight.data.normal_(0.0, 0.02)
m.bias.data.fill_(0)
elif classname.find('BatchNorm') != -1:
m.weight.data.normal_(1.0, 0.02)
m.bias.data.fill_(0)
def calc_gradient_penalty(netD, real_data, fake_data):
alpha = torch.rand(BATCH_SIZE, 1)
alpha = alpha.expand(real_data.size())
alpha = alpha.cuda() if use_cuda else alpha
interpolates = alpha * real_data + ((1 - alpha) * fake_data)
if use_cuda:
interpolates = interpolates.cuda()
interpolates = autograd.Variable(interpolates, requires_grad=True)
disc_interpolates = netD(interpolates)
gradients = autograd.grad(outputs=disc_interpolates, inputs=interpolates,
grad_outputs=torch.ones(disc_interpolates.size()).cuda() if use_cuda else torch.ones(
disc_interpolates.size()), create_graph=True, retain_graph=True,
only_inputs=True)[0]
gradient_penalty = ((gradients.norm(2, dim=1) - 1) ** 2).mean() * LAMBDA
return gradient_penalty
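    # This implements the penalty from "Improved Training of Wasserstein GANs"
    # (Gulrajani et al., 2017): LAMBDA * E[(||grad_xhat D(xhat)||_2 - 1)^2], with
    # xhat sampled uniformly on lines between real and fake points, which nudges
    # the critic toward being 1-Lipschitz.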
netG = Generator(data.shape[1])
netD = Discriminator(data.shape[1])
netD.apply(weights_init)
netG.apply(weights_init)
if use_cuda:
netD = netD.cuda()
netG = netG.cuda()
optimizerD = optim.Adam(netD.parameters(), lr=1e-4, betas=(0.5, 0.9))
optimizerG = optim.Adam(netG.parameters(), lr=1e-4, betas=(0.5, 0.9))
    one = torch.tensor(1, dtype=torch.float)  # scalar gradient seed for backward(); torch.FloatTensor([1]) in older PyTorch
mone = one * -1
if use_cuda:
one = one.cuda()
mone = mone.cuda()
    # Labels for the 1-NN real-vs-fake evaluation below: 1 = real, 0 = generated.
    one_list = np.ones((data.shape[0]))
    zero_list = np.zeros((data.shape[0]))
opt_diff_accuracy_05 = 0.5
best_item = 0
opt_accuracy = 0
all_result = []
loss_list = {'D_loss':[], 'G_loss':[]}
for iteration in range(ITERS):
        sys.stdout.write(f'\rProgress: {iteration}/{ITERS}')  # \r moves the cursor back to the start of the line, so each update overwrites the previous one
sys.stdout.flush()
for p in netD.parameters():
p.requires_grad = True
        real_data = torch.FloatTensor(data)
        if use_cuda:
            real_data = real_data.cuda()
        real_data_v = autograd.Variable(real_data)
noise = torch.randn(BATCH_SIZE, data.shape[1])
if use_cuda:
noise = noise.cuda()
        with torch.no_grad():  # volatile=True was removed from PyTorch; no_grad() is the modern equivalent
            fake = netG(noise, real_data_v)
        fake_output = fake.cpu().numpy()
        data = real_data.cpu().numpy()
for iter_d in range(CRITIC_ITERS):
netD.zero_grad()
D_real = netD(real_data_v)
D_real = D_real.mean()
            D_real.backward(mone)  # maximize E[D(real)]: backward with gradient -1
noise = torch.randn(BATCH_SIZE, data.shape[1])
if use_cuda:
noise = noise.cuda()
            with torch.no_grad():  # detach the generator from the critic update (volatile=True is deprecated)
                fake = netG(noise, real_data_v)
inputv = fake
D_fake = netD(inputv)
D_fake = D_fake.mean()
            D_fake.backward(one)  # minimize E[D(fake)]: backward with gradient +1
            gradient_penalty = calc_gradient_penalty(netD, real_data_v.data, fake.data)
            gradient_penalty.backward()
D_cost = D_fake - D_real + gradient_penalty
Wasserstein_D = D_real - D_fake
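            # Wasserstein_D = E[D(real)] - E[D(fake)] is the critic's estimate of the
            # Wasserstein distance; D_cost is the full critic loss including the penalty.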
loss_list['D_loss'].append(D_cost.item())
optimizerD.step()
if not FIXED_GENERATOR:
for p in netD.parameters():
p.requires_grad = False
for iter_g in range(CRITIC_ITERG):
netG.zero_grad()
real_data = torch.Tensor(data)
if use_cuda:
real_data = real_data.cuda()
real_data_v = autograd.Variable(real_data)
noise = torch.randn(BATCH_SIZE, data.shape[1])
if use_cuda:
noise = noise.cuda()
noisev = autograd.Variable(noise)
fake = netG(noisev, real_data_v)
G = netD(fake)
G = G.mean()
                G.backward(mone)  # maximize E[D(G(z))]
                G_cost = -G
loss_list['G_loss'].append(G_cost.item())
optimizerG.step()
        # Evaluate the generator, and checkpoint it if improved, every iter_ctrl iterations (default 200).
        if iteration % iter_ctrl == 0:
            print()
            print(f'Iteration {iteration}..')
x = np.concatenate((data, fake_output), axis=0)
y = np.concatenate((one_list, zero_list), axis=0)
kfold = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
real_label = np.zeros((x.shape[0]))
pred_label = np.zeros((x.shape[0]))
for train_index, test_index in kfold.split(x, y):
x_train, x_test = x[train_index], x[test_index]
y_train, y_test = y[train_index], y[test_index]
knn = KNeighborsClassifier(n_neighbors=1).fit(x_train, y_train)
predicted_y = knn.predict(x_test)
pred_label[test_index] = predicted_y
real_label[test_index] = y_test
accuracy = accuracy_score(real_label, pred_label)
all_result.append(str(iteration) + "," + str(accuracy))
            print(f'1-NN accuracy at iteration {iteration}: {accuracy}')
diff_accuracy_05 = abs(accuracy - 0.5)
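            # 1-NN two-sample heuristic: accuracy near 0.5 means the classifier cannot
            # tell real rows from generated ones, so checkpoints closest to 0.5 are kept.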
if diff_accuracy_05 < opt_diff_accuracy_05:
opt_diff_accuracy_05 = diff_accuracy_05
best_item = iteration
opt_accuracy = accuracy
                save_temp = pd.DataFrame(fake_output)
                save_temp.to_csv(file_name + "/Iteration3_" + str(name_save) + '_' + str(iteration) + ".csv", index=None)
torch.save(netG.state_dict(), './model_file/netG'+str(iteration)+'.dict')
torch.save(netD.state_dict(), './model_file/netD'+str(iteration)+'.dict')
save_loss = pd.DataFrame(loss_list['G_loss'])
save_loss.to_csv(file_name + "/Gloss_" + str(iteration) + '.csv', index=None)
save_loss = pd.DataFrame(loss_list['D_loss'])
save_loss.to_csv(file_name + "/Dloss_" + str(iteration) + '.csv', index=None)
    return best_item, opt_diff_accuracy_05
if __name__ == '__main__':
d = np.array(pd.read_csv('./data.txt', header=None))
train_model_save_gen(d)
Create data.txt in the same folder and paste in the following data (9 rows, 4 columns):
1,1,1,2
1,1,1,2
1.1,1.1,1.1,2.1
1,1,1,2
1,1,1,2
1.1,1.1,1.1,2.1
1,1,1,2
1,1,1,2
1.1,1.1,1.1,2.1
Then just call the function above.
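If you later want to sample from a saved checkpoint, something like the sketch below should work. Since Generator is defined inside train_model_save_gen, it has to be re-declared with an identical body so that load_state_dict finds matching keys; the checkpoint file name, iteration number, and sample count here are assumptions.

# Hypothetical sketch: reload a saved generator checkpoint and draw new samples.
import numpy as np
import pandas as pd
import torch
import torch.nn as nn

class Generator(nn.Module):
    # Must mirror the training-time Generator exactly so state-dict keys match.
    def __init__(self, shape1):
        super(Generator, self).__init__()
        self.main = nn.Sequential(
            nn.Linear(shape1, 1024), nn.ReLU(True),
            nn.Linear(1024, 512), nn.ReLU(True),
            nn.Linear(512, 256), nn.ReLU(True),
            nn.Linear(256, 512), nn.ReLU(True),
            nn.Linear(512, 1024), nn.Tanh(),
            nn.Linear(1024, shape1),
        )
    def forward(self, noise):
        return self.main(noise)

d = np.array(pd.read_csv('./data.txt', header=None))
netG = Generator(d.shape[1])
netG.load_state_dict(torch.load('./model_file/netG0.dict'))  # 0 = placeholder for the best_item returned by training
netG.eval()
with torch.no_grad():
    samples = netG(torch.randn(100, d.shape[1])).numpy()  # 100 is an arbitrary sample count
pd.DataFrame(samples).to_csv('./new_samples.csv', index=None)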