当前位置: 首页 > 工具软件 > UniMRCP > 使用案例 >

unimrcpserver的MRCP消息处理

谷梁鸣
2023-12-01

      本文以Recog Engine引擎为例,分析unimrcpserver的MRCP消息处理流程。

初始化

        进程初始化时,MRCPv2-Agent-1启动一个监听的task线程,task函数是apt_poller_task_run()。

        mrcp_server_connection_agent_create()这个函数设置了消息处理入口即mrcp_server_agent_msg_process()。

/**
 * Create an MRCPv2 server connection agent.
 *
 * The agent is allocated from @a pool, bound to a poller task sized for
 * @a max_connection_count connections plus one extra descriptor, and wired
 * so that task messages are handled by mrcp_server_agent_msg_process().
 *
 * @param id                   name used in log output and as the task name
 * @param listen_ip            address to listen on (must not be NULL)
 * @param listen_port          port to listen on
 * @param max_connection_count upper bound used to size the poller task
 * @param force_new_connection stored as-is in the agent
 * @param pool                 pool all allocations are made from
 * @return the new agent, or NULL when @a listen_ip is NULL, the socket
 *         address could not be obtained, or the poller task could not be
 *         created. A failure to create the listening socket is only logged
 *         as a warning; the agent is still returned in that case.
 */
MRCP_DECLARE(mrcp_connection_agent_t*) mrcp_server_connection_agent_create(
										const char *id,
										const char *listen_ip,
										apr_port_t listen_port,
										apr_size_t max_connection_count,
										apt_bool_t force_new_connection,
										apr_pool_t *pool)
{
	mrcp_connection_agent_t *agent;
	apt_task_msg_pool_t *task_msg_pool;
	apt_task_vtable_t *task_vtable;
	apt_task_t *base_task;

	/* Nothing can be set up without an address to listen on */
	if(listen_ip == NULL) {
		return NULL;
	}

	apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Create MRCPv2 Agent [%s] %s:%hu [%"APR_SIZE_T_FMT"]",
		id,listen_ip,listen_port,max_connection_count);

	agent = apr_palloc(pool,sizeof(*agent));

	/* Caller-supplied settings */
	agent->pool = pool;
	agent->force_new_connection = force_new_connection;

	/* Sockets are filled in later */
	agent->sockaddr = NULL;
	agent->listen_sock = NULL;

	/* Built-in defaults */
	agent->max_shared_use_count = 100;
	agent->tx_buffer_size = MRCP_STREAM_BUFFER_SIZE;
	agent->rx_buffer_size = MRCP_STREAM_BUFFER_SIZE;
	agent->termination_timeout = 3000; /* 3 sec */
	agent->inactivity_timeout = 600000; /* 10 min */

	/* The status code is not inspected; a sockaddr left at NULL means failure */
	apr_sockaddr_info_get(&agent->sockaddr,listen_ip,APR_INET,listen_port,0,pool);
	if(agent->sockaddr == NULL) {
		return NULL;
	}

	task_msg_pool = apt_task_msg_pool_create_dynamic(sizeof(connection_task_msg_t),pool);

	/* One descriptor more than the maximum number of connections */
	agent->task = apt_poller_task_create(
					max_connection_count + 1,
					mrcp_server_poller_signal_process,
					agent,
					task_msg_pool,
					pool);
	if(agent->task == NULL) {
		return NULL;
	}

	/* Name the underlying task after the agent id */
	base_task = apt_poller_task_base_get(agent->task);
	if(base_task != NULL) {
		apt_task_name_set(base_task,id);
	}

	/* Install the agent's destroy hook and task message entry point */
	task_vtable = apt_poller_task_vtable_get(agent->task);
	if(task_vtable != NULL) {
		task_vtable->destroy = mrcp_server_agent_on_destroy;
		task_vtable->process_msg = mrcp_server_agent_msg_process;
	}

	/* Start with no connections and no pending channels */
	APR_RING_INIT(&agent->connection_list, mrcp_connection_t, link);
	agent->pending_channel_table = apr_hash_make(pool);

	/* A listening socket failure is reported but does not abort creation */
	if(mrcp_server_agent_listening_socket_create(agent) != TRUE) {
		apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Create Listening Socket [%s] %s:%hu", 
				id,
				listen_ip,
				listen_port);
	}

	return agent;
}

SIP Session

      接收处理SIP INVITE消息时,分发到mrcp_server_agent_msg_process()。

/**
 * Handle one task message delivered to the connection agent.
 *
 * The payload of @a task_msg is interpreted as a connection_task_msg_t and
 * routed by its type to the matching channel add / modify / remove routine
 * or to the message send routine. Message types other than these four are
 * ignored.
 *
 * @param task     task the message was delivered to; the agent is looked up
 *                 through its poller task object
 * @param task_msg message whose data holds the connection_task_msg_t
 * @return always TRUE
 */
static apt_bool_t mrcp_server_agent_msg_process(apt_task_t *task, apt_task_msg_t *task_msg)
{
	apt_poller_task_t *poller = apt_task_object_get(task);
	mrcp_connection_agent_t *agent = apt_poller_task_object_get(poller);
	connection_task_msg_t *request = (connection_task_msg_t*) task_msg->data;

	if(request->type == CONNECTION_TASK_MSG_ADD_CHANNEL) {
		mrcp_server_agent_channel_add(agent,request->channel,request->descriptor);
	}
	else if(request->type == CONNECTION_TASK_MSG_MODIFY_CHANNEL) {
		mrcp_server_agent_channel_modify(agent,request->channel,request->descriptor);
	}
	else if(request->type == CONNECTION_TASK_MSG_REMOVE_CHANNEL) {
		mrcp_server_agent_channel_remove(agent,request->channel);
	}
	else if(request->type == CONNECTION_TASK_MSG_SEND_MESSAGE) {
		/* The message goes out on the connection the channel is attached to */
		mrcp_server_agent_messsage_send(agent,request->channel->connection,request->message);
	}

	return TRUE;
}

走的是CONNECTION_TASK_MSG_ADD_CHANNEL这个分支,调用mrcp_server_agent_channel_add(),添加一个Pending Control Channel。

MRCP连接建立

        如果之前Client已经和Server建立过连接,是可以复用连接的。这里描述的是连接初始化的过程。

        监听线程函数apt_poller_task_run()接收连接信号,调用信号处理回调:

	/* Walk the descriptors in task->desc_arr, desc_count entries in total.
	 * The loop cursor is kept in task->desc_index rather than in a local. */
	for(task->desc_index = 0; task->desc_index < task->desc_count; task->desc_index++) {
			const apr_pollfd_t *descriptor = &task->desc_arr[task->desc_index];
			/* The pollset's wakeup descriptor gets its own handling and is
			 * never passed on to the signal handler. */
			if(apt_pollset_is_wakeup(task->pollset,descriptor)) {
				apt_log(APT_LOG_MARK,APT_PRIO_DEBUG,"Process Poller Wakeup [%s]",task_name);
				apt_poller_task_wakeup_process(task);
				/* Stop scanning the remaining descriptors once *running is FALSE
				 * (presumably cleared while processing the wakeup -- confirm). */
				if(*running == FALSE) {
					break;
				}
				continue;
			}

			/* Every other descriptor goes to the registered signal handler,
			 * together with the object stored in task->obj. */
			apt_log(APT_LOG_MARK,APT_PRIO_DEBUG,"Process Signalled Descriptor [%s]",task_name);
			task->signal_handler(task->obj,descriptor);
		}

这里的回调函数task->signal_handler指向mrcp_server_poller_signal_process()。当被触发的描述符是监听socket(agent->listen_sock)时,它调用mrcp_server_agent_connection_accept()接受连接。

/* Receive MRCP message through TCP/MRCPv2 connection */
static apt_bool_t mrcp_server_poller_signal_process(void *obj, const apr_pollfd_t *descriptor)
{
	mrcp_connection_agent_t *agent = obj;
	mrcp_connection_t *connection = descriptor->client_data;
	apr_status_t status;
	apr_size_t offset;
	apr_size_t length;
	apt_text_stream_t *stream;
	mrcp_message_t *message;
	apt_message_status_e msg_status;

	if(descriptor->desc.s == agent->listen_sock) {
		return mrcp_server_agent_connection_accept(agent);
	}

	if(!connection || !connection->sock) {
		return FALSE;
	}
........

接收MRCP报文后,还是分发给mrcp_server_poller_signal_process()处理,只不过调用的是mrcp_server_message_handler()。

	do {
		msg_status = mrcp_parser_run(connection->parser,stream,&message);
		if(mrcp_server_message_handler(connection,message,msg_status) == FALSE) {
			return FALSE;
		}
	}



mrcp_server_message_handler()在报文完整解析之后调用mrcp_connection_channel_associate(),把报文关联到对应的mrcp_control_channel_t对象上,然后调用mrcp_connection_message_receive()发送信号。

static apt_bool_t mrcp_server_message_handler(mrcp_connection_t *connection, mrcp_message_t *message, apt_message_status_e status)
{
	mrcp_connection_agent_t *agent = connection->agent;
	if(status == APT_MESSAGE_STATUS_COMPLETE) {
		/* message is completely parsed */
		mrcp_control_channel_t *channel = mrcp_connection_channel_associate(agent,connection,message);
		if(channel) {
			/* (re)set inactivity timer on every message received */
			if(connection->inactivity_timer) {
				apt_timer_set(connection->inactivity_timer,agent->inactivity_timeout);
			}

			mrcp_connection_message_receive(agent->vtable,channel,message);
		}
		else {
			apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Find Channel " APT_SIDRES_FMT " in Connection %s",
				MRCP_MESSAGE_SIDRES(message),
				connection->id);
		}
	}

接下来mrcp_server_session的task线程捕获信号消息并进行分发:由mrcp_recog_state_machine接收并分发给Recog Engine,Recog Engine再分发给自己的consumer_task。consumer_task处理后把response返回给mrcp_recog_state_machine,由状态机驱动函数recog_state_update()调用recog_response_state_update()发出消息,最终由传输层捕获消息,调用mrcp_server_agent_messsage_send()把报文发回给Client。

 类似资料: