# number of actions
self.n_actions
# number of state features
self.n_features
# learning rate
self.lr
# reward discount factor (gamma) in Q-learning
self.gamma
# upper limit of epsilon in the epsilon-greedy policy
self.epsilon_max
# number of learning steps between replacements of the target-net (Q-target) parameters
self.replace_target_iter
# capacity of the replay memory
self.memory_size
# number of samples drawn from memory per learning step
self.batch_size = batch_size
self.epsilon_increment = e_greedy_increment
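# start epsilon at 0 when it is scheduled to grow (explore more at the beginning); otherwise use epsilon_max from the start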
self.epsilon = 0 if e_greedy_increment is not None else self.epsilon_max
# counter of learning steps taken so far
self.learn_step_counter
# replay memory: each row stores the current state (n_features), the action, the reward, and the next state (n_features)
self.memory = np.zeros((self.memory_size, n_features * 2 + 2))
# replace the target-net (Q-target) parameters with the eval-net (Q-estimate) parameters
t_params = tf.get_collection('target_net_params')
e_params = tf.get_collection('eval_net_params')
# build a list of TensorFlow ops: [tf.assign(t1, e1), tf.assign(t2, e2), tf.assign(t3, e3)]
self.replace_target_op = [tf.assign(t, e) for t, e in zip(t_params, e_params)]
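The fragments above all come from the constructor. Below is a minimal sketch of how such an `__init__` might wire them together; the class name, argument names, and default values are assumptions for illustration, not the exact original code.

```python
import numpy as np
import tensorflow as tf

class DeepQNetwork:
    # sketch of the constructor; argument names and defaults are illustrative
    def __init__(self, n_actions, n_features, learning_rate=0.01,
                 reward_decay=0.9, e_greedy=0.9, replace_target_iter=300,
                 memory_size=500, batch_size=32, e_greedy_increment=None):
        self.n_actions = n_actions
        self.n_features = n_features
        self.lr = learning_rate
        self.gamma = reward_decay
        self.epsilon_max = e_greedy
        self.replace_target_iter = replace_target_iter
        self.memory_size = memory_size
        self.batch_size = batch_size
        self.epsilon_increment = e_greedy_increment
        self.epsilon = 0 if e_greedy_increment is not None else self.epsilon_max
        self.learn_step_counter = 0
        # each row: [s (n_features), a, r, s_ (n_features)]
        self.memory = np.zeros((self.memory_size, n_features * 2 + 2))
        self._build_net()
        t_params = tf.get_collection('target_net_params')
        e_params = tf.get_collection('eval_net_params')
        self.replace_target_op = [tf.assign(t, e) for t, e in zip(t_params, e_params)]
        self.sess = tf.Session()
        self.sess.run(tf.global_variables_initializer())
        self.cost_his = []  # training-loss history, appended in learn()
```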
def _build_net(self):
# input: current state
self.s = tf.placeholder(tf.float32, [None, self.n_features], name='s')
# input: target Q values (Q-target), computed outside the graph and fed in
self.q_target = tf.placeholder(tf.float32, [None, self.n_actions], name='Q_target')
with tf.variable_scope('eval_net'):
# collections used to group the eval net's parameters
c_names = ['eval_net_params', tf.GraphKeys.GLOBAL_VARIABLES]
# number of neurons in the hidden layer
n_l1 = 10
# weight initializer
w_initializer = tf.random_normal_initializer(0., 0.3)
# bias initializer
b_initializer = tf.constant_initializer(0.1)
# first layer
with tf.variable_scope('l1'):
w1 = tf.get_variable('w1', [self.n_features, n_l1], initializer=w_initializer, collections=c_names)
b1 = tf.get_variable('b1', [1, n_l1], initializer=b_initializer, collections=c_names)
l1 = tf.nn.relu(tf.matmul(self.s, w1) + b1)
# second layer
with tf.variable_scope('l2'):
w2 = tf.get_variable('w2', [n_l1, self.n_actions], initializer=w_initializer, collections=c_names)
b2 = tf.get_variable('b2', [1, self.n_actions], initializer=b_initializer, collections=c_names)
self.q_eval = tf.matmul(l1, w2) + b2
# loss between the estimated Q values (q_eval) and the target Q values (q_target)
with tf.variable_scope('loss'):
self.loss = tf.reduce_mean(tf.squared_difference(self.q_target, self.q_eval))
# training op
with tf.variable_scope('train'):
self._train_op = tf.train.RMSPropOptimizer(self.lr).minimize(self.loss)
# input: next state
self.s_ = tf.placeholder(tf.float32, [None, self.n_features], name='s_')
with tf.variable_scope('target_net'):
# collections used to group the target net's parameters
c_names = ['target_net_params', tf.GraphKeys.GLOBAL_VARIABLES]
# first layer
with tf.variable_scope('l1'):
w1 = tf.get_variable('w1', [self.n_features, n_l1], initializer=w_initializer, collections=c_names)
b1 = tf.get_variable('b1', [1, n_l1], initializer=b_initializer, collections=c_names)
l1 = tf.nn.relu(tf.matmul(self.s_, w1) + b1)
# second layer
with tf.variable_scope('l2'):
w2 = tf.get_variable('w2', [n_l1, self.n_actions], initializer=w_initializer, collections=c_names)
b2 = tf.get_variable('b2', [1, self.n_actions], initializer=b_initializer, collections=c_names)
self.q_next = tf.matmul(l1, w2) + b2
def store_transition(self, s, a, r, s_):
if not hasattr(self, 'memory_counter'):
self.memory_counter = 0
# current state as a list ==> [x, y]
# action and reward combined into one list ==> [a, r]
# next state ==> [x_next, y_next]
transition = np.hstack((s, [a, r], s_))
# result of hstack ==> [x, y, a, r, x_next, y_next]
# once memory_counter exceeds memory_size, old entries are overwritten (circular buffer)
index = self.memory_counter % self.memory_size
# memory is a 2-D array; transition is a row vector written into row `index`
self.memory[index, :] = transition
self.memory_counter += 1
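A toy, numpy-only illustration of this circular overwrite (the transition values are made up):

```python
import numpy as np

memory_size, n_features = 3, 2
memory = np.zeros((memory_size, n_features * 2 + 2))

# store five transitions into a buffer that only holds three rows
for memory_counter in range(5):
    s, a, r, s_ = [memory_counter, 0.0], 1, -1.0, [memory_counter + 1, 0.0]
    transition = np.hstack((s, [a, r], s_))   # [x, y, a, r, x_next, y_next]
    index = memory_counter % memory_size      # 0, 1, 2, 0, 1 -> oldest rows get overwritten
    memory[index, :] = transition

print(memory)  # rows 0 and 1 now hold the 4th and 5th transitions
```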
def choose_action(self, observation):
# reshape the observation list [x, y] into a row vector [[x, y]]
observation = observation[np.newaxis, :]
if np.random.uniform() < self.epsilon:
# get the estimated Q value of every action
actions_value = self.sess.run(self.q_eval, feed_dict={self.s: observation})
# pick the action with the largest Q value
action = np.argmax(actions_value)
else:
action = np.random.randint(0, self.n_actions)
return action
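For intuition, the epsilon-greedy rule can be mimicked without TensorFlow; the observation and Q values below are made-up stand-ins for the network output:

```python
import numpy as np

epsilon, n_actions = 0.9, 4
observation = np.array([0.5, -0.2])[np.newaxis, :]  # shape (1, 2): how [x, y] is reshaped before feeding the net
q_values = np.array([[0.1, 0.7, 0.3, 0.2]])         # pretend output of q_eval for this observation

if np.random.uniform() < epsilon:
    action = np.argmax(q_values)               # exploit: greedy action (here index 1)
else:
    action = np.random.randint(0, n_actions)   # explore: random action
```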
def learn(self):
# every replace_target_iter steps, copy the eval-net parameters into the target net
if self.learn_step_counter % self.replace_target_iter == 0:
self.sess.run(self.replace_target_op)
if self.memory_counter > self.memory_size:
sample_index = np.random.choice(self.memory_size, size=self.batch_size)
else:
sample_index = np.random.choice(self.memory_counter, size=self.batch_size)
# sample batch_size transitions from memory; each row is [x, y, a, r, x_next, y_next]
batch_memory = self.memory[sample_index, :]
q_next, q_eval = self.sess.run(
[self.q_next, self.q_eval],
feed_dict={
self.s_: batch_memory[:, -self.n_features:], # fixed params
self.s: batch_memory[:, :self.n_features], # newest params
})
q_target = q_eval.copy()
batch_index = np.arange(self.batch_size, dtype=np.int32)
eval_act_index = batch_memory[:, self.n_features].astype(int)
reward = batch_memory[:, self.n_features + 1]
q_target[batch_index, eval_act_index] = reward + self.gamma * np.max(q_next, axis=1)
# train the eval network
_, self.cost = self.sess.run([self._train_op, self.loss],
feed_dict={self.s: batch_memory[:, :self.n_features],
self.q_target: q_target})
self.cost_his.append(self.cost)
# increasing epsilon
self.epsilon = self.epsilon + self.epsilon_increment if self.epsilon < self.epsilon_max else self.epsilon_max
self.learn_step_counter += 1
An example to illustrate the process above:

1. Feed the sampled batch through both networks to get q_eval and q_next, then copy q_eval into q_target. Rows correspond to samples in the batch, columns to the Q value of each action:

action_0 | action_1 | action_2 |
---|---|---|
1 | 2 | 1 |
2 | 3 | 2 |

2. From the batch memory, build one-dimensional lists, each of length batch_size: the sample indices batch_index = [0, 1], the actions actually taken eval_act_index, and the rewards reward.

3. For each sample, overwrite the q_target entry of the action actually taken with reward + gamma * max(q_next, axis=1); all other entries keep their q_eval values. With eval_act_index = [0, 2] and updated target values [-1, -2], q_target becomes:

action_0 | action_1 | action_2 |
---|---|---|
-1 | 2 | 1 |
2 | 3 | -2 |

4. Train on the difference between the updated q_target and q_eval; only the entries of the actions actually taken contribute to the loss.
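Step 3 is exactly the fancy-indexing line `q_target[batch_index, eval_act_index] = reward + self.gamma * np.max(q_next, axis=1)`. A self-contained numpy sketch, with made-up q_next and reward values chosen to reproduce the tables above:

```python
import numpy as np

gamma = 0.9
q_eval = np.array([[1., 2., 1.],
                   [2., 3., 2.]])     # Q estimates for a batch of 2 samples
q_next = np.array([[0., 1., 0.],
                   [1., 0., 2.]])     # target-net Q values for the next states (made up)
eval_act_index = np.array([0, 2])     # actions actually taken in each sample
reward = np.array([-1.9, -3.8])       # rewards (made up)

q_target = q_eval.copy()
batch_index = np.arange(2)
q_target[batch_index, eval_act_index] = reward + gamma * np.max(q_next, axis=1)
# row 0: -1.9 + 0.9 * 1 = -1.0 goes into column 0
# row 1: -3.8 + 0.9 * 2 = -2.0 goes into column 2
print(q_target)  # [[-1.  2.  1.]  [ 2.  3. -2.]]
```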
def run_maze():
# total step counter across all episodes; learning starts only after enough steps have been taken
step = 0
# loop over episodes of the game
for episode in range(300):
# reset the environment and get the initial observation
observation = env.reset()
while True:
# render the environment
env.render()
# choose an action
action = RL.choose_action(observation)
# step the environment with the chosen action;
# it returns the next observation (observation_), the reward, and the done flag
observation_, reward, done = env.step(action)
# store the transition
RL.store_transition(observation, action, reward, observation_)
if (step > 200) and (step % 5 == 0):
# draw a random batch from memory and update the network
RL.learn()
# move on to the next observation
observation = observation_
# end the episode when the game is over
if done:
break
step += 1
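For completeness, here is a minimal sketch of how this training loop is typically launched. It assumes a Tkinter-based `Maze` environment class and a `DeepQNetwork` class like the one sketched earlier; the names and hyperparameter values are illustrative, not code shown above.

```python
if __name__ == "__main__":
    # assumed environment/agent classes and hyperparameters, for illustration only
    env = Maze()
    RL = DeepQNetwork(env.n_actions, env.n_features,
                      learning_rate=0.01,
                      reward_decay=0.9,
                      e_greedy=0.9,
                      replace_target_iter=200,
                      memory_size=2000)
    env.after(100, run_maze)  # a Tkinter-based env starts the loop through its event queue
    env.mainloop()
```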