本文假设读者已经掌握 DQN、Double DQN 和 Dueling DQN 的基础知识,下面的代码是在此基础之上实现的。
DQN 入门链接
莫凡 DQN
知乎白话文DQN
Double DQN
莫凡 Double DQN
知乎白话文DoubleDQN
Dueling DQN
莫凡 Dueling DQN
知乎 Dueling DQN
HighwayENV 链接
说明:莫凡教程中的代码在计算 target_q_value 时,都是按照
所有 action 输出的 Q 值来计算 loss,得到的是多行多列的矩阵。
而所有官方代码都是基于实际执行的 action 所在的列来计算 target_q_value,最后计算 loss,得到的是多行单列的矩阵,具体 2.2 已经声明
主要流程
普通DQN:
eval_net = (举例子:input_dim*256*256*out_dim)
target_net = Same(eval_net)
DuelingDQN:
将Q值 输出拆分为 value + advantage 的和
advantage_net = (举例子:input_dim*256*256*out_dim)
value_net = (举例子:input_dim*256*256*1)
eval_net = value_net + ( advantage_net - mean(advantage_net) )
target_net = Same(eval_net)
准备环境
s: 当前环境输入
a: 当前使用的动作
r: 当前的回报
d: 当前回合是否结束(done,成功或者失败)
s_: 下一刻环境
current_q_value = self.eval_net(s).gather(1,a)
gather(1,a) 选择所在列的数据
# Compute the next Q-values using the target network
next_q_values = self.target_net(b_s_)
# double DQN 主要解决 Q值过高问题 Q_next = target_net(s_, argmax(eval_net(s_)))
# 普通 DQN Q_next = target_net(s_).max()
if self.double_q:
#启用double dqn
#根据环境s_ 计算 eval_net Q值
next_eval_values = self.eval_net(b_s_)
#选择Q值最大的 action 的值
actions = next_eval_values.argmax(dim=1).reshape(-1, 1)
#根据这个action 输入---> target 得出 当前 target_value
next_q_values = next_q_values.gather(1,actions)
else:
# 普通DQN 直接选择 next_q_values 中的最大值即可
next_q_values, _ = next_q_values.max(dim=1)
next_q_values = next_q_values.reshape(-1, 1)
target_q_values = r + (1 - d) * self.gamma * next_q_values
目前官方实现都使用 Smooth L1 loss(即 Huber loss),并且只针对实际执行的 action 对应的 Q 值计算 loss
https://zhuanlan.zhihu.com/p/83131026
loss = F.smooth_l1_loss(current_q_values, target_q_values)
self.optimizer.zero_grad() #梯度清0
loss.backward() #梯度反向传播
nn.utils.clip_grad_norm_(self.eval_net.parameters(), self.max_grad_norm) #梯度截断
self.optimizer.step()
DQN 代码
import numpy as np
import os
from typing import Any, Dict, List, Optional, Tuple, Type, Union
import torch as th
import torch.nn as nn
from torch.autograd import Variable
import torch.nn.functional as F
# stable baseline3 中的构建多层全连接神经网络
def create_mlp_net(
    input_dim: int,
    output_dim: int,
    net_arch: List[int],
    activation_fn: Type[nn.Module] = nn.ReLU,
    squash_output: bool = False,
):
    """Build a fully connected network as an ``nn.Sequential``.

    Args:
        input_dim: Size of the input vector.
        output_dim: Size of the output vector. When it is not positive, no
            final linear layer is appended.
        net_arch: Width of each hidden layer, in order. May be empty, in
            which case the network is a single linear layer (or empty when
            ``output_dim`` is not positive as well).
        activation_fn: Module class instantiated after every hidden layer.
        squash_output: When True, append ``nn.Tanh`` so the output lies in
            [-1, 1].

    Returns:
        The assembled ``nn.Sequential`` module.
    """
    modules: List[nn.Module] = []
    last_layer_dim = input_dim
    # Walking the list (instead of indexing net_arch[0] up front) keeps an
    # empty net_arch valid.
    for hidden_dim in net_arch:
        modules.append(nn.Linear(last_layer_dim, hidden_dim))
        modules.append(activation_fn())
        last_layer_dim = hidden_dim
    if output_dim > 0:
        modules.append(nn.Linear(last_layer_dim, output_dim))
    if squash_output:
        modules.append(nn.Tanh())
    return nn.Sequential(*modules)
class DuelingNet(nn.Module):
    """Dueling Q-network: Q(s, a) = V(s) + (A(s, a) - mean_a A(s, a)).

    Two independent MLPs are built with ``create_mlp_net``: one produces a
    single state value, the other produces one advantage per action.
    """

    def __init__(
        self,
        input_dim: int,
        output_dim: int,
        net_arch: List[int],
        activation_fn: Type[nn.Module] = nn.ReLU,
    ):
        """Create the value and advantage streams.

        Args:
            input_dim: Size of the (flattened) observation vector.
            output_dim: Number of actions, i.e. number of Q-values produced.
            net_arch: Hidden layer widths shared by both streams.
            activation_fn: Activation module class used in both streams.
        """
        super().__init__()
        self.value_net = create_mlp_net(input_dim, 1, net_arch, activation_fn)
        self.advantage_net = create_mlp_net(input_dim, output_dim, net_arch, activation_fn)

    def forward(self, x):
        """Return the Q-values for ``x``; the last dimension indexes actions."""
        value_out = self.value_net(x)
        advantage_out = self.advantage_net(x)
        # Centre the advantages per sample, over the action dimension only.
        # A mean over the whole tensor would mix samples of the same batch,
        # making each Q-value depend on unrelated states.
        centered_advantage = advantage_out - advantage_out.mean(dim=-1, keepdim=True)
        return value_out + centered_advantage
class DQN:
    """DQN agent with optional Double-DQN targets and a dueling network.

    Transitions are kept in a fixed-size circular numpy buffer whose rows are
    laid out as ``[s (n_features), a, r, done, s_ (n_features)]``.

    ``epsilon`` is the probability of acting greedily (not of exploring): a
    uniform draw above ``epsilon`` triggers a random action.
    """

    # File written by save_model() and read by load_model().
    MODEL_PATH = 'torch_dqn_highway_model.pkl'

    def __init__(
        self,
        env,
        learning_rate=0.005,
        reward_decay=0.9,
        e_greedy=0.9,
        e_greedy_increment=None,
        target_update_interval=200,
        memory_size=3000,
        batch_size=32,
        output_graph=False,
        DOUBLE_DQN=False,  # reduce Q-value over-estimation
        DUELING_DQN=False,  # build Q from value + advantage streams
    ):
        """Set up hyper-parameters, replay buffer, networks and optimizer.

        Args:
            env: Environment exposing ``action_space.n``,
                ``observation_space.shape``, ``reset()`` and ``step()``.
            learning_rate: Adam learning rate.
            reward_decay: Discount factor (gamma).
            e_greedy: Final (maximum) probability of acting greedily.
            e_greedy_increment: Amount added to epsilon per training step.
                When None, epsilon is fixed at ``e_greedy`` from the start.
            target_update_interval: Number of training steps between hard
                copies of eval_net into target_net.
            memory_size: Capacity of the replay buffer, in transitions.
            batch_size: Number of transitions sampled per training step.
            output_graph: Unused; kept for interface compatibility.
            DOUBLE_DQN: Use Double-DQN target computation.
            DUELING_DQN: Use ``DuelingNet`` instead of a plain MLP.
        """
        self.env = env
        self.n_actions = env.action_space.n
        # Observations are flattened before use, so the feature count is the
        # product of all observation dimensions (any rank, not only 2-D).
        self.n_features = int(np.prod(env.observation_space.shape))
        self.lr = learning_rate
        self.gamma = reward_decay
        self.epsilon_max = e_greedy
        self.replace_target_iter = target_update_interval
        self.memory_size = memory_size
        self.batch_size = batch_size
        self.double_q = DOUBLE_DQN
        self.dueling_q = DUELING_DQN
        self.epsilon_increment = e_greedy_increment
        self.epsilon = 0 if e_greedy_increment is not None else self.epsilon_max
        self.max_grad_norm = 10
        self.learn_step_counter = 0
        # One row per transition: s, then (a, r, done), then s_.
        self.memory = np.zeros(
            (self.memory_size, self.n_features * 2 + 3), dtype=np.float32
        )
        self.memory_counter = 0

        # Build the online (eval) and target networks with the same layout.
        if self.dueling_q:
            self.eval_net = DuelingNet(self.n_features, self.n_actions, [256, 256], activation_fn=nn.ReLU)
            self.target_net = DuelingNet(self.n_features, self.n_actions, [256, 256], activation_fn=nn.ReLU)
        else:
            self.eval_net = create_mlp_net(self.n_features, self.n_actions, [256, 256], activation_fn=nn.ReLU)
            self.target_net = create_mlp_net(self.n_features, self.n_actions, [256, 256], activation_fn=nn.ReLU)
        print("model------->")
        print(self.eval_net)

        # Only the eval network is optimized; the target network is updated
        # by hard copies inside train_sample().
        self.optimizer = th.optim.Adam(self.eval_net.parameters(), learning_rate)
        # Kept for compatibility: this stores the loss class, not an
        # instance; training calls F.smooth_l1_loss directly.
        self.loss_func = nn.SmoothL1Loss
        self.loss = 0.0

    def store_transition(self, s, a, r, s_, done):
        """Write one transition into the circular buffer, overwriting the oldest."""
        transition = np.hstack((s.flatten(), [a, r, done], s_.flatten()))
        index = self.memory_counter % self.memory_size
        self.memory[index, :] = transition
        self.memory_counter += 1

    def choose_rlnet_action(self, observation):
        """Return the greedy action (as a Python int) for one observation."""
        # Add a leading batch dimension of size 1.
        s = th.unsqueeze(th.FloatTensor(observation.flatten()), 0)
        # Action selection needs no gradients.
        with th.no_grad():
            q_values = self.eval_net(s)
        action = q_values.argmax(dim=1).reshape(-1)
        return action.item()

    def choose_action(self, observation, determinstic=False):
        """Pick an action, greedily when ``determinstic`` else epsilon-greedily.

        In the stochastic case a random action is taken when a uniform draw
        exceeds ``self.epsilon``; otherwise the greedy action is returned.
        """
        if determinstic:
            return self.choose_rlnet_action(observation)
        if np.random.uniform() > self.epsilon:
            return np.random.randint(0, self.n_actions)
        return self.choose_rlnet_action(observation)

    def save_model(self):
        """Serialize the whole eval network to ``MODEL_PATH``."""
        # th.save overwrites an existing file, so there is no need to delete
        # it first (and no need to shell out to a platform-specific `rm`).
        th.save(self.eval_net, self.MODEL_PATH)
        print(" model saved !!")

    def load_model(self):
        """Replace the eval network with the module stored at ``MODEL_PATH``."""
        print('load model')
        # NOTE(review): th.load unpickles arbitrary objects, so only load
        # files you trust. Recent PyTorch releases may require
        # weights_only=False to load a full module - confirm against the
        # installed version.
        self.eval_net = th.load(self.MODEL_PATH)

    def train_sample(self, sample_index):
        """Run one gradient step on the buffer rows selected by ``sample_index``."""
        # Hard update: copy eval_net weights into target_net periodically.
        if self.learn_step_counter % self.replace_target_iter == 0:
            self.target_net.load_state_dict(self.eval_net.state_dict())
            print('eval_net ---> targe_net: target_net_params_updated!')

        batch_memory = self.memory[sample_index, :]
        n = self.n_features
        # Every tensor keeps the batch as its first dimension; scalars per
        # transition are shaped (batch, 1) so they broadcast with gather().
        b_s = th.FloatTensor(batch_memory[:, :n])
        b_a = th.LongTensor(batch_memory[:, n].astype(int).reshape(-1, 1))
        b_r = th.FloatTensor(batch_memory[:, n + 1].reshape(-1, 1))
        b_d = th.FloatTensor(batch_memory[:, n + 2]).reshape(-1, 1)
        b_s_ = th.FloatTensor(batch_memory[:, -n:])

        # Target computation follows the stable-baselines3 DQN recipe.
        with th.no_grad():
            next_q_values = self.target_net(b_s_)
            if self.double_q:
                # Double DQN: the eval network picks the action, the target
                # network evaluates it.
                next_eval_values = self.eval_net(b_s_)
                actions = next_eval_values.argmax(dim=1).reshape(-1, 1)
                next_q_values = next_q_values.gather(1, actions)
            else:
                # Vanilla DQN: take the target network's maximum Q-value.
                next_q_values, _ = next_q_values.max(dim=1)
                next_q_values = next_q_values.reshape(-1, 1)
            # 1-step TD target; (1 - done) removes the bootstrap term on
            # terminal transitions.
            target_q_values = b_r + (1 - b_d) * self.gamma * next_q_values

        # Q-value of the action that was actually taken.
        current_q_values = self.eval_net(b_s).gather(1, b_a)
        # Smooth L1 loss, which is less sensitive to outliers than MSE.
        loss = F.smooth_l1_loss(current_q_values, target_q_values)
        self.loss = loss.item()

        self.optimizer.zero_grad()
        loss.backward()
        nn.utils.clip_grad_norm_(self.eval_net.parameters(), self.max_grad_norm)
        self.optimizer.step()

        # Anneal epsilon towards epsilon_max without overshooting it; a
        # missing increment simply leaves epsilon where it is.
        step = self.epsilon_increment or 0.0
        self.epsilon = min(self.epsilon + step, self.epsilon_max)
        self.learn_step_counter += 1

    def train(self):
        """Sample a random batch from the filled part of the buffer and train on it."""
        filled = min(self.memory_counter, self.memory_size)
        if filled == 0:
            # Nothing stored yet, so there is nothing to learn from.
            return
        sample_index = np.random.choice(filled, size=self.batch_size)
        self.train_sample(sample_index)

    def learn(self, learn_start, total_timesteps):
        """Interact with the environment and train until the step budget is used.

        Args:
            learn_start: Number of stored transitions after which training
                steps begin.
            total_timesteps: Stop once more than this many transitions have
                been stored.

        The model is saved when the loop ends, whether it finished normally
        or was interrupted with Ctrl-C.
        """
        eposide_count = 0
        try:
            while True:
                s = self.env.reset()
                print("\nnew eposide------>")
                while True:
                    a = self.choose_action(s)
                    s_, r, done, info = self.env.step(a)
                    self.store_transition(s, a, r, s_, done)
                    if self.memory_counter > learn_start:
                        self.train()
                    s = s_
                    if done or self.memory_counter > total_timesteps:
                        break
                eposide_count += 1
                if self.memory_counter > learn_start:
                    print("eposides_count :", eposide_count)
                    print("time_steps :", self.memory_counter)
                    print("epsilon :", self.epsilon)
                    print("loss :", self.loss)
                    print("learning progress:", float(self.memory_counter) / total_timesteps)
                if self.memory_counter > total_timesteps:
                    print("learning stop !!")
                    break
        except KeyboardInterrupt:
            print("KeyboardInterrupt,learning stop")
        self.save_model()

    def test(self):
        """Load the saved model and run it greedily, rendering, until Ctrl-C."""
        # Module-level flag kept for compatibility; it is not read here.
        global stop_flag
        self.load_model()
        s = self.env.reset()
        stop_flag = False
        try:
            while True:
                a = self.choose_action(s, determinstic=True)
                print("action type:", self.env.action_type.actions[a])
                s_, r, done, info = self.env.step(a)
                s = s_
                self.env.render()
                if done:
                    s = self.env.reset()
        except KeyboardInterrupt:
            print("KeyboardInterrupt, stop")
highway 主程序
#!/usr/bin/python3
# Train a DQN agent on highway-env, then replay the saved policy.
# Usage: pass -dueling, -double or -all as the first argument to enable the
# corresponding DQN variants; -h / --help prints the option list.
import sys

DUELING_DQN = False
DOUBLE_DQN = False
if len(sys.argv) > 1:
    option = sys.argv[1]
    if option in ('-h', '--help'):
        print(
'''
-dueling enable dueling dqn
-double enable double dqn
-all enable double and dueling dqn
'''
        )
        # sys.exit is always available; the bare exit() builtin is only
        # installed by the site module.
        sys.exit(0)
    elif option == '-dueling':
        print('enable DUELING_DQN')
        DUELING_DQN = True
    elif option == '-double':
        print('enable DOUBLE_DQN')
        DOUBLE_DQN = True
    elif option == '-all':
        print('enable DOUBLE_DQN DUELING_DQN')
        DUELING_DQN = True
        DOUBLE_DQN = True
else:
    print('use default dqn model')

# Heavy imports come after argument handling so that --help stays fast.
import gym
import highway_env
from dqn import DQN
import time

env = gym.make("highway-v0")
config = {
    'action': {'type': 'DiscreteMetaAction'},
    # The key was previously misspelled as 'observatoin', so this setting was
    # never stored under the 'observation' key. config.update() below replaces
    # the whole observation dict, so the observation type is repeated here.
    'observation': {'type': 'Kinematics', 'vehicles_count': 20},
    'manual_control': False,
    'simulation_frequency': 15,
    'policy_frequency': 5,
    'duration': 30000,  # number of steps after which an episode is ended
}
env.config.update(config)
# Reset so the updated configuration takes effect before the agent reads
# the observation space.
env.reset()

rl_model = DQN(
    env,
    memory_size=15000,
    batch_size=32,
    e_greedy_increment=0.0001,
    e_greedy=0.8,
    learning_rate=5e-4,
    reward_decay=0.8,
    target_update_interval=50,
    DOUBLE_DQN=DOUBLE_DQN,
    DUELING_DQN=DUELING_DQN,
)

t1 = time.time()
# Press Ctrl-C to stop training early; the model is still saved.
rl_model.learn(learn_start=200, total_timesteps=2e4)
print('Training time: ', time.time() - t1)
# Press Ctrl-C to stop the replay.
rl_model.test()
结果
大概训练2万次以上有基本的避让动作,几十万次可以基本无碰撞避障.