FreeSWITCH mod_event_socket 高性能版本

颛孙昆
2023-12-01

近日研究了 mod_event_socket 的源码,发现其 socket 使用的是 poll,并且接收数据时没有使用缓冲,严重影响系统性能。因此有必要对数据包的收发进行优化处理,以保证通讯畅通。

一、数据包字段过滤功能

<configuration name="event_socket.conf" description="Socket Client">
  <settings>
    <param name="nat-map" value="false"/>
    <param name="listen-ip" value="127.0.0.1"/>
    <param name="listen-port" value="8021"/>
    <param name="password" value="ClueCon"/>
    <!--<param name="apply-inbound-acl" value="lan"/>-->
    <param name="event-whitelists" value=""/> <!-- Event whitelist: all fields of these events are written -->
		<param name="valid-variant-prefixs" value=""/> <!-- Field whitelist matched by name prefix: matching fields are written -->
		<param name="valid-variant-includes" value=""/> <!-- Field whitelist matched by substring: matching fields are written -->
		<param name="invalid-variant-prefixs" value=""/> <!-- Field blacklist matched by name prefix: matching fields are removed -->
		<param name="invalid-variant-includes" value=""/> <!-- Field blacklist matched by substring: matching fields are removed --> 
  </settings>
</configuration>

二、改poll为epoll收发数据包

源代码如下:

/*
 * Locate the value of the first Content-Length header in a NUL-terminated
 * header block.
 *
 * The header name is matched case-insensitively and only at the start of a
 * line, so that framing agrees with the strcasecmp() check performed by the
 * header parser in read_packet() and is not fooled by the same text appearing
 * inside another header's value.
 *
 * Returns a pointer to the first non-space character after the colon, or NULL
 * when headers is NULL or contains no such header.
 */
static const char *find_content_length(const char *headers)
{
	static const char name[] = "content-length:";
	const size_t name_len = sizeof(name) - 1;
	const char *line = headers;

	while (line && *line) {
		if (!strncasecmp(line, name, name_len)) {
			const char *value = line + name_len;

			while (*value == ' ') {
				value++;
			}
			return value;
		}

		line = strchr(line, '\n');
		if (line) {
			line++;
		}
	}

	return NULL;
}

/*
 * Read one command packet for a listener from its buffered receive stream.
 *
 * Each pass of the main loop first checks whether listener->recv_buffer already
 * holds a complete packet (header block plus Content-Length bytes of body).
 *   - If not, it receives more bytes into recv_buffer and, when nothing
 *     arrived, uses the idle pass to flush queued log lines and events to the
 *     socket.
 *   - If so, it parses the packet into a newly created event: the first line
 *     becomes the "Command" header, the following "name: value" lines become
 *     headers, and the data after the blank line becomes the event body when a
 *     positive content-length was declared.
 *
 * listener  connection state (socket, receive buffer, queues, flags)
 * event     out parameter; set to NULL on entry and to the parsed event when a
 *           packet was read
 * seconds   when non-zero, give up after this many seconds: LFLAG_RUNNING is
 *           cleared and SWITCH_STATUS_FALSE is returned
 *
 * Returns the status of the last receive (SWITCH_STATUS_SUCCESS when a packet
 * was parsed) or SWITCH_STATUS_FALSE on shutdown, receive error, allocation
 * failure, timeout, linger expiry or a hung-up channel without linger.
 */
static switch_status_t read_packet(listener_t *listener, switch_event_t **event, uint32_t seconds)
{
	char buf[65536] = "";
	switch_status_t status = SWITCH_STATUS_SUCCESS;
	int count = 0;
	uint32_t elapsed = 0;
	time_t start = 0;
	uint8_t do_sleep = 1;
	void *pop;
	switch_size_t buf_len = sizeof(buf);
	switch_channel_t *channel = NULL;

	*event = NULL;
	start = switch_epoch_time_now(NULL);

	if (prefs.done) {
		switch_goto_status(SWITCH_STATUS_FALSE, end);
	}

	if (listener->session) {
		channel = switch_core_session_get_channel(listener->session);
	}

	while (listener->sock && !prefs.done) {

		char *packet = NULL;
		switch_size_t len, packet_len, clen = 0, total_len = 0;

		/* Step 1: is there a complete packet in the receive buffer already? */
		len = switch_buffer_ignore_space(listener->recv_buffer);
		if (len) {

			packet_len = switch_buffer_peek_packet(listener->recv_buffer, &packet);
			if (packet_len) {
				const char *length_value = find_content_length(packet);

				if (length_value) {
					/* A negative value must not wrap around in the unsigned length. */
					int declared = atoi(length_value);

					if (declared > 0) {
						clen = (switch_size_t) declared;
					}
				}

				total_len = packet_len + clen;

				/* Headers are complete but the body has not fully arrived yet. */
				if (total_len && switch_buffer_inuse(listener->recv_buffer) < total_len) {
					total_len = 0;
				}

				switch_safe_free(packet);

			} else if (len >= 10485760) {
				/* 10 MiB buffered without a complete header block: discard it. */
				switch_buffer_toss(listener->recv_buffer, len);
			}
		}

		if (total_len == 0) {
			/* Step 2a: no complete packet yet, so receive more data. */
			int64_t timeout = 20000;

			if (!do_sleep) {
				timeout = 0;
				switch_os_yield();
			}

			memset(buf, 0, sizeof(buf));
			/*
			 * buf_len is passed by pointer and used below as the received byte
			 * count, so restore the full capacity before every call.
			 * NOTE(review): assumes listener_handle_recv() treats the length as
			 * in/out like switch_socket_recv() - confirm.
			 */
			buf_len = sizeof(buf);
			status = listener_handle_recv(listener, buf, &buf_len, timeout);

			if (prefs.done || (status != SWITCH_STATUS_TIMEOUT && status != SWITCH_STATUS_SUCCESS)) {

				switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(listener->session), SWITCH_LOG_ERROR, "listener_handle_recv(%ld) return %d!\n", (long) timeout, (int) status);

				switch_goto_status(SWITCH_STATUS_FALSE, end);
			}

			if (status == SWITCH_STATUS_SUCCESS) {
				/* Got data: buffer it and re-check for a complete packet at once. */
				do_sleep = 0;
				switch_buffer_write(listener->recv_buffer, buf, buf_len);
				continue;
			}

			do_sleep = 1;

			/* Idle pass: forward at most one queued log line to the client. */
			if (switch_test_flag(listener, LFLAG_LOG)) {
				if (switch_queue_trypop(listener->log_queue, &pop) == SWITCH_STATUS_SUCCESS) {
					switch_log_node_t *dnode = (switch_log_node_t *) pop;

					if (dnode->data) {
						char *data = NULL;
						len = strlen(dnode->data);

						data = switch_mprintf("Content-Type: log/data\n"
							"Content-Length: %" SWITCH_SSIZE_T_FMT "\n"
							"Log-Level: %d\n"
							"Text-Channel: %d\n"
							"Log-File: %s\n"
							"Log-Func: %s\n"
							"Log-Line: %d\n"
							"User-Data: %s\n"
							"\n%s",
							len,
							dnode->level, dnode->channel, dnode->file, dnode->func, dnode->line, 
							switch_str_nil(dnode->userdata), dnode->data
							);

						len = strlen(data);
						switch_socket_send(listener->sock, data, &len);
						switch_safe_free(data);
						do_sleep = 0;
					}
					switch_log_node_free(&dnode);
				}
			}


			/* Move diverted session events onto the listener's event queue. */
			if (listener->session) {
				switch_channel_t *chan = switch_core_session_get_channel(listener->session);
				if (switch_channel_get_state(chan) < CS_HANGUP && switch_channel_test_flag(chan, CF_DIVERT_EVENTS)) {
					switch_event_t *e = NULL;
					while (switch_core_session_dequeue_event(listener->session, &e, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS) {
						if (switch_queue_trypush(listener->event_queue, e) != SWITCH_STATUS_SUCCESS) {
							/* Queue refused the event: hand it back to the session and stop. */
							switch_core_session_queue_event(listener->session, &e);
							break;
						}
					}
				}
			}

			/* Serialize every queued event in the listener's format and send it. */
			if (switch_test_flag(listener, LFLAG_EVENTS)) {
				while (switch_queue_trypop(listener->event_queue, &pop) == SWITCH_STATUS_SUCCESS) {
					switch_event_t *pevent = (switch_event_t *) pop;
					char *etype, *data = NULL;

					if (listener->format == EVENT_FORMAT_PLAIN) {
						etype = "plain";
						switch_event_serialize(pevent, &listener->ebuf, SWITCH_TRUE);
					} else if (listener->format == EVENT_FORMAT_JSON) {
						etype = "json";
						switch_event_serialize_json(pevent, &listener->ebuf);
					} else {
						switch_xml_t xml;
						etype = "xml";

						if ((xml = switch_event_xmlize(pevent, SWITCH_VA_NONE))) {
							listener->ebuf = switch_xml_toxml(xml, SWITCH_FALSE);
							switch_xml_free(xml);
						} else {
							switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(listener->session), SWITCH_LOG_ERROR, "XML ERROR!\n");
							goto endloop;
						}
					}

					switch_assert(listener->ebuf);

					len = strlen(listener->ebuf);

					data = switch_mprintf("Content-Length: %" SWITCH_SSIZE_T_FMT "\n" "Content-Type: text/event-%s\n" "\n%s", len, etype, listener->ebuf);

					len = strlen(data);
					switch_socket_send(listener->sock, data, &len);

					switch_safe_free(listener->ebuf);
					switch_safe_free(data);
					do_sleep = 0;
endloop:

					switch_event_destroy(&pevent);
				}
			}

		} else {
			/* Step 2b: a complete packet is buffered, so pull it out and parse it. */
			char *mbuf = calloc(1, total_len + 1);
			char *next;
			char *cur = mbuf;
			char *body = NULL;
			switch_size_t declared_len = 0;

			if (!mbuf) {
				switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(listener->session), SWITCH_LOG_ERROR, "Memory Error!\n");
				switch_goto_status(SWITCH_STATUS_FALSE, end);
			}

			do_sleep = 0;
			switch_buffer_read(listener->recv_buffer, mbuf, total_len);


			while (cur) {
				int blank_line = 0;

				/*
				 * Split at the nearest CR or LF. Looking for CR first over the
				 * whole buffer would pick up a CR inside the body when the
				 * headers are LF-delimited.
				 */
				if ((next = strpbrk(cur, "\r\n"))) {
					int newlines = 0;

					/*
					 * Consume the line terminator, stopping right after the
					 * blank line that ends the header block so the body keeps
					 * any leading newlines of its own.
					 */
					while (newlines < 2 && (*next == '\r' || *next == '\n')) {
						if (*next == '\n') {
							newlines++;
						}
						*next = '\0';
						next++;
					}

					blank_line = (newlines > 1);
				}

				count++;

				if (count == 1) {
					/* The first line of a packet is the command itself. */
					switch_event_create(event, SWITCH_EVENT_CLONE);
					switch_event_add_header_string(*event, SWITCH_STACK_BOTTOM, "Command", cur);
				} else {
					char *var = cur;
					char *val = NULL;

					strip_cr(var);

					if (!zstr(var)) {
						if ((val = strchr(var, ':'))) {
							*val++ = '\0';
							while (*val == ' ') {
								val++;
							}
						}
						/* Lines without a colon are ignored. */
						if (val) {
							switch_event_add_header_string(*event, SWITCH_STACK_BOTTOM, var, val);
							if (!strcasecmp(var, "content-length")) {
								int declared = atoi(val);

								declared_len = declared > 0 ? (switch_size_t) declared : 0;
							}
						}
					}
				}

				if (blank_line) {
					/* Everything after the blank line is body, not headers. */
					body = next;
					break;
				}

				cur = next;
			}

			if (declared_len > 0 && body) {
				switch_event_add_body(*event, "%s", body);
			}

			free(mbuf);
			break;
		}

		

		if (seconds) {
			elapsed = (uint32_t) (switch_epoch_time_now(NULL) - start);
			if (elapsed >= seconds) {
				switch_clear_flag_locked(listener, LFLAG_RUNNING);
				switch_goto_status(SWITCH_STATUS_FALSE, end);
			}
		}

		if (switch_test_flag(listener, LFLAG_HANDLE_DISCO) && 
			listener->linger_timeout != (time_t) -1 && switch_epoch_time_now(NULL) > listener->linger_timeout) {
			switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(listener->session), SWITCH_LOG_DEBUG, "linger timeout, closing socket\n");
			status = SWITCH_STATUS_FALSE;
			break;
		}

		/* Channel went down: either notify the client and linger, or stop now. */
		if (channel && switch_channel_down(channel) && !switch_test_flag(listener, LFLAG_HANDLE_DISCO)) {
			switch_set_flag_locked(listener, LFLAG_HANDLE_DISCO);
			if (switch_test_flag(listener, LFLAG_LINGER)) {
				char *data = NULL;
				
				switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(listener->session), SWITCH_LOG_DEBUG, "%s Socket Linger %d\n", 
								  switch_channel_get_name(channel), (int)listener->linger_timeout);
				
				data = switch_mprintf("Content-Type: text/disconnect-notice\n"
								"Controlled-Session-UUID: %s\n"
								"Content-Disposition: linger\n" 
								"Channel-Name: %s\n"
								"Linger-Time: %d\n"
								"Content-Length: 0\n\n", 
								switch_core_session_get_uuid(listener->session), switch_channel_get_name(channel), (int)listener->linger_timeout);


				/* Turn the relative linger duration into an absolute deadline. */
				if (listener->linger_timeout != (time_t) -1) {
					listener->linger_timeout += switch_epoch_time_now(NULL);
				}
				
				len = strlen(data);
				switch_socket_send(listener->sock, data, &len);
				switch_safe_free(data);
			} else {
				status = SWITCH_STATUS_FALSE;
				break;
			}
		}
	}

 end:
	return status;

}

三、为接收数据添加缓冲,解决多次接收效率低的问题

 类似资料: