from __future__ import print_function

import sys
sys.path.append('game/')  # make the bundled game/ package importable
import random
from collections import deque

import cv2
import numpy as np
import tensorflow as tf
tf = tf.compat.v1
tf.disable_eager_execution()  # needed so the v1 placeholder/session code below runs on a TF2 runtime

import wrapped_flappy_bird as game
GAME='bird'
ACTIONS=2
GAMMA = 0.99        # discount factor for future rewards
OBSERVE = 1000.     # timesteps to observe before training starts
EXPLORE = 3000000.  # timesteps over which epsilon is annealed
FINAL_EPSILON = 0.0001
INITIAL_EPSILON = 0.1
REPLAY_MEMORY = 50000
BATCH = 32
FRAME_PER_ACTION = 1
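# Rough memory estimate for the replay buffer (not from the original code, just a sanity
# check): each stored state is an 80x80x4 uint8 array (25,600 bytes) and every transition
# keeps both s_t and s_t1, so REPLAY_MEMORY = 50000 transitions is on the order of
# 50000 * 2 * 25600 bytes ~= 2.4 GiB. Shrink REPLAY_MEMORY if RAM is tight.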
def weight_variable(shape):
    initial = tf.truncated_normal(shape, stddev=0.01)
    return tf.Variable(initial)

def bias_variable(shape):
    initial = tf.constant(0.01, shape=shape)
    return tf.Variable(initial)

def conv2d(x, W, stride):
    return tf.nn.conv2d(x, W, strides=[1, stride, stride, 1], padding="SAME")

def max_pool_2x2(x):
    return tf.nn.max_pool(x, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding="SAME")
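# With padding="SAME", each conv/pool layer produces ceil(input_size / stride) outputs per
# spatial dimension; e.g. an 80x80 input through a stride-4 convolution gives
# ceil(80 / 4) = 20, so h_conv1 below is 20x20x32.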
def createNetwork():
    # network weights
    W_conv1 = weight_variable([8, 8, 4, 32])
    b_conv1 = bias_variable([32])
    W_conv2 = weight_variable([4, 4, 32, 64])
    b_conv2 = bias_variable([64])
    W_conv3 = weight_variable([3, 3, 64, 64])
    b_conv3 = bias_variable([64])
    W_fc1 = weight_variable([1600, 512])
    b_fc1 = bias_variable([512])
    W_fc2 = weight_variable([512, ACTIONS])
    b_fc2 = bias_variable([ACTIONS])
    # input layer: four stacked 80x80 grayscale frames
    s = tf.placeholder("float", [None, 80, 80, 4])
    # hidden layers
    h_conv1 = tf.nn.relu(conv2d(s, W_conv1, 4) + b_conv1)
    h_pool1 = max_pool_2x2(h_conv1)
    h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2, 2) + b_conv2)
    #h_pool2 = max_pool_2x2(h_conv2)
    h_conv3 = tf.nn.relu(conv2d(h_conv2, W_conv3, 1) + b_conv3)
    #h_pool3 = max_pool_2x2(h_conv3)
    #h_pool3_flat = tf.reshape(h_pool3, [-1, 256])
    h_conv3_flat = tf.reshape(h_conv3, [-1, 1600])
    h_fc1 = tf.nn.relu(tf.matmul(h_conv3_flat, W_fc1) + b_fc1)
    # readout layer: one Q value per action
    readout = tf.matmul(h_fc1, W_fc2) + b_fc2
    # returns the state placeholder, the Q-value readout, and the last fully connected layer
    return s, readout, h_fc1
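# Shape check for the hard-coded 1600 in W_fc1 (worked out from the layers above):
#   input                           80 x 80 x 4
#   conv1 (8x8, stride 4, SAME)  -> 20 x 20 x 32
#   max_pool 2x2                 -> 10 x 10 x 32
#   conv2 (4x4, stride 2, SAME)  ->  5 x  5 x 64
#   conv3 (3x3, stride 1, SAME)  ->  5 x  5 x 64
#   flatten: 5 * 5 * 64 = 1600 features feeding the 512-unit fully connected layer.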
def trainNetwork(s, readout, h_fc1, sess):
    # define the loss function
    # placeholders for the chosen action (one-hot) and the target Q value
    a = tf.placeholder('float', [None, ACTIONS])
    y = tf.placeholder('float', [None])
    # Q value predicted for the action that was actually taken
    readout_action = tf.reduce_sum(tf.multiply(readout, a), axis=1)
    cost = tf.reduce_mean(tf.square(y - readout_action))  # mean squared error between target and predicted Q
    # the optimizer updates the network parameters so as to minimize the loss
    train_step = tf.train.AdamOptimizer(1e-6).minimize(cost)
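    # The two placeholders implement the standard DQN regression target, which the loop
    # below constructs explicitly: for a transition (s, a, r, s', terminal),
    #     y = r                               if terminal
    #     y = r + GAMMA * max_a' Q(s', a')    otherwise
    # and the loss is (y - Q(s, a))^2 averaged over the minibatch.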
    # start the game emulator
    game_state = game.GameState()
    # replay memory: a queue of stored transitions
    D = deque()
    # build the initial state: take the "do nothing" action once, then resize the frame
    # to 80x80, binarize it and stack it four times to get the 80x80x4 state
    do_nothing = np.zeros(ACTIONS)
    do_nothing[0] = 1
    x_t, r_0, terminal = game_state.frame_step(do_nothing)  # returns frame, reward, terminal flag
    x_t = cv2.cvtColor(cv2.resize(x_t, (80, 80)), cv2.COLOR_BGR2GRAY)
    ret, x_t = cv2.threshold(x_t, 1, 255, cv2.THRESH_BINARY)
    s_t = np.stack((x_t, x_t, x_t, x_t), axis=2)
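    # Preprocessing summary: cv2.resize shrinks the raw frame to 80x80, cv2.cvtColor drops
    # the colour channels, and cv2.threshold(x, 1, 255, THRESH_BINARY) maps every non-black
    # pixel to 255, leaving a binary sprite mask. Stacking the same frame four times only
    # bootstraps the 80x80x4 state; from the next step on the stack holds the last 4 frames.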
    # saving and loading the network:
    # create the saver, initialize all variables, then restore a checkpoint if one exists
    saver = tf.train.Saver()
    sess.run(tf.global_variables_initializer())
    checkpoint = tf.train.get_checkpoint_state("saved_networks")
    if checkpoint and checkpoint.model_checkpoint_path:
        saver.restore(sess, checkpoint.model_checkpoint_path)
        print("Successfully loaded:", checkpoint.model_checkpoint_path)
    else:
        print("Could not find old network weights")
    # start training: choose actions with an epsilon-greedy policy
    epsilon = INITIAL_EPSILON
    t = 0
    while "flappy bird" != "angry bird":
        # evaluate the Q values of the current state and pick an action epsilon-greedily
        readout_t = readout.eval(feed_dict={s: [s_t]})[0]
        a_t = np.zeros([ACTIONS])
        action_index = 0
        if t % FRAME_PER_ACTION == 0:
            # exploration: occasionally try non-greedy actions, which helps the model generalize
            if random.random() <= epsilon:
                action_index = random.randrange(ACTIONS)
                a_t[action_index] = 1
            else:
                action_index = np.argmax(readout_t)
                a_t[action_index] = 1
        else:
            a_t[0] = 1  # do nothing on skipped frames
        # as the model stabilizes, anneal epsilon so random actions become rarer
        if epsilon > FINAL_EPSILON and t > OBSERVE:
            epsilon -= (INITIAL_EPSILON - FINAL_EPSILON) / EXPLORE
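        # Annealing arithmetic: epsilon falls linearly from INITIAL_EPSILON = 0.1 towards
        # FINAL_EPSILON = 0.0001, by (0.1 - 0.0001) / 3000000 ~= 3.3e-8 per timestep once
        # the observation phase is over, so it takes the full EXPLORE = 3,000,000 steps to
        # reach the floor.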
        # execute the action and observe the next frame, the reward and the terminal flag
        x_t1_colored, r_t, terminal = game_state.frame_step(a_t)
        # resize the frame to 80 * 80 and convert it to grayscale
        x_t1 = cv2.cvtColor(cv2.resize(x_t1_colored, (80, 80)), cv2.COLOR_BGR2GRAY)
        # binarize the new frame with threshold 1
        ret, x_t1 = cv2.threshold(x_t1, 1, 255, cv2.THRESH_BINARY)
        x_t1 = np.reshape(x_t1, (80, 80, 1))
        # build the next state and store the transition in D for later sampling
        s_t1 = np.append(x_t1, s_t[:, :, :3], axis=2)
        D.append((s_t, a_t, r_t, s_t1, terminal))
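        # s_t1 is a sliding window over frames: np.append puts the newest 80x80x1 frame in
        # channel 0 and keeps the first three channels of s_t, dropping the oldest frame,
        # so the state always contains the 4 most recent frames.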
        if len(D) > REPLAY_MEMORY:
            # drop the oldest transition once the replay memory is full
            D.popleft()
        # once the observation phase is over, start training the model
        if t > OBSERVE:
            # sample a minibatch of 32 transitions from the replay memory and unpack
            # the states, actions, rewards and successor states
            minibatch = random.sample(D, BATCH)
            s_j_batch = [d[0] for d in minibatch]
            a_batch = [d[1] for d in minibatch]
            r_batch = [d[2] for d in minibatch]
            s_j1_batch = [d[3] for d in minibatch]
            y_batch = []
            readout_j1_batch = readout.eval(feed_dict={s: s_j1_batch})
            for i in range(0, len(minibatch)):
                terminal = minibatch[i][4]
                # if terminal (the bird hit an obstacle), the target is the reward alone
                if terminal:
                    y_batch.append(r_batch[i])
                # otherwise: immediate reward + discounted value of the next state
                else:
                    y_batch.append(r_batch[i] + GAMMA * np.max(readout_j1_batch[i]))
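            # Worked example of the target above (illustrative numbers only): with r = 0.1
            # for surviving a frame, GAMMA = 0.99 and max_a' Q(s', a') = 1.5 from the
            # network, y = 0.1 + 0.99 * 1.5 = 1.585; if the bird crashed (r = -1, terminal),
            # y = -1.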
            # gradient step: cost -> gradients -> backpropagation -> parameter update
            # y, a and s are placeholders, so all three must be fed here
            train_step.run(feed_dict={
                y: y_batch,
                a: a_batch,
                s: s_j_batch}
            )
        # update the old values
        s_t = s_t1  # advance the state
        t += 1
        # save progress every 10000 iterations
        if t % 10000 == 0:
            saver.save(sess, 'saved_networks/' + GAME + '-dqn', global_step=t)
        # print info
        state = ""
        if t <= OBSERVE:
            state = "observe"
        elif t > OBSERVE and t <= OBSERVE + EXPLORE:
            state = "explore"
        else:
            state = "train"
        print("TIMESTEP", t, "/ STATE", state,
              "/ EPSILON", epsilon, "/ ACTION", action_index, "/ REWARD", r_t,
              "/ Q_MAX %e" % np.max(readout_t))
        # write info to files (disabled: a_file, h_file and the logs directory are not
        # set up in this listing)
        '''
        if t % 10000 <= 100:
            a_file.write(",".join([str(x) for x in readout_t]) + '\n')
            h_file.write(",".join([str(x) for x in h_fc1.eval(feed_dict={s:[s_t]})[0]]) + '\n')
            cv2.imwrite("logs_tetris/frame" + str(t) + ".png", x_t1)
        '''
def playGame():
    sess = tf.InteractiveSession()
    s, readout, h_fc1 = createNetwork()
    trainNetwork(s, readout, h_fc1, sess)

def main():
    playGame()

if __name__ == "__main__":
    main()
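# Runtime assumptions (not stated in the code above): the game/ directory must contain the
# wrapped_flappy_bird module and its assets (that module usually also needs pygame), the
# environment needs tensorflow, opencv-python (cv2) and numpy, and checkpoints go to
# ./saved_networks/ (create that directory beforehand if the periodic save fails).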