Deep Q-Learning深度增强学习(代码篇)

宗鸿博
2023-12-01

搭建DQN

初始化

#动作数量
self.n_actions 
#状态数量
self.n_features
#learning_rate学习速率
self.lr
#Q-learning中reward衰减因子
self.gamma
#e-greedy的选择概率最大值
self.epsilon_max 
#更新Q现实网络参数的步骤数
self.replace_target_iter
#存储记忆的数量
self.memory_size
#每次从记忆库中取的样本数量
self.batch_size = batch_size
self.epsilon_increment = e_greedy_increment
self.epsilon = 0 if e_greedy_increment is not None else self.epsilon_max
#学习的步骤
self.learn_step_counter
#记忆库,此刻的n_feature + 下一步的n_feature + reward + action
self.memory = np.zeros((self.memory_size, n_features * 2 + 2))

#利用Q目标的参数替换Q估计中的参数
t_params = tf.get_collection('target_net_params')
e_params = tf.get_collection('eval_net_params')
#生成了一个tensorflow操作列表[tf.assign(t1,e1), tf.assign(t2,e2), tf.assign(t3,e3)]
self.replace_target_op = [tf.assign(t, e) for t, e in zip(t_params, e_params)]

构建神经网络

构造Q估计神经网络

def _build_net(self):
    #输入
    self.s = tf.placeholder(tf.float32, [None, self.n_features], name='s')
    #Q现实输入
    self.q_target = tf.placeholder(tf.float32, [None, self.n_actions], name='Q_target')

    with tf.variable_scope('eval_net'):
        #collection
        c_names = ['eval_net_params', tf.GraphKeys.GLOBAL_VARIABLES] 
        #神经元数量
        n_l1 =  10
        #权值
        w_initializer = tf.random_normal_initializer(0., 0.3)
        #偏置
        b_initializer = tf.constant_initializer(0.1)

        #第一层神经元        
        with tf.variable_scope('l1'):
            w1 = tf.get_variable('w1', [self.n_features, n_l1], initializer=w_initializer, collections=c_names)
            b1 = tf.get_variable('b1', [1, n_l1], initializer=b_initializer, collections=c_names)
            l1 = tf.nn.relu(tf.matmul(self.s, w1) + b1)
        #第二层神经元
        with tf.variable_scope('l2'):
            w2 = tf.get_variable('w2', [n_l1, self.n_actions], initializer=w_initializer, collections=c_names)
            b2 = tf.get_variable('b2', [1, self.n_actions], initializer=b_initializer, collections=c_names)
            self.q_eval = tf.matmul(l1, w2) + b2

        #基于Q估计与Q现实,构造loss-function
        with tf.variable_scope('loss'):
            self.loss = tf.reduce_mean(tf.squared_difference(self.q_target, self.q_eval))

        #训练
        with tf.variable_scope('train'):
            self._train_op = tf.train.RMSPropOptimizer(self.lr).minimize(self.loss)

构造Q现实神经网络(该段代码紧接着上段,属于_build_net()函数)

    #输入
    self.s_sub = tf.placeholder(tf.float32, [None, self.n_features], name='s_sub')    
    with tf.variable_scope('target_net'):
        #collection
        c_names = ['target_net_params', tf.GraphKeys.GLOBAL_VARIABLES]

        #第一层神经元
        with tf.variable_scope('l1'):
            w1 = tf.get_variable('w1', [self.n_features, n_l1], initializer=w_initializer, collections=c_names)
            b1 = tf.get_variable('b1', [1, n_l1], initializer=b_initializer, collections=c_names)
            l1 = tf.nn.relu(tf.matmul(self.s_, w1) + b1)

        #第二层神经元
        with tf.variable_scope('l2'):
            w2 = tf.get_variable('w2', [n_l1, self.n_actions], initializer=w_initializer, collections=c_names)
            b2 = tf.get_variable('b2', [1, self.n_actions], initializer=b_initializer, collections=c_names)
            self.q_next = tf.matmul(l1, w2) + b2

存储状态信息

    def store_transition(self, s, a, r, s_):
        if not hasattr(self, 'memory_counter'):
            self.memory_counter = 0

        #状态信息list ==> [x, y]
        #[action, reward]动作与奖励信息合并为list
        #下一步状态信息 ==> [x_next, y_next]
        transition = np.hstack((s, [a, r], s_))
        #hstack的结果为 ==> [x, y, a, r, x_next, y_next]

        #每过memory_size,替换存储值
        index = self.memory_counter % self.memory_size

        #memory为二维列表,transition为一行向量,插入index行中
        self.memory[index, :] = transition
        self.memory_counter += 1

选择动作action

    def choose_action(self, observation):
        # 将observation的list[x, y]转为行向量[[x, y]]
        observation = observation[np.newaxis, :]

        if np.random.uniform() < self.epsilon:
            # 得到每个action的q的估计值
            actions_value = self.sess.run(self.q_eval, feed_dict={self.s: observation})
            # 选择q值最大的action
            action = np.argmax(actions_value)
        else:
            action = np.random.randint(0, self.n_actions)
        return action

增强学习过程

    def learn(self):
        #更换参数
        if self.learn_step_counter % self.replace_target_iter == 0:
            self.sess.run(self.replace_target_op)

        if self.memory_counter > self.memory_size:
            sample_index = np.random.choice(self.memory_size, size=self.batch_size)
        else:
            sample_index = np.random.choice(self.memory_counter, size=self.batch_size)

        #从memory中抽取一个记忆值,一个行向量
        #[x, y, a, r, x_next, y_next]
        batch_memory = self.memory[sample_index, :]

        q_next, q_eval = self.sess.run(
         [self.q_next, self.q_eval],
            feed_dict={
             self.s_: batch_memory[:, -self.n_features:],  # fixed params
             self.s: batch_memory[:, :self.n_features],  # newest params
            })

        q_target = q_eval.copy()

        batch_index = np.arange(self.batch_size, dtype=np.int32)
        eval_act_index = batch_memory[:, self.n_features].astype(int)
        reward = batch_memory[:, self.n_features + 1]

        q_target[batch_index, eval_act_index] = reward + self.gamma * np.max(q_next, axis=1)

        #训练网络
        _, self.cost = self.sess.run([self._train_op, self.loss],
                                     feed_dict={self.s: batch_memory[:, :self.n_features],
                                                self.q_target: q_target})
        self.cost_his.append(self.cost)

        # increasing epsilon
        self.epsilon = self.epsilon + self.epsilon_increment if self.epsilon < self.epsilon_max else self.epsilon_max
        self.learn_step_counter += 1

举例说明上述过程

数据结构

  • action=3
  • n_feature=2
  • batch_size=2

q-eval结构

action_0action_1action_2
121
232

行:每一个样本
列:每一个action对应的Q值

q-next,q-target与q-eval结构相同

batch-index样本索引

一维list ==> [0, 1] #长度:bactch_size

eval_act_index每个样本对应的action的值,也就是每个样本列的索引

一维list ==> [1, 0]

reward每个样本对应的reward的值

一维list ==> [1, 2]


过程

  1. 将q-eval的值赋给q-target
  2. 利用Q-learning算法,计算每一个样本的对应action的q值
    • 样本0,采取了action=0,真实的q值为-1
    • 样本1,采取了action=2,真实的q值为-2
  3. 更新q-target中的值
action_0action_1action_2
-121
23-2

4. 利用更新后的q-target与q-eval之间的差值进行训练


仿真过程

def run_maze():
    # 游戏的每一个回合需要的步数
    step = 0
    # 游戏的回合
    for episode in range(300):
        # 初始化观察值
        observation = env.reset()

        while True:
            # 开始环境仿真
            env.render()

            # 选择动作
            action = RL.choose_action(observation)

            # 加入动作后,环境进行仿真
            # 获取了执行action后,下一步的观测值observation
            # 获取了奖励reward
            # 游戏是否结束标志done
            observation_, reward, done = env.step(action)

            # 存储样本
            RL.store_transition(observation, action, reward, observation_)

            if (step > 200) and (step % 5 == 0):
            # 随机抽取样本,网络进行学习
                RL.learn()

            # 交换观测值
            observation = observation_

            # 判断游戏是否结束
            if done:
                break

            step += 1
 类似资料: