近日研究了下mod_event_socket源码,发现socket用的是poll,并且接收到的时候没有用到缓冲,严重影响系统性能,有必要进行数据包的收发进行优化处理,保证通讯的畅通。
一、数据报字段过滤功能
<configuration name="event_socket.conf" description="Socket Client">
<settings>
<param name="nat-map" value="false"/>
<param name="listen-ip" value="127.0.0.1"/>
<param name="listen-port" value="8021"/>
<param name="password" value="ClueCon"/>
<!--<param name="apply-inbound-acl" value="lan"/>-->
<param name="event-whitelists" value=""/> <!-- 事件白名单,全部字段写入 -->
<param name="valid-variant-prefixs" value=""/> <!-- 字段前缀匹配白名单,字段写入 -->
<param name="valid-variant-includes" value=""/> <!-- 字段包含匹配白名单,字段写入 -->
<param name="invalid-variant-prefixs" value=""/> <!-- 字段前缀匹配黑名单,字段去除 -->
<param name="invalid-variant-includes" value=""/> <!-- 字段包含匹配黑名单,字段去除 -->
</settings>
</configuration>
二、改poll为epoll收发数据包
源代码如下:
static switch_status_t read_packet(listener_t *listener, switch_event_t **event, uint32_t seconds)
{
char buf[65536] = "";
switch_status_t status = SWITCH_STATUS_SUCCESS;
int count = 0;
uint32_t elapsed = 0;
time_t start = 0;
uint8_t do_sleep = 1;
void *pop;
switch_size_t buf_len = sizeof(buf);
switch_channel_t *channel = NULL;
*event = NULL;
start = switch_epoch_time_now(NULL);
if (prefs.done) {
switch_goto_status(SWITCH_STATUS_FALSE, end);
}
if (listener->session) {
channel = switch_core_session_get_channel(listener->session);
}
while (listener->sock && !prefs.done) {
char *packet = NULL, *val = NULL;
switch_size_t len, packet_len, clen = 0, total_len = 0;
len = switch_buffer_ignore_space(listener->recv_buffer);
if(len){
packet_len = switch_buffer_peek_packet(listener->recv_buffer, &packet);
if(packet_len){
val = strstr(packet, "content-length:");
if(val){
val += strlen("content-length:");
while (val && *val == ' ') {
val++;
}
if(val){
clen = atoi(val);
total_len = packet_len + clen;
}else{
switch_buffer_toss(listener->recv_buffer, packet_len);
}
}else{
total_len = packet_len;
}
if(total_len && switch_buffer_inuse(listener->recv_buffer) < total_len){
total_len = 0;
}
switch_safe_free(packet);
}else if(len >= 10485760){
switch_buffer_toss(listener->recv_buffer, len);
}
}
if(total_len == 0){
int64_t timeout = 20000;
if(!do_sleep){
timeout = 0;
switch_os_yield();
}
memset(buf, 0, sizeof(buf));
status = listener_handle_recv(listener, buf, &buf_len, timeout);
if (prefs.done || (status != SWITCH_STATUS_TIMEOUT && status != SWITCH_STATUS_SUCCESS)) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(listener->session), SWITCH_LOG_ERROR, "listener_handle_recv(%ld) return %d!\n", timeout, status);
switch_goto_status(SWITCH_STATUS_FALSE, end);
}
if(status == SWITCH_STATUS_SUCCESS){
do_sleep = 0;
switch_buffer_write(listener->recv_buffer, buf, buf_len);
continue;
}
do_sleep = 1;
if (switch_test_flag(listener, LFLAG_LOG)) {
if (switch_queue_trypop(listener->log_queue, &pop) == SWITCH_STATUS_SUCCESS) {
switch_log_node_t *dnode = (switch_log_node_t *) pop;
if (dnode->data) {
char *data = NULL;
len = strlen(dnode->data);
data = switch_mprintf("Content-Type: log/data\n"
"Content-Length: %" SWITCH_SSIZE_T_FMT "\n"
"Log-Level: %d\n"
"Text-Channel: %d\n"
"Log-File: %s\n"
"Log-Func: %s\n"
"Log-Line: %d\n"
"User-Data: %s\n"
"\n%s",
len,
dnode->level, dnode->channel, dnode->file, dnode->func, dnode->line,
switch_str_nil(dnode->userdata), dnode->data
);
len = strlen(data);
switch_socket_send(listener->sock, data, &len);
switch_safe_free(data);
do_sleep = 0;
}
switch_log_node_free(&dnode);
}
}
if (listener->session) {
switch_channel_t *chan = switch_core_session_get_channel(listener->session);
if (switch_channel_get_state(chan) < CS_HANGUP && switch_channel_test_flag(chan, CF_DIVERT_EVENTS)) {
switch_event_t *e = NULL;
while (switch_core_session_dequeue_event(listener->session, &e, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS) {
if (switch_queue_trypush(listener->event_queue, e) != SWITCH_STATUS_SUCCESS) {
switch_core_session_queue_event(listener->session, &e);
break;
}
}
}
}
if (switch_test_flag(listener, LFLAG_EVENTS)) {
while (switch_queue_trypop(listener->event_queue, &pop) == SWITCH_STATUS_SUCCESS) {
switch_event_t *pevent = (switch_event_t *) pop;
char *etype, *data = NULL;
if (listener->format == EVENT_FORMAT_PLAIN) {
etype = "plain";
switch_event_serialize(pevent, &listener->ebuf, SWITCH_TRUE);
} else if (listener->format == EVENT_FORMAT_JSON) {
etype = "json";
switch_event_serialize_json(pevent, &listener->ebuf);
} else {
switch_xml_t xml;
etype = "xml";
if ((xml = switch_event_xmlize(pevent, SWITCH_VA_NONE))) {
listener->ebuf = switch_xml_toxml(xml, SWITCH_FALSE);
switch_xml_free(xml);
} else {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(listener->session), SWITCH_LOG_ERROR, "XML ERROR!\n");
goto endloop;
}
}
switch_assert(listener->ebuf);
len = strlen(listener->ebuf);
data = switch_mprintf("Content-Length: %" SWITCH_SSIZE_T_FMT "\n" "Content-Type: text/event-%s\n" "\n%s", len, etype, listener->ebuf);
len = strlen(data);
switch_socket_send(listener->sock, data, &len);
switch_safe_free(listener->ebuf);
switch_safe_free(data);
do_sleep = 0;
endloop:
switch_event_destroy(&pevent);
}
}
}else{
char *mbuf = (char *)calloc(1, total_len + 1);
char *next;
char *cur = mbuf;
do_sleep = 0;
switch_buffer_read(listener->recv_buffer, mbuf, total_len);
while (cur) {
if ((next = strchr(cur, '\r')) || (next = strchr(cur, '\n'))) {
while (*next == '\r' || *next == '\n') {
*next = '\0';
next++;
}
}
count++;
if (count == 1) {
switch_event_create(event, SWITCH_EVENT_CLONE);
switch_event_add_header_string(*event, SWITCH_STACK_BOTTOM, "Command", cur);
} else if (cur) {
char *var, *val;
var = cur;
strip_cr(var);
if (!zstr(var)) {
if ((val = strchr(var, ':'))) {
*val++ = '\0';
while (*val == ' ') {
val++;
}
}
if (var && val) {
switch_event_add_header_string(*event, SWITCH_STACK_BOTTOM, var, val);
if (!strcasecmp(var, "content-length")) {
clen = atoi(val);
if (clen > 0 && next) {
cur = next;
switch_event_add_body(*event, "%s", cur);
}
}
}
}
}
cur = next;
}
free(mbuf);
break;
}
if (seconds) {
elapsed = (uint32_t) (switch_epoch_time_now(NULL) - start);
if (elapsed >= seconds) {
switch_clear_flag_locked(listener, LFLAG_RUNNING);
switch_goto_status(SWITCH_STATUS_FALSE, end);
}
}
if (switch_test_flag(listener, LFLAG_HANDLE_DISCO) &&
listener->linger_timeout != (time_t) -1 && switch_epoch_time_now(NULL) > listener->linger_timeout) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(listener->session), SWITCH_LOG_DEBUG, "linger timeout, closing socket\n");
status = SWITCH_STATUS_FALSE;
break;
}
if (channel && switch_channel_down(channel) && !switch_test_flag(listener, LFLAG_HANDLE_DISCO)) {
switch_set_flag_locked(listener, LFLAG_HANDLE_DISCO);
if (switch_test_flag(listener, LFLAG_LINGER)) {
char *data = NULL;
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(listener->session), SWITCH_LOG_DEBUG, "%s Socket Linger %d\n",
switch_channel_get_name(channel), (int)listener->linger_timeout);
data = switch_mprintf("Content-Type: text/disconnect-notice\n"
"Controlled-Session-UUID: %s\n"
"Content-Disposition: linger\n"
"Channel-Name: %s\n"
"Linger-Time: %d\n"
"Content-Length: 0\n\n",
switch_core_session_get_uuid(listener->session), switch_channel_get_name(channel), (int)listener->linger_timeout);
if (listener->linger_timeout != (time_t) -1) {
listener->linger_timeout += switch_epoch_time_now(NULL);
}
len = strlen(data);
switch_socket_send(listener->sock, data, &len);
switch_safe_free(data);
} else {
status = SWITCH_STATUS_FALSE;
break;
}
}
}
end:
return status;
}
三、接收数据添加缓冲,解决多次接收效率慢的问题;