当前位置: 首页 > 工具软件 > DoubleH OS > 使用案例 >

Pytorch DQN Double DQN Dueling DQN 实现跑 Highway

张亦
2023-12-01

本文的代码建立在读者已经掌握 DQN、Double DQN、Dueling DQN 基础知识的前提之上,相关入门链接如下:
DQN 入门链接
莫凡 DQN
知乎白话文DQN
Double DQN
莫凡 Double DQN
知乎白话文DoubleDQN
Dueling DQN
莫凡 Dueling DQN
知乎 Dueling DQN
HighwayENV 链接

说明:莫凡教程中的代码,所有 target_q_value 都是按照
所有 action 的输出 Q 值来计算 loss,是多行多列的矩阵。
所有官方代码都是基于确定的 action 所在的列来计算 target_q_value,最后计算 loss,是多行单列的矩阵,具体见 2.2 节。

主要流程

1 构建eval_net target_net

普通DQN:
eval_net = (举例子:input_dim*256*256*out_dim)
target_net = Same(eval_net)
DuelingDQN:
将Q值 输出拆分为 value + advantage 的和

    advantage_net =  (举例子:input_dim*256*256*out_dim)
    value_net =  (举例子:input_dim*256*256*1)
    eval_net = value_net + ( advantage_net - mean(advantage_net) )
    target_net = Same(eval_net)

准备环境
s: 当前环境输入
a: 当前使用的动作
r: 当前的回报
d: 动作是否done(成功或者失败)
s_: 下一刻环境

2 计算 current_q_value 和 target_q_value

2.1 根据当前的环境输入 s 和 action 获取 current_q_value

current_q_value = self.eval_net(s).gather(1,a)
gather(1,a) 选择所在列的数据

2.2 根据下一刻环境输入 s_ 从所有输出中选择最大的 Q_next

# Compute the next Q-values using the target network
next_q_values = self.target_net(b_s_)

# Double DQN mainly counters Q-value overestimation: Q_next = target_net(s_, argmax(eval_net(s_)))
# Vanilla DQN:                                       Q_next = target_net(s_).max()

if self.double_q:
    # Double DQN is enabled
    # Evaluate eval_net on the next states s_
    next_eval_values = self.eval_net(b_s_)
    # Pick the action whose eval_net Q-value is largest
    actions = next_eval_values.argmax(dim=1).reshape(-1, 1)
    # Read the target network's Q-value for that action to get the next-state value
    next_q_values = next_q_values.gather(1,actions)
else:
    # Vanilla DQN: simply take the maximum of next_q_values
    next_q_values, _ = next_q_values.max(dim=1)
    next_q_values = next_q_values.reshape(-1, 1)

2.3 根据回报 r、done 标志和 gamma 计算 target_q_value

target_q_values = r + (1 - d) * self.gamma * next_q_values

3 SmoothL1loss 计算Loss

目前官方实现都使用 SmoothL1Loss / Huber loss,针对确定的 action 计算 loss
https://zhuanlan.zhihu.com/p/83131026
loss = F.smooth_l1_loss(current_q_values, target_q_values)

计算梯度并反馈

self.optimizer.zero_grad() # reset the gradients to zero
loss.backward() # back-propagate the loss
nn.utils.clip_grad_norm_(self.eval_net.parameters(), self.max_grad_norm) # clip the gradient norm
self.optimizer.step()

DQN 代码

import numpy as np
import os
from typing import Any, Dict, List, Optional, Tuple, Type, Union

import torch as th
import torch.nn as nn
from torch.autograd import Variable
import torch.nn.functional as F

# stable baseline3 中的构建多层全连接神经网络
def create_mlp_net(
    input_dim: int,
    output_dim: int,
    net_arch: List[int],
    activation_fn: Type[nn.Module] = nn.ReLU,
    squash_output: bool = False) -> nn.Sequential:
    """Build a fully connected network (MLP) as an ``nn.Sequential``.

    Each entry of ``net_arch`` adds one ``nn.Linear`` layer followed by a
    fresh ``activation_fn()`` instance.  A final ``nn.Linear`` projecting to
    ``output_dim`` is appended when ``output_dim > 0``.

    Args:
        input_dim: Size of the input feature vector.
        output_dim: Size of the output vector; when ``<= 0`` no output layer
            is added.
        net_arch: Width of each hidden layer.  May be empty, in which case
            the output layer (if any) is connected directly to the input.
        activation_fn: Module class instantiated after every hidden layer.
        squash_output: When True, append ``nn.Tanh()`` as the last module.

    Returns:
        The assembled ``nn.Sequential`` model.
    """
    modules: List[nn.Module] = []
    last_layer_dim = input_dim

    # Walking the list (instead of indexing net_arch[0] up front) keeps an
    # empty net_arch valid.
    for layer_dim in net_arch:
        modules.append(nn.Linear(last_layer_dim, layer_dim))
        modules.append(activation_fn())
        last_layer_dim = layer_dim

    if output_dim > 0:
        modules.append(nn.Linear(last_layer_dim, output_dim))
    if squash_output:
        modules.append(nn.Tanh())
    return nn.Sequential(*modules)

class DuelingNet(nn.Module):
    """Dueling Q-network: Q(s, a) = V(s) + (A(s, a) - mean over a of A(s, a)).

    Two separate MLPs built by ``create_mlp_net`` produce the state value
    (one output) and the per-action advantages (``output_dim`` outputs).

    Args:
        input_dim: Size of the flattened observation vector.
        output_dim: Number of actions (width of the advantage head).
        net_arch: Hidden layer widths used for both heads.
        activation_fn: Activation module class used in both heads.
    """

    def __init__(
        self,
        input_dim: int,
        output_dim: int,
        net_arch: List[int],
        activation_fn: Type[nn.Module] = nn.ReLU,
        ):
        super(DuelingNet, self).__init__()
        self.value_net = create_mlp_net(input_dim, 1, net_arch, activation_fn)
        self.advantage_net = create_mlp_net(input_dim, output_dim, net_arch, activation_fn)

    def forward(self, x):
        """Return the Q-values for ``x``, one column per action."""
        value_out = self.value_net(x)
        advantage_out = self.advantage_net(x)
        # Centre the advantages per sample, i.e. over the action dimension
        # only.  Averaging over the whole tensor would make one sample's
        # Q-values depend on the other samples in the batch.
        centered_advantage = advantage_out - advantage_out.mean(dim=-1, keepdim=True)
        return value_out + centered_advantage

class DQN:
    """DQN agent with optional Double-DQN targets and a Dueling network.

    Transitions are kept in a fixed-size ring buffer (``self.memory``) whose
    rows are laid out as ``[s (n_features), a, r, done, s_ (n_features)]``.
    ``eval_net`` is trained; ``target_net`` is overwritten with its weights
    every ``target_update_interval`` training steps.

    Args:
        env: Environment exposing ``action_space.n``,
            ``observation_space.shape``, ``reset()``, ``step()`` and
            ``render()``.  ``step()`` is expected to return the 4-tuple
            ``(obs, reward, done, info)``.
        learning_rate: Adam learning rate.
        reward_decay: Discount factor gamma.
        e_greedy: Upper bound of epsilon.  Here epsilon is the probability
            of taking the greedy action (see ``choose_action``).
        e_greedy_increment: Amount added to epsilon after every training
            step.  When None, epsilon is fixed at ``e_greedy``.
        target_update_interval: Number of training steps between hard
            updates of the target network.
        memory_size: Capacity of the replay buffer.
        batch_size: Number of transitions sampled per training step.
        output_graph: Accepted for compatibility; not used by this class.
        DOUBLE_DQN: Select the next action with ``eval_net`` and evaluate it
            with ``target_net``.
        DUELING_DQN: Use ``DuelingNet`` (value + advantage) instead of a
            plain MLP.
    """

    # File written by save_model() and read by load_model().
    MODEL_PATH = 'torch_dqn_highway_model.pkl'

    def __init__(
            self,
            env,
            learning_rate=0.005,
            reward_decay=0.9,
            e_greedy=0.9,
            e_greedy_increment=None,
            target_update_interval=200,
            memory_size=3000,
            batch_size=32,
            output_graph=False,
            DOUBLE_DQN=False,
            DUELING_DQN=False,
    ):
        self.env = env
        self.n_actions = env.action_space.n
        # Observations are flattened before use, so the feature count is the
        # product of all observation dimensions (works for any rank).
        self.n_features = int(np.prod(env.observation_space.shape))
        self.lr = learning_rate
        self.gamma = reward_decay
        self.epsilon_max = e_greedy
        self.replace_target_iter = target_update_interval
        self.memory_size = memory_size
        self.batch_size = batch_size
        self.double_q = DOUBLE_DQN
        self.dueling_q = DUELING_DQN
        self.epsilon_increment = e_greedy_increment
        # With an increment, start fully random and anneal up to epsilon_max.
        self.epsilon = 0 if e_greedy_increment is not None else self.epsilon_max

        self.max_grad_norm = 10
        self.learn_step_counter = 0

        # Replay buffer: one row per transition [s, a, r, done, s_].
        self.memory = np.zeros((self.memory_size, self.n_features * 2 + 3), dtype=np.float32)
        self.memory_counter = 0

        # Build the online (eval) and target networks with identical shapes.
        if self.dueling_q:
            self.eval_net = DuelingNet(self.n_features, self.n_actions, [256, 256], activation_fn=nn.ReLU)
            self.target_net = DuelingNet(self.n_features, self.n_actions, [256, 256], activation_fn=nn.ReLU)
        else:
            self.eval_net = create_mlp_net(self.n_features, self.n_actions, [256, 256], activation_fn=nn.ReLU)
            self.target_net = create_mlp_net(self.n_features, self.n_actions, [256, 256], activation_fn=nn.ReLU)

        print("model------->")
        print(self.eval_net)

        # Only eval_net is optimised; target_net is refreshed by copying.
        self.optimizer = th.optim.Adam(self.eval_net.parameters(), learning_rate)
        # Loss module instance (the class itself is not callable on tensors).
        self.loss_func = nn.SmoothL1Loss()
        self.loss = 0.0

    def store_transition(self, s, a, r, s_, done):
        """Write one transition into the ring buffer, overwriting the oldest."""
        transition = np.hstack((s.flatten(), [a, r, done], s_.flatten()))
        index = self.memory_counter % self.memory_size
        self.memory[index, :] = transition
        self.memory_counter += 1

    def choose_rlnet_action(self, observation):
        """Return the greedy action (argmax of eval_net's Q-values) as an int."""
        flat = np.asarray(observation, dtype=np.float32).flatten()
        state = th.as_tensor(flat).unsqueeze(0)  # add the batch dimension
        # Action selection needs no gradients.
        with th.no_grad():
            q_values = self.eval_net(state)
        return q_values.argmax(dim=1).reshape(-1).item()

    def choose_action(self, observation, determinstic = False):
        """Pick an action for ``observation``.

        With ``determinstic=True`` the greedy action is always returned.
        Otherwise a uniformly random action is taken whenever a uniform draw
        exceeds ``self.epsilon``; the greedy action is taken in all other
        cases.
        """
        if determinstic:
            return self.choose_rlnet_action(observation)
        if np.random.uniform() > self.epsilon:
            return np.random.randint(0, self.n_actions)
        return self.choose_rlnet_action(observation)

    def save_model(self):
        """Pickle ``eval_net`` to ``MODEL_PATH``, replacing any existing file."""
        # Remove via the standard library rather than shelling out to `rm`.
        if os.path.exists(self.MODEL_PATH):
            os.remove(self.MODEL_PATH)
        th.save(self.eval_net, self.MODEL_PATH)
        print(" model saved !!")

    def load_model(self):
        """Replace ``eval_net`` with the module stored at ``MODEL_PATH``."""
        print('load model')
        # NOTE(review): this unpickles a whole module object, so only load
        # files you trust.  Newer PyTorch releases reportedly default to
        # weights_only=True, which may reject such files -- confirm against
        # the installed version.
        self.eval_net = th.load(self.MODEL_PATH)

    def train_sample(self, sample_index):
        """Run one gradient step on the buffer rows selected by ``sample_index``."""
        # Hard update: copy eval_net weights into target_net periodically.
        if self.learn_step_counter % self.replace_target_iter == 0:
            self.target_net.load_state_dict(self.eval_net.state_dict())
            print('eval_net ---> targe_net: target_net_params_updated!')
        batch_memory = self.memory[sample_index, :]
        n = self.n_features

        # Slice the row layout [s, a, r, done, s_]; every tensor keeps the
        # batch as its first dimension.
        b_s = th.as_tensor(batch_memory[:, :n], dtype=th.float32)
        # Explicit int64 so gather() gets a LongTensor on every platform.
        b_a = th.as_tensor(batch_memory[:, n].astype(np.int64)).reshape(-1, 1)
        b_r = th.as_tensor(batch_memory[:, n + 1], dtype=th.float32).reshape(-1, 1)
        b_d = th.as_tensor(batch_memory[:, n + 2], dtype=th.float32).reshape(-1, 1)
        b_s_ = th.as_tensor(batch_memory[:, -n:], dtype=th.float32)

        # Targets are constants for the optimiser, hence no_grad.
        with th.no_grad():
            # Next-state Q-values from the target network.
            next_q_values = self.target_net(b_s_)

            if self.double_q:
                # Double DQN: eval_net picks the action, target_net rates it.
                next_eval_values = self.eval_net(b_s_)
                actions = next_eval_values.argmax(dim=1).reshape(-1, 1)
                next_q_values = next_q_values.gather(1, actions)
            else:
                # Vanilla DQN: greedy maximum over the target network output.
                next_q_values, _ = next_q_values.max(dim=1)
                next_q_values = next_q_values.reshape(-1, 1)

            # 1-step TD target; (1 - done) removes the bootstrap term on
            # terminal transitions.
            target_q_values = b_r + (1 - b_d) * self.gamma * next_q_values

        # Q-value of the action that was actually taken.
        current_q_values = self.eval_net(b_s).gather(1, b_a)
        loss = self.loss_func(current_q_values, target_q_values)
        self.loss = loss.item()
        self.optimizer.zero_grad()
        loss.backward()
        nn.utils.clip_grad_norm_(self.eval_net.parameters(), self.max_grad_norm)
        self.optimizer.step()

        # Anneal epsilon upwards without ever exceeding epsilon_max.
        if self.epsilon_increment is not None:
            self.epsilon = min(self.epsilon + self.epsilon_increment, self.epsilon_max)
        self.learn_step_counter += 1

    def train(self):
        """Sample ``batch_size`` stored transitions (with replacement) and train."""
        # Only rows that have been written so far may be sampled.
        filled = min(self.memory_counter, self.memory_size)
        sample_index = np.random.choice(filled, size=self.batch_size)
        self.train_sample(sample_index)

    def learn(self, learn_start, total_timesteps):
        """Interact with the environment and train until the step budget is used.

        Training steps begin once more than ``learn_start`` transitions have
        been stored.  The loop stops when more than ``total_timesteps``
        transitions have been stored or on Ctrl-C; the model is saved in
        either case.
        """
        eposide_count = 0
        try:
            while True:
                s = self.env.reset()
                print("\nnew eposide------>")
                while True:
                    a = self.choose_action(s)
                    s_, r, done, info = self.env.step(a)
                    self.store_transition(s, a, r, s_, done)
                    if self.memory_counter > learn_start:
                        self.train()
                    s = s_
                    if done or self.memory_counter > total_timesteps:
                        break
                eposide_count += 1

                if self.memory_counter > learn_start:
                    print("eposides_count   :", eposide_count)
                    print("time_steps       :", self.memory_counter)
                    print("epsilon          :",self.epsilon)
                    print("loss             :",self.loss)
                    print("learning progress:",float(self.memory_counter) / total_timesteps)
                if self.memory_counter > total_timesteps:
                    print("learning stop !!")
                    break
        except KeyboardInterrupt:
            print("KeyboardInterrupt,learning stop")
        self.save_model()

    def test(self):
        """Load the saved model and run the greedy policy until Ctrl-C."""
        # Module-level flag kept from the original; nothing in this file reads it.
        global stop_flag
        self.load_model()
        s = self.env.reset()
        stop_flag = False
        try:
            while True:
                a = self.choose_action(s, determinstic = True)
                print("action type:",self.env.action_type.actions[a])
                s_, r, done, info = self.env.step(a)
                s = s_
                self.env.render()
                if done:
                    s = self.env.reset()
        except KeyboardInterrupt:
            print("KeyboardInterrupt, stop")

highway 主程序

#!/usr/bin/python3
# Train a DQN agent on the highway-env "highway-v0" task, then replay the
# saved policy.  The first command-line argument selects the variant:
#   -dueling   enable Dueling DQN
#   -double    enable Double DQN
#   -all       enable both
#   -h/--help  print the option list and exit
# Any other argument falls back to plain DQN.

import sys

DUELING_DQN = False
DOUBLE_DQN = False

if len(sys.argv) > 1:
    option = sys.argv[1]
    if option in ('-h', '--help'):
        print(
'''
-dueling enable dueling dqn
-double  enable double dqn
-all     enable double and dueling dqn
'''
)
        sys.exit(0)
    elif option == '-dueling':
        print('enable DUELING_DQN')
        DUELING_DQN = True
    elif option == '-double':
        print('enable DOUBLE_DQN')
        DOUBLE_DQN = True
    elif option == '-all':
        print('enable DOUBLE_DQN DUELING_DQN')
        DUELING_DQN = True
        DOUBLE_DQN = True
    else:
        print('use default dqn model')

# Heavy imports come after argument handling so that --help returns quickly.
import gym
import highway_env
from dqn import DQN
import time

env = gym.make("highway-v0")
config = {
    'action': {'type': 'DiscreteMetaAction'},
    # The key was previously misspelled ('observatoin'), so the vehicle count
    # never took effect.  dict.update() replaces the whole nested dict, so
    # 'type' has to be given as well.
    # NOTE(review): 'Kinematics' is presumably the default observation type
    # for highway-v0 -- confirm against the installed highway-env version.
    'observation': {'type': 'Kinematics', 'vehicles_count': 20},
    'manual_control': False,
    'simulation_frequency': 15,
    'policy_frequency': 5,
    # Episode length limit (the original note reads "number of steps after
    # which the episode is considered finished"; check highway-env for the unit).
    'duration': 30000,
}
env.config.update(config)
# Reset so the environment is rebuilt with the updated configuration before
# DQN reads the observation space.
env.reset()

rl_model = DQN(
    env,
    memory_size=15000,
    batch_size=32,
    e_greedy_increment=0.0001,
    e_greedy=0.8,
    learning_rate=5e-4,
    reward_decay=0.8,
    target_update_interval=50,
    DOUBLE_DQN=DOUBLE_DQN,
    DUELING_DQN=DUELING_DQN,
)
t1 = time.time()
# Press Ctrl-C to stop training early; the model is still saved.
rl_model.learn(learn_start = 200, total_timesteps = 2e4)
print('Training time: ', time.time() - t1)
# Press Ctrl-C to stop the replay loop.
rl_model.test()

结果
大概训练2万次以上有基本的避让动作,几十万次可以基本无碰撞避障.

 类似资料: