本文以Recog Engine引擎为例,分析unimrcpserver的MRCP消息处理流程。
进程初始化时,MRCPv2-Agent-1启动一个监听的task线程,task函数是apt_poller_task_run()。
mrcp_server_connection_agent_create()这个函数设置了消息处理入口即mrcp_server_agent_msg_process()。
/** Create connection agent */
MRCP_DECLARE(mrcp_connection_agent_t*) mrcp_server_connection_agent_create(
const char *id,
const char *listen_ip,
apr_port_t listen_port,
apr_size_t max_connection_count,
apt_bool_t force_new_connection,
apr_pool_t *pool)
{
apt_task_t *task;
apt_task_vtable_t *vtable;
apt_task_msg_pool_t *msg_pool;
mrcp_connection_agent_t *agent;
if(!listen_ip) {
return NULL;
}
apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Create MRCPv2 Agent [%s] %s:%hu [%"APR_SIZE_T_FMT"]",
id,listen_ip,listen_port,max_connection_count);
agent = apr_palloc(pool,sizeof(mrcp_connection_agent_t));
agent->pool = pool;
agent->sockaddr = NULL;
agent->listen_sock = NULL;
agent->force_new_connection = force_new_connection;
agent->max_shared_use_count = 100;
agent->rx_buffer_size = MRCP_STREAM_BUFFER_SIZE;
agent->tx_buffer_size = MRCP_STREAM_BUFFER_SIZE;
agent->inactivity_timeout = 600000; /* 10 min */
agent->termination_timeout = 3000; /* 3 sec */
apr_sockaddr_info_get(&agent->sockaddr,listen_ip,APR_INET,listen_port,0,pool);
if(!agent->sockaddr) {
return NULL;
}
msg_pool = apt_task_msg_pool_create_dynamic(sizeof(connection_task_msg_t),pool);
agent->task = apt_poller_task_create(
max_connection_count + 1,
mrcp_server_poller_signal_process,
agent,
msg_pool,
pool);
if(!agent->task) {
return NULL;
}
task = apt_poller_task_base_get(agent->task);
if(task) {
apt_task_name_set(task,id);
}
vtable = apt_poller_task_vtable_get(agent->task);
if(vtable) {
vtable->destroy = mrcp_server_agent_on_destroy;
vtable->process_msg = mrcp_server_agent_msg_process;
}
APR_RING_INIT(&agent->connection_list, mrcp_connection_t, link);
agent->pending_channel_table = apr_hash_make(pool);
if(mrcp_server_agent_listening_socket_create(agent) != TRUE) {
apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Create Listening Socket [%s] %s:%hu",
id,
listen_ip,
listen_port);
}
return agent;
}
接收处理SIP INVITE消息时,分发到mrcp_server_agent_msg_process()。
/* Process task message */
static apt_bool_t mrcp_server_agent_msg_process(apt_task_t *task, apt_task_msg_t *task_msg)
{
apt_poller_task_t *poller_task = apt_task_object_get(task);
mrcp_connection_agent_t *agent = apt_poller_task_object_get(poller_task);
connection_task_msg_t *msg = (connection_task_msg_t*) task_msg->data;
switch(msg->type) {
case CONNECTION_TASK_MSG_ADD_CHANNEL:
mrcp_server_agent_channel_add(agent,msg->channel,msg->descriptor);
break;
case CONNECTION_TASK_MSG_MODIFY_CHANNEL:
mrcp_server_agent_channel_modify(agent,msg->channel,msg->descriptor);
break;
case CONNECTION_TASK_MSG_REMOVE_CHANNEL:
mrcp_server_agent_channel_remove(agent,msg->channel);
break;
case CONNECTION_TASK_MSG_SEND_MESSAGE:
mrcp_server_agent_messsage_send(agent,msg->channel->connection,msg->message);
break;
}
return TRUE;
}
走的是CONNECTION_TASK_MSG_ADD_CHANNEL这个分支,调用mrcp_server_agent_channel_add(),添加一个Pending Control Channel。
如果之前Client已经和Server建立过连接,是可以复用连接的。这里描述的是连接初始化的过程。
监听线程函数apt_poller_task_run()接收连接信号,调用信号处理回调:
for(task->desc_index = 0; task->desc_index < task->desc_count; task->desc_index++) {
const apr_pollfd_t *descriptor = &task->desc_arr[task->desc_index];
if(apt_pollset_is_wakeup(task->pollset,descriptor)) {
apt_log(APT_LOG_MARK,APT_PRIO_DEBUG,"Process Poller Wakeup [%s]",task_name);
apt_poller_task_wakeup_process(task);
if(*running == FALSE) {
break;
}
continue;
}
apt_log(APT_LOG_MARK,APT_PRIO_DEBUG,"Process Signalled Descriptor [%s]",task_name);
task->signal_handler(task->obj,descriptor);
}
这里的回调函数,指向mrcp_server_poller_signal_process(),调用mrcp_server_agent_connection_accept()接受连接。
/* Receive MRCP message through TCP/MRCPv2 connection */
static apt_bool_t mrcp_server_poller_signal_process(void *obj, const apr_pollfd_t *descriptor)
{
mrcp_connection_agent_t *agent = obj;
mrcp_connection_t *connection = descriptor->client_data;
apr_status_t status;
apr_size_t offset;
apr_size_t length;
apt_text_stream_t *stream;
mrcp_message_t *message;
apt_message_status_e msg_status;
if(descriptor->desc.s == agent->listen_sock) {
return mrcp_server_agent_connection_accept(agent);
}
if(!connection || !connection->sock) {
return FALSE;
}
........
接收MRCP报文后,还是分发给mrcp_server_poller_signal_process()处理,只不过调用的是mrcp_server_message_handler()。
do {
msg_status = mrcp_parser_run(connection->parser,stream,&message);
if(mrcp_server_message_handler(connection,message,msg_status) == FALSE) {
return FALSE;
}
}
mrcp_server_message_handler()里调用mrcp_connection_channel_associate(),关联到mrcp_control_channel_t对象上。然后发送信号。
static apt_bool_t mrcp_server_message_handler(mrcp_connection_t *connection, mrcp_message_t *message, apt_message_status_e status)
{
mrcp_connection_agent_t *agent = connection->agent;
if(status == APT_MESSAGE_STATUS_COMPLETE) {
/* message is completely parsed */
mrcp_control_channel_t *channel = mrcp_connection_channel_associate(agent,connection,message);
if(channel) {
/* (re)set inactivity timer on every message received */
if(connection->inactivity_timer) {
apt_timer_set(connection->inactivity_timer,agent->inactivity_timeout);
}
mrcp_connection_message_receive(agent->vtable,channel,message);
}
else {
apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Find Channel " APT_SIDRES_FMT " in Connection %s",
MRCP_MESSAGE_SIDRES(message),
connection->id);
}
}
接下来mrcp_server_session的task线程捕获信号消息并进行分发。由mrcp_recog_state_machine接收并分发给Recog Engine。Recog Engine继续分发给自己的consumer_task。consumer_task处理后返回response给mrcp_recog_state_machine,由状态机驱动函数recog_state_update()调用recog_response_state_update()发出消息,最终由传输层捕获消息,调用mrcp_server_agent_messsage_send()把报文发回给Client