1.9.3 Topic Queue

优质
小牛编辑
123浏览
2023-12-01

TopicQueue是为了保证Queue中Message的局部有序性(同一Topic内有序)而引入的概念。

用户在创建Queue时,将Queue的topicQueue属性设置为True,得到的Queue即是TopicQueue。
之后,向TopicQueue中发送Message时,用户可以为Message设置String类型的Topic属性(也可以不指定,即Topic为空)。 EMQ将保证用户接收Message时,相同Topic的Message严格按照MessageId的顺序送达; 并且,如果用户未对MessageId较小的Message进行deleteMessage(), 同一Topic下MessageId较大Message不会被任何一个Client接收到.


MessageId的大小依据sendTime+delayTime.
空Topic和任何Topic都不相同,即Topic为空的Message没有“有序性”,其行为与nonTopic中的Message一样.

TopicQueue中的Message同样具有“超时重发”的特性,以保证消息队列at-lease once的送达语义; 重发现象出现后,可能会导致客户端的某两个并发(线程)收到同样的消息,从而打乱消息的顺序。详细举例如下:

相同Topic的Message,依messageId的大小为m1, m2, m3
用户启动了两个线程来处理Message, t1, t2;超时设置为30秒
t1收到m1,进行处理;处理过程中,m2, m3被EMQ锁定,无法被任何线程接收;
t1很快处理完m1,并使用`deleteMessage()`进行ACK。ACK后,Message解锁,m2被t2接收;m3被锁定
t2收到m2后,尚未开始处理,即被操作系统挂起(如Java GC)。挂起时间超过30秒
超时后,m2再次变为visible,从而被t1接收处理。t1对m2 deleteMessage()后,m3被解锁,从而被t1接收并处理
t2挂起结束后,并未感知m2已被t1处理,于是继续处理m2
在上述过程时,由于“超时重发”的特性,从EMQ Server看来,Message的receive顺序是m1, m2, m2, m3
但从用户整体的处理来看,处理顺序为m1, m2, m3, m2
在这种情况下,Message的有序性遭到了破坏。

需要指出,具有auto-recovery功能的消息系统可能都会存在类似的问题。
如果需要严格有序性,需要用户在Client端配合。
比如Client记录处理的最后一条Message的messageId,然后简单丢弃receive到的messageId更大的Message。 这样,不但保证了Message的严格有序,同时也实现了at-most once的语义,即保证了Message处理不丢不重。

TopicQueue在实现方法上,是通过HASH算法将相同Topic的Message放入Queue中的同一Partition,因此可能会有如下的一些性能损失。如果一个队列中的Message没有有序性的需求,请尽量将其创建为nonTopicQueue:

  1. Partition的数量不可调整
    对于nonTopicQueue,用户可以在流量增大时增加Partition数量,以提高Queue的并发性; 但在目前的实现中(以后可能会改进),TopicQueue的Partition数量一经确认,不可再次调整。 用户需要在createQueue()时预估流量,设置合适的Partition数量。对于size在1 KB左右的Message,可以认为一个Partition的通过能力在500 Message/sec.
  2. 不同Topic Message之间的相互影响
    一个Partition中包含一到多个Topic;
    如果某个Topic的Message长时间一直未被成功处理,可能会影响同一Partition内所有Message.
  3. Message在Partition之间分布不均匀
    指定了Topic的Message只能被send至Queue中某一确定的Partition,如果此Partition对应的底层结构暂时不可用,将导致sendMessage()失败;
    未指定Topic的Message(包括nonTopicQueue的Message和TopicQueue中未指定Topic的Message)将随机地进入某个Partition,并在此Partition不可用时,选择另外的Partition重试.