import onnx
import mlagents
import onnxruntime as ort
import numpy as np
import time
import torch
import math
import gym
import tensorflow as tf
from gym_unity.envs import UnityToGymWrapper
from mlagents_envs.environment import UnityEnvironment
from mlagents_envs.side_channel.engine_configuration_channel import EngineConfigurationChannel
#from mlagents_envs.base_env import DecisionSteps
from mlagents_envs.base_env import (
BehaviorSpec,
ActionSpec,
DecisionSteps,
TerminalSteps,
BehaviorMapping,
ActionTuple,
)
#from mlagents.trainers.tests.dummy_config import create_observation_specs_with_shapes
#import sys
#sys.path.append('D:/ml-agents-release_17/ml-agents/mlagents/trainers/tests/')
#from dummy_config import create_observation_specs_with_shapes
# Path to the exported ONNX policy; used both for the checker load and for
# the ONNX Runtime inference session below (previously duplicated literal).
MODEL_PATH = r'C:\Users\Win10\Desktop\Pyramids.onnx'

# Load the model and verify the protobuf is well-formed and consistent.
model = onnx.load(MODEL_PATH)
onnx.checker.check_model(model)

# Graph-level output/input descriptions (names + dimension info).
# Renamed from `output`/`input` to avoid shadowing the Python builtins.
model_graph_output = model.graph.output
model_graph_input = model.graph.input

# Load the model with ONNX Runtime and look at its inputs and outputs.
onnx_session = ort.InferenceSession(MODEL_PATH)
# Fetch the input/output metadata once instead of re-calling per print.
session_inputs = onnx_session.get_inputs()
session_outputs = onnx_session.get_outputs()
print("input name='{}' and shape={}".format(
    session_inputs[0].name, session_inputs[0].shape))
print("input name1='{}' and shape={}".format(
    session_inputs[1].name, session_inputs[1].shape))
print("output name='{}' and shape={}".format(
    session_outputs[0].name, session_outputs[0].shape))
# Names used later to build the feed dict for onnx_session.run().
input_name0 = session_inputs[0].name
input_name1 = session_inputs[1].name
output_name0 = session_outputs[0].name
output_name1 = session_outputs[1].name
# This is a non-blocking call that only loads the environment.
unity_env = UnityEnvironment(file_name=r"D:\ml-agents-release\bin\UnityEnvironment")
#unity_env = UnityEnvironment(file_name=r"D:\ml-agents-release_17\bin\UnityEnvironment", seed=1, side_channels=[])
# NOTE(review): this channel is created AFTER the environment and is never
# passed via side_channels=[...], so any configuration set on it (e.g. the
# commented-out time_scale) has no effect on the running environment.
engine_config_channel=EngineConfigurationChannel()
#engine_config_channel.set_configuration_parameters(time_scale=0.1)
unity_env.reset()
#agents=unity_env.get_behavior_names()
#group_name = agents[0]
# group_spec = unity_env.get_behavior_spec(group_name)
#step_result=unity_env.get_steps(group_name)
behavior_names = unity_env.behavior_specs.keys()
#behavior_name = unity_env.behavior_specs.keys()
print("behavior_name:{}".format(behavior_names))
# Rebound to the mapping itself; iterating a BehaviorMapping also yields the
# behavior names, so the loop below works the same either way.
behavior_names = unity_env.behavior_specs
print("behavior_name:{}".format(behavior_names))
# Inspect each behavior's current step data. The loop variable behavior_name
# deliberately leaks out of the loop: the main episode loop further down
# reuses it (the script assumes a single behavior in the scene).
for behavior_name in behavior_names:
    print(behavior_name)
    # decision_steps: agents requesting an action this step;
    # terminal_steps: agents whose episode just ended.
    decision_steps, terminal_steps = unity_env.get_steps(behavior_name)
    print("decision_steps {} " .format(decision_steps))
    print(decision_steps[0].obs)
    print(decision_steps[0].obs[1])
    print(behavior_name)
# for agent_id_terminated in terminal_steps:
#     print("Agent " + behavior_name + " has terminated, resetting environment.")
#     # This is probably not the desired behaviour, as the other agents are still active.
#     unity_env.reset()
# actions=[]
# for agent_id_terminated in terminal_steps:
#     actions.append(np.random.uniform(-1, 1, 2))
#
# if len(actions) > 0:
#     unity_env.set_actions(behavior_name, np.array(actions))
# try:
#     unity_env.step()
# except:
#     print("Something happend when taking a step in the environment.")
#     print("The communicatior has probably terminated, stopping simulation early.")
#     #break
# unity_env.close()
def to_numpy(tensor):
    """Convert a torch tensor to a float32 NumPy array on the CPU.

    ``Tensor.detach()`` is a cheap no-op for tensors that do not require
    grad, so the original ``requires_grad`` branch was redundant — a single
    unconditional chain covers both cases with identical results.

    Args:
        tensor: a ``torch.Tensor`` on any device, with or without grad.

    Returns:
        ``np.ndarray`` of dtype ``float32`` holding the tensor's values.
    """
    return tensor.detach().cpu().numpy().astype(np.float32)
# Number of episodes to run with the ONNX policy driving the agent.
episode_count=100
reward=0  # NOTE(review): never updated or read anywhere below
for i in range(episode_count):
    # Start interacting with the environment.
    unity_env.reset()
    while True:
        unity_env.step()
        decision_steps, terminal_steps = unity_env.get_steps(behavior_name)
        if len(terminal_steps)!=0:
            #the agent is done
            # NOTE(review): game_over is set but never read; episode_done
            # below is what actually terminates the loop. Also, when the
            # agent terminated this step it may be absent from
            # decision_steps, making decision_steps[0] below raise — verify.
            game_over=True
        # Gather the environment observations to feed the ONNX model.
        a=list(np.array(decision_steps[0].obs[0]))# lidar (ray-cast) readings
        b=list(np.array(decision_steps[0].obs[1]))# goal position in the agent's local frame
        agentID=decision_steps[0].agent_id
        # Concatenate both observation vectors into a single (1, N) float32 batch.
        observation=np.asarray(np.array([a+b]),dtype=np.float32)
        actionMask=np.asarray(decision_steps[0].action_mask, dtype=np.float32)
        # NOTE(review): the mask above is immediately overwritten with a
        # constant "all actions allowed" mask — the previous line is dead code.
        actionMask = np.asarray(np.array([[1,1]]), dtype=np.float32)
        # Run the ONNX model; it has two inputs (observation and action mask).
        # Passing [] as output_names returns every model output.
        result = onnx_session.run([], {input_name0: observation, input_name1: actionMask})
        # NOTE(review): result[2] / result[4] address specific model outputs
        # (presumably continuous actions and the two discrete-branch logits);
        # this depends on the exported model's output ordering — confirm
        # against onnx_session.get_outputs() names.
        # Argmax over the two discrete logits picks the discrete action.
        if result[4][0][0]>result[4][0][1]:
            discr=0
        else:
            discr=1
        discr=[[discr]]  # shape (1, 1): one agent, one discrete branch
        action_tuple=ActionTuple(np.asarray(result[2],dtype=np.float32),np.asarray(discr,dtype=np.int32))
        #if(result[3][0][0]):
        #    conti_action = np.asarray(discr, dtype=np.int32)
        #    disc_action=np.asarray(result[2],dtype=np.float32)
        #    action_tuple=ActionTuple()
        #
        #    action_tuple.add_continuous(conti_action)
        #    action_tuple.add_discrete(disc_action)
        #unity_env.set_actions(behavior_name, action_tuple)
        unity_env.set_action_for_agent(behavior_name,agentID,action_tuple)
        # Episode ends when any agent appeared in terminal_steps this step.
        episode_done=(terminal_steps.group_reward.shape[0]>0)
        print(episode_done)
        if episode_done:
            break
unity_env.close()