当前位置: 首页 > 工具软件 > Frozen > 使用案例 >

强化学习:策略迭代求解FrozenLake-v1滑冰问题代码

郝原
2023-12-01

code

import numpy as np
np.random.seed(0)
import gym
def play_policy(env, policy, render=False):
    total_reward = 0.
    observation = env.reset()
    while True:
        if render:
            env.render() # 此行可显示
        action = np.random.choice(env.action_space.n,
                p=policy[observation])
        observation, reward, done, _ = env.step(action)
        total_reward += reward  # 统计回合奖励
        if done: # 游戏结束
            break
    return total_reward

def policy_v2q(env,v,s=None,gamma=1.0):
    if s is not None: # 计算第v[s]
        q = np.zeros(env.action_space.n)
        for action in range(env.action_space.n):
            for p,next_state,reward,done in env.unwrapped.P[s][action]:
                q[action] += p*(reward + gamma*v[next_state]*(1-done))
    else:
        q = np.zeros((env.observation_space.n,env.action_space.n))
        for state in range(env.observation_space.n):
            q[state] = policy_v2q(env,v,state,gamma)
    return q

# 下面是策略评估
def policy_evluate(env,policy,gamma=1.0,tolerant=1e-6):
    v = np.zeros(env.observation_space.n)
    vs = np.zeros(env.observation_space.n)
    q = np.zeros(env.action_space.n)
    while True:
        delta = 0
        for state in range(env.observation_space.n):
            vs[state] = sum(policy_v2q(env,v,state,gamma)*policy[state])
            delta = max(delta,vs[state]-v[state])
            v[state] = vs[state]
        if delta<tolerant :
            break
    return v

# 下面是策略改进
def policy_improve(env,v,policy,gamma=1.0):
    optimal = True
    for state in range(env.observation_space.n):
        q = policy_v2q(env,v,state,gamma)
        a_new = np.argmax(q)  # 该步执行贪心策略
        if policy[state][a_new] != 1:
            policy[state] = 0
            policy[state][a_new] = 1
            optimal = False
    return optimal,policy

# 下面是进行策略迭代
def policy_iterative(env,gamma=1.0,tolerant=1e-6):
    # 下面先进行随机策略初始化
    policy = np.ones((env.observation_space.n, env.action_space.n)) / env.action_space.n
    # 不断进行策略评估与策略改进
    while True:
        v = policy_evluate(env,policy,gamma,tolerant)
        optimal, policy = policy_improve(env,v,policy,gamma)
        if optimal == True:
            break
    return policy,v

if __name__=="__main__":
    # 下面尝试用随机策略玩
    env = gym.make('FrozenLake-v1')
    random_policy = \
        np.ones((env.observation_space.n, env.action_space.n)) / env.action_space.n
    episode_rewards = [play_policy(env, random_policy) for _ in range(100)]
    print("随机策略 平均奖励:{:.2f}".format(np.mean(episode_rewards)))
    # 下面尝试自己编写的策略评估函数
    print('状态价值函数:')
    v_random = policy_evluate(env,random_policy)
    print(v_random.reshape(4,4))
    print('动作函数:')
    q_random = policy_v2q(env,v_random,s=None)
    print(q_random)
    print('更新前策略是:')
    print(random_policy)

    # 下面开始进行策略改进
    optimal,policy = policy_improve(env, v_random, random_policy, gamma=1.0)
    # 更新前策略是:
    print('在一次策略改进之后:')
    if optimal == True:
        print('找到最优解,策略是:')
        print(policy)
    else:
        print('未找到最优解,策略是:')
        print(policy)

    # 下面是完整代码
    print('*****************下面是完整策略迭代结果********************')
    policy,v = policy_iterative(env, gamma=1.0, tolerant=1e-6)
    print('找到的最优策略是:{}'.format(policy))
    print('找到最优策略是')
    print(np.argmax(policy,axis=1).reshape(4,4))
  
 类似资料: