当前位置: 首页 > 工具软件 > 1w-flappy > 使用案例 >

强化学习flappy-bird代码阅读理解

魏安宁
2023-12-01


训练步骤:
1.定义损失函数
2.开启game
3.创建双端队列,replay memory
4.初始化游戏状态
5.加载保存网络模型参数
6.epsilon贪心策略,随机选择动作执行
7.随着游戏的进行降低epsilon,减少随机动作
8.执行动作获得下一状态回报
9.将状态转移过程存储到D中
10.更新参数时采样
11.过了观察期,训练网络模型,更新网路模型参数
12.改变状态

导入所需要的库,以及一些参数的定义

import tensorflow as tf
from __future__import print_function
tf=tf.compat.v1
import sys
sys.path.append('game/')  #相当于规定当前目录
import wrapped_flappy_bird as game
import random
import numpy as np
from collections import deque
GAME='bird'
ACTIONS=2
GAMMA=0.99#衰减率
OBSERVE=1000.#训练前要观察的时间步长
EXPLORE=3000000.#探索的时间步长
FINAL_EPSILON = 0.0001
INITIAL_EPSILON = 0.1
REPLAY_MEMORY = 50000
BATCH = 32
FRAME_PER_ACTION = 1

建立网络

def weight_variable(shape):
    initial = tf.truncated_normal(shape, stddev = 0.01)
    return tf.Variable(initial)
def bias_variable(shape):
    initial = tf.constant(0.01, shape = shape)
    return tf.Variable(initial)
def conv2d(x, W, stride):
    return tf.nn.conv2d(x, W, strides = [1, stride, stride, 1], padding = "SAME")
def max_pool_2x2(x):
    return tf.nn.max_pool(x, ksize = [1, 2, 2, 1], strides = [1, 2, 2, 1], padding = "SAME")
    def createNetwork():
    # network weights
    W_conv1 = weight_variable([8, 8, 4, 32])
    b_conv1 = bias_variable([32])

    W_conv2 = weight_variable([4, 4, 32, 64])
    b_conv2 = bias_variable([64])

    W_conv3 = weight_variable([3, 3, 64, 64])
    b_conv3 = bias_variable([64])

    W_fc1 = weight_variable([1600, 512])
    b_fc1 = bias_variable([512])

    W_fc2 = weight_variable([512, ACTIONS])
    b_fc2 = bias_variable([ACTIONS])

    # input layer
    s = tf.placeholder("float", [None, 80, 80, 4])

    # hidden layers
    h_conv1 = tf.nn.relu(conv2d(s, W_conv1, 4) + b_conv1)
    h_pool1 = max_pool_2x2(h_conv1)

    h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2, 2) + b_conv2)
    #h_pool2 = max_pool_2x2(h_conv2)

    h_conv3 = tf.nn.relu(conv2d(h_conv2, W_conv3, 1) + b_conv3)
    #h_pool3 = max_pool_2x2(h_conv3)

    #h_pool3_flat = tf.reshape(h_pool3, [-1, 256])
    h_conv3_flat = tf.reshape(h_conv3, [-1, 1600])

    h_fc1 = tf.nn.relu(tf.matmul(h_conv3_flat, W_fc1) + b_fc1)

    # readout layer
    readout = tf.matmul(h_fc1, W_fc2) + b_fc2

    return s, readout, h_fc1

返回当前状态,输出动作,全连接层

训练网络

def trainNetwork(s,readout,h_fc1,sess):
	#定义损失函数
	#定义动作结构和Q值结构
	a=tf.placeholder('float',[None,ACTIONS])
	y=tf.placeholder('float',[None])
#计算动作的Q值
	readout_action=tf.reduce_sum(tf.multipy(readout,a),reduction_indices=1)
	cost = tf.reduce_mean(tf.square(y - readout_action))#计算动作Q值的均方误差
	#调用优化器用来更新和计算模型训练和模型输出的网络参数,使其逼近或达到最优之,从而最小化或最大化损失函数
	train_step = tf.train.AdamOptimizer(1e-6).minimize(cost)
	#开启game游戏
	game_state=game.GameState()
	##创建队列保存参数
	D=deque()
    #设置游戏的初始状态,并把第一步设为不执行跳跃,并将图像设置为80*80*4规格
    do_nothing=np.zeros(ACTIONS)
    do_nothing[0]=1
    x_t,r_0,terminal=game_state.frame(do_nothing)#执行动作后的返回值
    x_t = cv2.cvtColor(cv2.resize(x_t, (80, 80)), cv2.COLOR_BGR2GRAY)
    ret, x_t = cv2.threshold(x_t,1,255,cv2.THRESH_BINARY)
    s_t = np.stack((x_t, x_t, x_t, x_t), axis=2)

	#加载保存网络模型参数
    #构造训练过程中的存取对象
    #初始化所有参数
    #读取已经保存的网络参数
    saver = tf.train.Saver()
    sess.run(tf.initialize_all_variables())
    checkpoint = tf.train.get_checkpoint_state("saved_networks")



#开始训练,epsilon贪心策略进行训练,随机选取一个动作
	epsilon=INITIAL_EPSILON
	t = 0
	while "flappy bird" != "angry bird":
		readout_t=readout.eval(feed_dict={s:s[s_t]})[0]
		a_t=np.zeros([ACTIONS])
		action_index=0
		if t%FRAME_PER_ACTION == 0:
			#加入一些探索,探索一些相同回报的其他行为,可以提高模型的泛化能力
			if random.random()<=epsilon:
				action_index = random.randrange(ACTIONS)
				a_t[random.randrange(ACTIONS)] = 1
			else:
				action_index = np.argmax(readout_t)
				a_t[action_index] = 1
			else:
				a_t[0]=1
		#模型稳定减少探索次数,减少随机动作的选取
		if epsilon > FINAL_EPSILON and t > OBSERVE:
			epsilon -= (INITIAL_EPSILON - FINAL_EPSILON) / EXPLORE

		#执行动作获得下一个状态的状态及回报。
		x_t1_colored, r_t, terminal = game_state.frame_step(a_t)
        # 先将尺寸设置成 80 * 80,然后转换为灰度图
        x_t1 = cv2.cvtColor(cv2.resize(x_t1_colored, (80, 80)), cv2.COLOR_BGR2GRAY)
        # x_t1 新得到图像,二值化 阈值:1
        ret, x_t1 = cv2.threshold(x_t1, 1, 255, cv2.THRESH_BINARY)
        x_t1 = np.reshape(x_t1, (80, 80, 1))
		
		#将状态转移过程存储到D中用于更新参数时采样
		s_t1 = np.append(x_t1, s_t[:, :, :3], axis=2)
		D.append((s_t, a_t, r_t, s_t1, terminal))
		
		#过了观察其开始训练模型
		if len(D)>REPLAY_MEMORY:#经验池大小超过
			D.popleft()
			minibatch = random.sample(D, BATCH)
			#从经验池中随机选取32个样本,读取当前状态、动作、以及奖励值
			s_j_batch = [d[0] for d in minibatch]
			a_batch = [d[1] for d in minibatch]
			r_batch = [d[2] for d in minibatch]
			s_j1_batch = [d[3] for d in minibatch]
			y_batch = []
			readout_j1_batch = readout.eval(feed_dict = {s : s_j1_batch})
       for i in range(0, len(minibatch)):
                terminal = minibatch[i][4]
                # if terminal, only equals reward
                if terminal: # 碰到障碍物,终止
                    y_batch.append(r_batch[i])
                else:# 即时奖励 + 下一阶段回报
                    y_batch.append(r_batch[i] + GAMMA * np.max(readout_j1_batch[i]))
            # 根据cost -> 梯度 -> 反向传播 -> 更新参数
            # perform gradient step
            # 必须要3个参数,y, a, s 只是占位符,没有初始化
            # 在 train_step过程中,需要这3个参数作为变量传入
            # perform gradient step
            train_step.run(feed_dict = {
                y : y_batch,
                a : a_batch,
                s : s_j_batch}
            )

        # update the old values
        s_t = s_t1  # state 更新
        t += 1

        # save progress every 10000 iterations
        if t % 10000 == 0:
            saver.save(sess, 'saved_networks/' + GAME + '-dqn', global_step = t)

        # print info
        state = ""
        if t <= OBSERVE:
            state = "observe"
        elif t > OBSERVE and t <= OBSERVE + EXPLORE:
            state = "explore"
        else:
            state = "train"

        print("TIMESTEP", t, "/ STATE", state, \
            "/ EPSILON", epsilon, "/ ACTION", action_index, "/ REWARD", r_t, \
            "/ Q_MAX %e" % np.max(readout_t))
        # write info to files
        '''
        if t % 10000 <= 100:
            a_file.write(",".join([str(x) for x in readout_t]) + '\n')
            h_file.write(",".join([str(x) for x in h_fc1.eval(feed_dict={s:[s_t]})[0]]) + '\n')
            cv2.imwrite("logs_tetris/frame" + str(t) + ".png", x_t1)
        '''

def playGame():
    sess = tf.InteractiveSession()
    s, readout, h_fc1 = createNetwork()
    trainNetwork(s, readout, h_fc1, sess)

def main():
    playGame()

if __name__ == "__main__":
    main()
 类似资料: