# Deploying a trained ONNX model in an exported Unity environment (ml-agents)
# (原题: ml-agents在导出的unity环境部署训练模型onnx)
# Author: Wei Shuo (魏硕)
# Date: 2023-12-01
import onnx
import mlagents
import onnxruntime as ort
import numpy as np
import time
import torch
import math
import gym
import tensorflow as tf
from gym_unity.envs import UnityToGymWrapper
from mlagents_envs.environment import UnityEnvironment
from mlagents_envs.side_channel.engine_configuration_channel import EngineConfigurationChannel
#from mlagents_envs.base_env import DecisionSteps
from mlagents_envs.base_env import (
    BehaviorSpec,
    ActionSpec,
    DecisionSteps,
    TerminalSteps,
    BehaviorMapping,
    ActionTuple,
)
#from mlagents.trainers.tests.dummy_config import create_observation_specs_with_shapes
#import sys
#sys.path.append('D:/ml-agents-release_17/ml-agents/mlagents/trainers/tests/')
#from dummy_config import create_observation_specs_with_shapes
# --- Load and validate the ONNX model ---------------------------------------
# Single source of truth for the checkpoint path (was hard-coded twice).
MODEL_PATH = r'C:\Users\Win10\Desktop\Pyramids.onnx'

# Load the model graph and verify the file is a well-formed ONNX model.
model = onnx.load(MODEL_PATH)
onnx.checker.check_model(model)

# Graph-level I/O metadata (names + dims).
# Renamed from `input`/`output` to avoid shadowing Python builtins.
graph_outputs = model.graph.output
graph_inputs = model.graph.input

# Load the model with ONNX Runtime and inspect its runtime inputs/outputs.
onnx_session = ort.InferenceSession(MODEL_PATH)
print("input name='{}' and shape={}".format(
    onnx_session.get_inputs()[0].name, onnx_session.get_inputs()[0].shape))
print("input name1='{}' and shape={}".format(
    onnx_session.get_inputs()[1].name, onnx_session.get_inputs()[1].shape))
print("output name='{}' and shape={}".format(
    onnx_session.get_outputs()[0].name, onnx_session.get_outputs()[0].shape))

# Tensor names used later when feeding observations / reading actions.
input_name0 = onnx_session.get_inputs()[0].name
input_name1 = onnx_session.get_inputs()[1].name

output_name0 = onnx_session.get_outputs()[0].name
output_name1 = onnx_session.get_outputs()[1].name
# --- Connect to the exported Unity environment ------------------------------
# This is a non-blocking call that only loads the environment.
unity_env = UnityEnvironment(file_name=r"D:\ml-agents-release\bin\UnityEnvironment")

# NOTE(review): this channel is created but never passed to UnityEnvironment
# via `side_channels=[engine_config_channel]`, so any configuration calls on
# it (e.g. time_scale) would have no effect — wire it up or remove it.
engine_config_channel = EngineConfigurationChannel()

unity_env.reset()

# `behavior_specs` maps behavior name -> BehaviorSpec. This script drives the
# first (and assumed only) behavior; previously the name was obtained by
# letting a for-loop variable leak, and `behavior_names` was assigned twice.
behavior_names = list(unity_env.behavior_specs.keys())
print("behavior_name:{}".format(behavior_names))
behavior_name = behavior_names[0]
print(behavior_name)

# Initial step data: DecisionSteps (agents awaiting an action) and
# TerminalSteps (agents that just finished an episode), indexed by agent id.
decision_steps, terminal_steps = unity_env.get_steps(behavior_name)
print("decision_steps {} ".format(decision_steps))
print(decision_steps[0].obs)
print(decision_steps[0].obs[1])
# for agent_id_terminated in terminal_steps:
#     print("Agent " + behavior_name + " has terminated, resetting environment.")
#     # This is probably not the desired behaviour, as the other agents are still active.
#     unity_env.reset()
# actions=[]
# for agent_id_terminated in terminal_steps:
#     actions.append(np.random.uniform(-1, 1, 2))


#
# if len(actions) > 0:
#     unity_env.set_actions(behavior_name, np.array(actions))
# try:
#     unity_env.step()
# except:
#     print("Something happend when taking a step in the environment.")
#     print("The communicatior has probably terminated, stopping simulation early.")
#     #break
# unity_env.close()
def to_numpy(tensor):
    """Convert a torch tensor to a float32 NumPy array.

    Detaches the tensor from the autograd graph first when it requires
    gradients, and always moves it to CPU before conversion.
    """
    if tensor.requires_grad:
        tensor = tensor.detach()
    return tensor.cpu().numpy().astype(np.float32)
# --- Run inference episodes -------------------------------------------------
# (Removed unused `reward` and `game_over` variables; the terminal check
# below is the single source of episode-done truth.)
episode_count = 100

for episode in range(episode_count):
    # Start interacting with the environment.
    unity_env.reset()
    while True:
        unity_env.step()
        decision_steps, terminal_steps = unity_env.get_steps(behavior_name)

        # A non-empty TerminalSteps means the agent finished this episode.
        # Break BEFORE reading decision_steps[0]: on a terminal step the
        # DecisionSteps batch may be empty and indexing it would raise.
        episode_done = len(terminal_steps) != 0
        print(episode_done)
        if episode_done:
            break

        # Build the ONNX inputs from the current observation.
        a = list(np.array(decision_steps[0].obs[0]))  # lidar/ray observations
        b = list(np.array(decision_steps[0].obs[1]))  # goal position in the agent's frame
        agentID = decision_steps[0].agent_id
        observation = np.asarray([a + b], dtype=np.float32)
        # All discrete actions enabled. The mask previously read from
        # decision_steps[0].action_mask was dead code — it was immediately
        # overwritten by this constant.
        actionMask = np.asarray([[1, 1]], dtype=np.float32)

        # Run the model with both inputs; `None` requests every output tensor.
        result = onnx_session.run(None, {input_name0: observation, input_name1: actionMask})

        # NOTE(review): assumes result[4] holds the two discrete-branch logits
        # and result[2] the continuous actions — confirm against the output
        # ordering printed at startup.
        discr = 0 if result[4][0][0] > result[4][0][1] else 1
        action_tuple = ActionTuple(
            np.asarray(result[2], dtype=np.float32),
            np.asarray([[discr]], dtype=np.int32),
        )
        unity_env.set_action_for_agent(behavior_name, agentID, action_tuple)

unity_env.close()
# (trailing "类似资料:" / "related material:" text from the source web page removed)