转自:http://www.taobaotest.com/blogs/2284
Trunner节点之间通过JGroups消息进行通信,JGroups最大的一个优点就是内在支持集群特性,因此Trunner可以非常方便的实现负载均衡、可扩展性、高可用性。下面我们通过分析一下Trunner使用到的JGroups协议栈配置,来看看这些集群特性是怎么实现的。
UDP协议是JGroups底层的消息传送协议,注意这里的UDP和网络协议中的UDP概念不一样,除了说明底层数据传输使用的是UDP网络传输层协议之外还可以指定JGroups对消息处理的线程模型。这一层定义了两个线程池:thread_pool和oob_thread_pool,前者用来处理需要按序处理的消息,后者处理无序消息。UDP只是JGroups协议栈中传输的一个可选协议,用户也可以使用TCP来实现相同的功能。
PING协议被JGroups使用来发现集群节点,在JGroups集群中没有中心配置服务器和主控节点,所有的节点都是对等的,那么集群中的成员信息是怎么维护的呢?当一个节点启动时,它通过PING协议向一个组播地址发送PING消息,在这个组中的节点都会返回当前的集群节点信息给这个发起者。节点信息中包含一个重要信息-Coordinator(集群协调者),加入者得到这个信息之后会发送JOIN消息到Coordinator,Coordinator收到JOIN消息之后会更新集群地址列表然后发送VIEW消息到集群所有成员来更新集群成员信息。PING包含一个超时时间,如果在超时时间内没收到PING返回,加入者自己成为这个集群的Coordinator。如果由于任何原因Coordinator离开集群,则节点按照加入时间顺序自动升级成Coordinator。所有这些操作都是在PING协议层完成的,所以对于应用来说完全不必关心节点地址维护。这就使得Trunner集群能够做零配置扩容,在集群中增加一个节点就是在一台新机器上使用trunner.sh start controller|agent这个命令这么简单。
上面的PING协议有个问题,如果由于网络原因(临时中断)集群中出现多个Coordinator怎么办?PING之上的MERGE2协议就可以解决这个问题。MERGE2做到在网络恢复集群探测到有多个Coordinator组成多个子集群的时候发送MERGE消息给所有节点,将某些Coordinator降级使得集群重新整合成一个。
有加入就有退出,如果上层应用调用JGroups的channel disconnect方法正常退出,退出节点会发送LEAVE消息来使得集群的VIEW得到更新。但是很多时候,集群中的节点由于系统崩溃、物力宕机等原因非正常退出,这种情况也必须考虑到。JGroups底层通过FD、FD_SOCK、FD_ALL等协议来支持错误检测(Failure Detection)。Trunner集群中通过结合FD_ALL和FD_SOCK协议来保证错误检测的完备和及时。FD的工作方式是集群中的节点组成有序环,每个节点向它的下游发送心跳消息,如果心跳消息返回超时,这个节点就会向集群中所有成员发送SUSPECT消息SUSPECT对象就是那个没有及时作心跳返回的节点。SUSPECT消息只有Coordinator会处理,Coordinator收到SUSPECT消息后会向被怀疑对象发送VERIFY_SUSPECT消息,如果确认挂掉则将该节点从集群VIEW中去除并且将VIEW发送到集群每个节点。有了心跳检测还不是很保险,比如:被怀疑对象很忙,没有及时返回心跳消息和怀疑探测消息。Trunner中因此加入了FD_SOCK检测,它的工作方式是所有的节点组成一个Socket环,每个节点和他的下游节点建立Socket连接,这样只有当Socket连接非正常中断的时候,怀疑才会发生。FD_ALL和FD类似,也是采用心跳机制,但是通过消息组播的方式来发送心跳消息。
有了JGroups在协议栈上对集群特性的支持,Trunner能够非常方便的实现可扩展,高可用。同时每个节点定时汇报自己的资源利用率,Trunner Controller节点根据实时的节点负载情况动态分配性能测试任务,非常有效和简单的实现了压测任务的负载均衡。通过配置JGroups的STATE_TRANSFER协议层,Trunner节点的任务信息和节点资源信息被同步到新加入的节点中,进一步支持了零配置的动态扩容。
协议栈是JGroups最强大,最灵活的技术,JGroups本身提供了50多种协议供用户配置使用,用户也完全可以根据自己的需要对这个协议栈进行扩展。常用的比如:传输协议(UDP、TCP),消息分块协议(FRAG、FRAG2),可靠传输协议(UNICAST、NAKACK),失败侦测(FD、FD_SOCK、FD_ALL),排序协议(SEQUENCER)、成员协议(GMS)、加密协议(ENCRYPT)等等。这里列举一些,希望了解更多详细信息的可以参考JGroups官方文档。
附:Trunner的JGroups协议栈配置
<config xmlns="urn:org:jgroups"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.0.xsd">
<!-- 使用UDP组播作为消息传输协议 -->
<UDP
mcast_port="${jgroups.udp.mcast_port:45588}"
tos="8"
ucast_recv_buf_size="20M"
ucast_send_buf_size="640K"
mcast_recv_buf_size="25M"
mcast_send_buf_size="640K"
loopback="true"
discard_incompatible_packets="true"
max_bundle_size="64K"
max_bundle_timeout="30"
ip_ttl="${jgroups.udp.ip_ttl:8}"
enable_bundling="true"
enable_diagnostics="true"
thread_naming_pattern="cl"
timer_type="new"
timer.min_threads="4"
timer.max_threads="10"
timer.keep_alive_time="3000"
timer.queue_max_size="500"
thread_pool.enabled="true"
thread_pool.min_threads="2"
thread_pool.max_threads="8"
thread_pool.keep_alive_time="5000"
thread_pool.queue_enabled="true"
thread_pool.queue_max_size="10000"
thread_pool.rejection_policy="discard"
oob_thread_pool.enabled="true"
oob_thread_pool.min_threads="1"
oob_thread_pool.max_threads="8"
oob_thread_pool.keep_alive_time="5000"
oob_thread_pool.queue_enabled="false"
oob_thread_pool.queue_max_size="100"
oob_thread_pool.rejection_policy="Run"/>
<!-- IP Multicasting Membership Discovering -->
<PING timeout="2000"
num_initial_members="3"/>
<!-- 集群分裂自动融合 -->
<MERGE2 max_interval="30000"
min_interval="10000"/>
<!-- 基于Socket有序环的错误侦测 -->
<FD_SOCK/>
<!-- 基于心跳的错误侦测 -->
<FD_ALL/>
<VERIFY_SUSPECT timeout="1500" />
<!-- 组播消息可靠性保证(重发)和FIFO消息序列保证 -->
<pbcast.NAKACK exponential_backoff="300"
xmit_stagger_timeout="200"
use_mcast_xmit="false"
discard_delivered_msgs="true"/>
<!-- 单播消息可靠性保证(重发)和FIFO消息序列保证 -->
<UNICAST />
<!-- 未消费消息持久化 -->
<pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
max_bytes="4M"/>
<!-- 成员管理 -->
<pbcast.GMS print_local_addr="true" join_timeout="3000"
view_bundling="true"/>
<!-- 单播消息流量控制 -->
<UFC max_credits="2M"
min_threshold="0.4"/>
<!-- 组播消息流量控制 -->
<MFC max_credits="2M"
min_threshold="0.4"/>
<!-- 大消息分块 -->
<FRAG2 frag_size="60K" />
<!-- 节点状态迁移 -->
<pbcast.STATE_TRANSFER />
<!-- 集群锁支持 -->
<CENTRAL_LOCK />
<!-- 集群计数器支持 -->
<COUNTER bypass_bundling="true" timeout="5000"/>
</ config >