当前位置: 首页 > 工具软件 > coturn > 使用案例 >

webrtc入门:8.coturn流程

养星汉
2023-12-01

对于局域网中的webrtc是不需要coturn的,因为它们自己进行视频流的传输,但通常的应用场景,在同一个局域网下概率是比较低的,因此我们需要把流推到服务器上,另外一端再从服务器把流拉下来。coturn作为turnserver做的是这一部分的工作。当然coturn也可以作为stun服务器,这篇文章先不讲,只讲turnserver这部分的内容。

turnserver的原理是根据webrtc的协议而定的,最主要的功能就是转发webrtc上的数据流,在转发数据流中,最关键的是两样,一个是创建流端口连接,一个是匹配端口。

当webrtc的客户端配置上了ICE_SERVER以后,客户端会去ICE_SERVER请求一个端口(中继地址)用来传递数据流,这就是所谓的ice候选(candidate);在互换了ice候选以后,会选出一个通路来匹配到另一端的通路,数据在服务端中进行转发传递。

这也就是最重要的两个环节,创建流通路和匹配通道。

1.创建流通路

webrtc的客户端向服务器发起连接,通过命令告诉服务器他需要的东西。

static int handle_turn_command(turn_turnserver *server, ts_ur_super_session *ss, ioa_net_data *in_buffer, ioa_network_buffer_handle nbh, int *resp_constructed, int can_resume)
{

	...

		if (!err_code && !(*resp_constructed) && !no_response) {

			switch (method){

			case STUN_METHOD_ALLOCATE:

			{
				handle_turn_allocate(server, ss, &tid, resp_constructed, &err_code, &reason,
							unknown_attrs, &ua_num, in_buffer, nbh);

				if(server->verbose) {
				  log_method(ss, "ALLOCATE", err_code, reason);
				}

				break;
			}

			...
	return 0;
}

在对 turn 的命令进行处理时,STUN_METHOD_ALLOCATE告诉服务器,需要分配一个新的通道。

static int handle_turn_allocate(turn_turnserver *server,
				ts_ur_super_session *ss, stun_tid *tid, int *resp_constructed,
				int *err_code, 	const u08bits **reason, u16bits *unknown_attrs, u16bits *ua_num,
				ioa_net_data *in_buffer, ioa_network_buffer_handle nbh) {


	...

				if(!(*err_code)) {
					if(!af4 && !af6) {
						int af4res = create_relay_connection(server, ss, lifetime,
							STUN_ATTRIBUTE_REQUESTED_ADDRESS_FAMILY_VALUE_DEFAULT, transport,
							even_port, in_reservation_token, &out_reservation_token,
							err_code, reason,
							tcp_peer_accept_connection);
						if(af4res<0) {
							set_relay_session_failure(alloc,AF_INET);
							if(!(*err_code)) {
								*err_code = 437;
							}
						}
					} else if(!af4 && af6) {
						int af6res = create_relay_connection(server, ss, lifetime,
							af6, transport,
							even_port, in_reservation_token, &out_reservation_token,
							err_code, reason,
							tcp_peer_accept_connection);
						if(af6res<0) {
							set_relay_session_failure(alloc,AF_INET6);
							if(!(*err_code)) {
								*err_code = 437;
							}
						}
					} else if(af4 && !af6) {
						int af4res = create_relay_connection(server, ss, lifetime,
							af4, transport,
							even_port, in_reservation_token, &out_reservation_token,
							err_code, reason,
							tcp_peer_accept_connection);
						if(af4res<0) {
							set_relay_session_failure(alloc,AF_INET);
							if(!(*err_code)) {
								*err_code = 437;
							}
						}
					} else {
						const u08bits *reason4 = NULL;
						const u08bits *reason6 = NULL;
						{
							int af4res = create_relay_connection(server, ss, lifetime,
									af4, transport,
									even_port, in_reservation_token, &out_reservation_token,
									&err_code4, &reason4,
									tcp_peer_accept_connection);
							if(af4res<0) {
								set_relay_session_failure(alloc,AF_INET);
								if(!err_code4) {
									err_code4 = 440;
								}
							}
						}
						{
							int af6res = create_relay_connection(server, ss, lifetime,
												af6, transport,
												even_port, in_reservation_token, &out_reservation_token,
												&err_code6, &reason6,
												tcp_peer_accept_connection);
							if(af6res<0) {
								set_relay_session_failure(alloc,AF_INET6);
								if(!err_code6) {
									err_code6 = 440;
								}
							}
						}

						if(err_code4 && err_code6) {
							if(reason4) {
								*err_code = err_code4;
								*reason = reason4;
							} else if(reason6) {
								*err_code = err_code6;
								*reason = reason6;
							} else {
								*err_code = err_code4;
							}
						}
					}
				}

				...

	return 0;
}

create_relay_connection 用来创建 relay 的连接,具体再看看 create_relay_connection 的内容。

static int create_relay_connection(turn_turnserver* server,
				   ts_ur_super_session *ss, u32bits lifetime,
				   int address_family, u08bits transport,
				   int even_port, u64bits in_reservation_token, u64bits *out_reservation_token,
				   int *err_code, const u08bits **reason,
				   accept_cb acb) {

	... else {

			newelem = get_relay_session_ss(ss,get_family(address_family));

			IOA_CLOSE_SOCKET(newelem->s);

			ns_bzero(newelem, sizeof(relay_endpoint_session));
			newelem->s = NULL;

			int res = create_relay_ioa_sockets(server->e,
							ss->client_socket,
							address_family, transport,
							even_port, &(newelem->s), &rtcp_s, out_reservation_token,
							err_code, reason, acb, ss);
			if (res < 0) {
				if(!(*err_code))
					*err_code = 508;
				if(!(*reason))
					*reason = (const u08bits *)"Cannot create socket";
				IOA_CLOSE_SOCKET(newelem->s);
				IOA_CLOSE_SOCKET(rtcp_s);
				return -1;
			}
		}

		...
		
		/* RFC6156: do not use DF when IPv6 is involved: */
		if((get_ioa_socket_address_family(newelem->s) == AF_INET6) ||
		   (get_ioa_socket_address_family(ss->client_socket) == AF_INET6))
			set_do_not_use_df(newelem->s);

		if(get_ioa_socket_type(newelem->s) != TCP_SOCKET) {
			if(register_callback_on_ioa_socket(server->e, newelem->s, IOA_EV_READ,peer_input_handler, ss, 0)<0) {
				return -1;
			}
		}

	return 0;
}

关键的代码在 create_relay_ioa_sockets 下,现在sockets的字眼已经出现了;建立完成以后,create_relay_connection 为非TCP的socket注册了peer_input_handler,作为读事件(IOA_EV_READ)的回调接口。

int create_relay_ioa_sockets(ioa_engine_handle e,
				ioa_socket_handle client_s,
				int address_family, u08bits transport,
				int even_port, ioa_socket_handle *rtp_s,
				ioa_socket_handle *rtcp_s, uint64_t *out_reservation_token,
				int *err_code, const u08bits **reason,
				accept_cb acb, void *acbarg)
{

	...

	for (iip = 0; iip < e->relays_number; ++iip) {

		...
		addr_cpy(&rtcp_local_addr, &relay_addr);

		int i = 0;
		int port = 0;
		ioa_addr local_addr;
		addr_cpy(&local_addr, &relay_addr);
		for (i = 0; i < 0xFFFF; i++) {
			port = 0;
			rtcp_port = -1;
			if (even_port < 0) {
				port = turnipports_allocate(tp, transport, &relay_addr);
			} else {

				port = turnipports_allocate_even(tp, &relay_addr, even_port, out_reservation_token);
				if (port >= 0 && even_port > 0) {

					IOA_CLOSE_SOCKET(*rtcp_s);
					*rtcp_s = create_unbound_relay_ioa_socket(e, relay_addr.ss.sa_family, UDP_SOCKET, RELAY_RTCP_SOCKET);
					if (*rtcp_s == NULL) {
						perror("socket");
						IOA_CLOSE_SOCKET(*rtp_s);
						addr_set_port(&local_addr, port);
						turnipports_release(tp, transport, &local_addr);
						rtcp_port = port + 1;
						addr_set_port(&rtcp_local_addr, rtcp_port);
						turnipports_release(tp, transport, &rtcp_local_addr);
						return -1;
					}
					sock_bind_to_device((*rtcp_s)->fd, (unsigned char*)e->relay_ifname);

					rtcp_port = port + 1;
					addr_set_port(&rtcp_local_addr, rtcp_port);
					if (bind_ioa_socket(*rtcp_s, &rtcp_local_addr,
						(transport == STUN_ATTRIBUTE_TRANSPORT_TCP_VALUE)) < 0) {
						addr_set_port(&local_addr, port);
						turnipports_release(tp, transport, &local_addr);
						turnipports_release(tp, transport, &rtcp_local_addr);
						rtcp_port = -1;
						IOA_CLOSE_SOCKET(*rtcp_s);
						continue;
					}
				}
			}
			if (port < 0) {
				IOA_CLOSE_SOCKET(*rtp_s);
				if (rtcp_s)
					IOA_CLOSE_SOCKET(*rtcp_s);
				rtcp_port = -1;
				break;
			} else {

				IOA_CLOSE_SOCKET(*rtp_s);

				*rtp_s = create_unbound_relay_ioa_socket(e, relay_addr.ss.sa_family,
										(transport == STUN_ATTRIBUTE_TRANSPORT_TCP_VALUE) ? TCP_SOCKET : UDP_SOCKET,
										RELAY_SOCKET);
				if (*rtp_s == NULL) {
					int rtcp_bound = 0;
					if (rtcp_s && *rtcp_s) {
						rtcp_bound = (*rtcp_s)->bound;
						IOA_CLOSE_SOCKET(*rtcp_s);
					}
					addr_set_port(&local_addr, port);
					turnipports_release(tp, transport, &local_addr);
					if (rtcp_port >= 0 && !rtcp_bound) {
						addr_set_port(&rtcp_local_addr, rtcp_port);
						turnipports_release(tp, transport, &rtcp_local_addr);
					}
					perror("socket");
					return -1;
				}

				sock_bind_to_device((*rtp_s)->fd, (unsigned char*)e->relay_ifname);

				addr_set_port(&local_addr, port);
				if (bind_ioa_socket(*rtp_s, &local_addr,
					(transport == STUN_ATTRIBUTE_TRANSPORT_TCP_VALUE)) >= 0) {
					break;
				} else {
					IOA_CLOSE_SOCKET(*rtp_s);
					int rtcp_bound = 0;
					if (rtcp_s && *rtcp_s) {
						rtcp_bound = (*rtcp_s)->bound;
						IOA_CLOSE_SOCKET(*rtcp_s);
					}
					addr_set_port(&local_addr, port);
					turnipports_release(tp, transport, &local_addr);
					if (rtcp_port >= 0 && !rtcp_bound) {
						addr_set_port(&rtcp_local_addr, rtcp_port);
						turnipports_release(tp, transport, &rtcp_local_addr);
					}
					rtcp_port = -1;
				}
			}
		}


	}

	if (!(*rtp_s)) {
		TURN_LOG_FUNC(TURN_LOG_LEVEL_INFO, "%s: no available ports 3\n", __FUNCTION__);
		IOA_CLOSE_SOCKET(*rtp_s);
		if (rtcp_s)
			IOA_CLOSE_SOCKET(*rtcp_s);
		return -1;
	}

	set_accept_cb(*rtp_s, acb, acbarg);

	if (rtcp_s && *rtcp_s && out_reservation_token && *out_reservation_token) {
		if (rtcp_map_put(e->map_rtcp, *out_reservation_token, *rtcp_s) < 0) {
			TURN_LOG_FUNC(TURN_LOG_LEVEL_ERROR, "%s: cannot update RTCP map\n", __FUNCTION__);
			IOA_CLOSE_SOCKET(*rtp_s);
			if (rtcp_s)
				IOA_CLOSE_SOCKET(*rtcp_s);
			return -1;
		}
	}
	...

	return 0;
}

这样就创建了一个通道,打印的日志如下,

1584: IPv4. Local relay addr: 117.25.xxx.xxx:64425

如何把这个数据给到webrtc的客户端呢?也是通过stun的协议,发到请求的客户端上。

2.匹配通道

两个客户端都建立了连接,在交换信息的时候,我们需要知道谁跟谁是配对的,这样我们可以把流发到对方上。问题还是得从handle_turn_command看起:handle_turn_command接收到了STUN_METHOD_CHANNEL_BIND 绑定通道的命令,这个命令下会获取匹配的信息,下面是具体的流程分析。

static int handle_turn_command(turn_turnserver *server, ts_ur_super_session *ss, ioa_net_data *in_buffer, ioa_network_buffer_handle nbh, int *resp_constructed, int can_resume)
{

	...

			case STUN_METHOD_CHANNEL_BIND:

				handle_turn_channel_bind(server, ss, &tid, resp_constructed, &err_code, &reason,
								unknown_attrs, &ua_num, in_buffer, nbh);

				if(server->verbose) {
				  log_method(ss, "CHANNEL_BIND", err_code, reason);
				}
				break;

			case STUN_METHOD_CREATE_PERMISSION:

				handle_turn_create_permission(server, ss, &tid, resp_constructed, &err_code, &reason,
								unknown_attrs, &ua_num, in_buffer, nbh);

				if(server->verbose) {
				  log_method(ss, "CREATE_PERMISSION", err_code, reason);
				}
				break;

			...

	return 0;
}

接着继续看看handle_turn_channel_bind后面做了什么事情。

static int handle_turn_channel_bind(turn_turnserver *server,
				    ts_ur_super_session *ss, stun_tid *tid, int *resp_constructed,
				    int *err_code, const u08bits **reason, u16bits *unknown_attrs, u16bits *ua_num,
				    ioa_net_data *in_buffer, ioa_network_buffer_handle nbh) {

	...
			case STUN_ATTRIBUTE_XOR_PEER_ADDRESS:
			  {
				stun_attr_get_addr_str(ioa_network_buffer_data(in_buffer->nbh), 
						       ioa_network_buffer_get_size(in_buffer->nbh), 
						       sar, &peer_addr,
						       NULL);

				if(!get_relay_socket(a,peer_addr.ss.sa_family)) {
					*err_code = 443;
					*reason = (const u08bits *)"Peer Address Family Mismatch (3)";
				}

				if(addr_get_port(&peer_addr) < 1) {
					*err_code = 400;
					*reason = (const u08bits *)"Empty port number in channel bind request";
				} else {
					addr_found = 1;
				}

				break;
			  }
			default:
				if(attr_type>=0x0000 && attr_type<=0x7FFF)
					unknown_attrs[(*ua_num)++] = nswap16(attr_type);
			};
			sar = stun_attr_get_next_str(ioa_network_buffer_data(in_buffer->nbh), 
						     ioa_network_buffer_get_size(in_buffer->nbh), 
						     sar);
		}

		...

				chn = allocation_get_ch_info_by_peer_addr(a, &peer_addr);
				if(chn) {
					*err_code = 400;
					*reason = (const u08bits *)"You cannot use the same peer with different channel number";
				} else {
					if(!good_peer_addr(server,ss->realm_options.name,&peer_addr)) {
						*err_code = 403;
						*reason = (const u08bits *) "Forbidden IP";
					} else {
						chn = allocation_get_new_ch_info(a, chnum, &peer_addr);
						if (!chn) {
							*err_code = 500;
							*reason = (const u08bits *) "Cannot find channel data";
						} else {
							tinfo = (turn_permission_info*) (chn->owner);
							if (!tinfo) {
								*err_code = 500;
								*reason
									= (const u08bits *) "Wrong turn permission info";
							}
						}
					}
				}
			}
			
...

	return 0;
}

通过STUN_ATTRIBUTE_XOR_PEER_ADDRESS属性,获取到peer_addr 的地址,紧接着用good_peer_addr看看是不是被屏蔽的地址,而后把配对的地址放到map中。

/*
 * Bind channel number `chnum` to `peer_addr` inside allocation `a`.
 *
 * The permission record for `peer_addr` is looked up in a->addr_to_perm and
 * created through allocation_add_permission() when it does not exist yet.
 * The channel slot for `chnum` is then obtained from a->chns, filled in with
 * the peer address/port, linked to its owning permission, and registered in
 * the permission's own channel map keyed by the peer port.
 *
 * Returns the initialized channel record, or NULL when the arguments are
 * NULL, no permission record could be obtained, or no channel slot could be
 * obtained. Callers already treat a NULL result as an error, so failing
 * early here replaces a NULL dereference with a reportable failure.
 */
ch_info* allocation_get_new_ch_info(allocation* a, u16bits chnum, ioa_addr* peer_addr)
{
	if (!a || !peer_addr)
		return NULL;

	turn_permission_info* tinfo = get_from_turn_permission_hashtable(&(a->addr_to_perm), peer_addr);

	if (!tinfo)
		tinfo = allocation_add_permission(a, peer_addr);

	/* Bail out before touching the channel map, so that a failed permission
	 * lookup never leaves behind a channel marked as allocated without an
	 * owner. */
	if (!tinfo)
		return NULL;

	/* NOTE(review): the trailing 1 presumably means "create if missing";
	 * confirm against ch_map_get(). */
	ch_info* chn = ch_map_get(&(a->chns), chnum, 1);

	if (!chn)
		return NULL;

	chn->allocated = 1;
	chn->chnum = chnum;
	chn->port = addr_get_port(peer_addr);
	addr_cpy(&(chn->peer_addr), peer_addr);
	chn->owner = tinfo;

	/* Index the channel under its permission, keyed by the peer port. */
	lm_map_put(&(tinfo->chns), (ur_map_key_type) addr_get_port(peer_addr), (ur_map_value_type) chn);

	return chn;
}

lm_map_put 以 peer_addr 的端口为键,把通道信息 chn 放到对应权限(tinfo)的 map 中去。

在前面打通了客户端到服务器的通道,在通道获取到客户端的数据时,通过read_client_connection获取到客户端的数据,并转发到匹配的端上。

static int read_client_connection(turn_turnserver *server,
				  	  	  	  	  ts_ur_super_session *ss, ioa_net_data *in_buffer,
				  	  	  	  	  int can_resume, int count_usage) {

	...
		if(blen<=orig_blen) {
			ioa_network_buffer_set_size(in_buffer->nbh,blen);
			rc = write_to_peerchannel(ss, chnum, in_buffer);
		}

		if (eve(server->verbose)) {
			TURN_LOG_FUNC(TURN_LOG_LEVEL_INFO, "%s: wrote to peer %d bytes\n",
					__FUNCTION__, (int) rc);
		}

		FUNCEND;
		return 0;

	} ...
	return -1;
}

write_to_peerchannel负责把数据转发到匹配的通道上,跟进去可以看到。

/*
 * Forward data received from the client on channel `chnum` to the peer
 * address bound to that channel.
 *
 * Returns 0 when nothing is sent (no session, recv_ttl of 0, or the session's
 * allocation is not valid), -1 when no channel record exists for `chnum`,
 * and otherwise the result of send_data_from_ioa_socket_nbh().
 * On the send path in_buffer->nbh is reset to NULL after the call.
 */
static int write_to_peerchannel(ts_ur_super_session* ss, u16bits chnum, ioa_net_data *in_buffer) {

	if (!ss || (in_buffer->recv_ttl == 0))
		return 0;

	allocation* alloc = get_allocation_ss(ss);

	if (!is_allocation_valid(alloc))
		return 0;

	ch_info* channel = allocation_get_ch_info(alloc, chnum);

	if (!channel)
		return -1;

	/* Channel packets are always sent with DF=0: */
	set_df_on_ioa_socket(get_relay_socket_ss(ss, channel->peer_addr.ss.sa_family), 0);

	ioa_network_buffer_handle payload = in_buffer->nbh;

	/* Move the buffer window past the channel header and shrink the size by
	 * the same amount. */
	ioa_network_buffer_add_offset_size(in_buffer->nbh, STUN_CHANNEL_HEADER_LENGTH, 0, ioa_network_buffer_get_size(in_buffer->nbh)-STUN_CHANNEL_HEADER_LENGTH);

	ioa_network_buffer_header_init(payload);

	/* The relayed packet goes out with the received TTL decremented by one. */
	int sent = send_data_from_ioa_socket_nbh(get_relay_socket_ss(ss, channel->peer_addr.ss.sa_family), &(channel->peer_addr), payload, in_buffer->recv_ttl-1, in_buffer->recv_tos, NULL);

	/* NOTE(review): presumably the buffer is now owned by the send path;
	 * confirm against send_data_from_ioa_socket_nbh(). */
	in_buffer->nbh = NULL;

	return sent;
}

allocation_get_ch_info 是用来获取匹配地址的,之前我们把它放到map下了。

/*
 * Look up the channel record for `chnum` in the channel map of allocation
 * `a`. The lookup passes 0 as the last argument to ch_map_get() and returns
 * whatever ch_map_get() returns.
 */
ch_info* allocation_get_ch_info(allocation* a, u16bits chnum) {
	ch_info* found = ch_map_get(&(a->chns), chnum, 0);

	return found;
}

通过send_data_from_ioa_socket_nbh 把数据发到指定的地址上。

int send_data_from_ioa_socket_nbh(ioa_socket_handle s, ioa_addr* dest_addr,
				ioa_network_buffer_handle nbh,
				int ttl, int tos, int *skip)
{
	...
						if (s->connected && !(s->parent_s)) {
							dest_addr = NULL; /* ignore dest_addr */
						} else if (!dest_addr) {
							dest_addr = &(s->remote_addr);
						}

						ret = udp_send(s,
									dest_addr,
									(s08bits*) ioa_network_buffer_data(nbh),ioa_network_buffer_get_size(nbh));
	...
}

这样通过udp_send 把数据发出去。

到这里,两个重大的问题,都得到了答案。

连通了以后,服务器不断的报告数据的信息:

59025: session 015000000000000001: usage: realm=<nort.gov>, username=<xxx>, rp=1020, rb=903980, sp=1028, sb=907260
59025: session 012000000000000004: usage: realm=<nort.gov>, username=<xxx>, rp=1027, rb=905188, sp=1021, sb=904052
59027: session 015000000000000001: usage: realm=<nort.gov>, username=<xxx>, rp=1020, rb=898622, sp=1028, sb=906288
59027: session 012000000000000004: usage: realm=<nort.gov>, username=<xxx>, rp=1028, rb=906288, sp=1020, sb=898622
 类似资料: