当前位置: 首页 > 知识库问答 >
问题:

如何使用具有多种消息类型的中断器

彭宏义
2023-03-14

我的系统有两种不同类型的消息-类型A和B。每条消息都有不同的结构-类型A包含一个int成员,类型B包含一个double成员。我的系统需要将这两种类型的消息传递给许多业务逻辑线程。减少延迟非常重要,因此我正在研究使用中断器以机械方式将消息从主线程传递到业务逻辑线程。

我的问题是,破坏者只接受环形缓冲区中的一种类型的对象。这是有意义的,因为破坏者预先分配了环形缓冲区中的对象。然而,这也使得通过Disruptor将两种不同类型的消息传递给我的业务逻辑线程变得困难。据我所知,我有四种选择:

>

  • 配置disruptor以使用包含固定大小字节数组的对象(如如何使用disruptor(disruptor模式)来构建真实世界的消息系统所建议的那样)。在这种情况下,主线程必须在将消息发布到disruptor之前将其编码为字节数组,并且每个业务逻辑线程必须在收到消息后将字节数组解码回对象。这种设置的缺点是业务逻辑线程没有真正共享来自中断器的内存,而是从中断器提供的字节数组创建新对象(从而创建垃圾)。这种设置的好处是,所有业务逻辑线程都可以从同一个中断器读取多种不同类型的消息。

    将中断器配置为使用单一类型的对象,但创建多个中断器,每个对象类型一个。在上述情况下,将有两个独立的中断器-一个用于类型A的对象,另一个用于类型B的对象。这种设置的好处是,主线程不必将对象编码到字节数组,无业务逻辑线程可以共享中断器中使用的相同对象(没有创建垃圾)。这种设置的缺点是,每个业务逻辑线程都必须订阅来自多个破坏者的消息。

    将disruptor配置为使用一种类型的“super”对象,该对象包含消息a和消息B的所有字段。这与OO样式非常不一致,但允许在选项1和选项2之间进行折衷。

    配置中断器以使用对象引用。但是,在这种情况下,我失去了对象预分配和内存排序的性能优势。

    对于这种情况,你有什么建议?我觉得选项#2是最干净的解决方案,但我不知道消费者是否或如何从技术上订阅来自多个破坏者的消息。如果有人能提供一个如何实施选项2的示例,我们将不胜感激!

  • 共有3个答案

    乜安志
    2023-03-14

    与Vortex的答案太相似,但在保留子事件方面有所不同。它混合了#3和#4。如果我可以管理业务逻辑复杂性,我会去#2多个分配器。

    与基于数组的枚举事件类型实现相比,更喜欢的主要问题是不同事件类型的共享对象类型。

    public enum ExchangeEventType{
        PLACE_ORDER,   // -> OrderEvent
        CANCEL_ORDER,  // -> OrderEvent
        MARKET_FEED,   // -> MarketEvent
        MARKET_UPDATE, // -> MarketEvent
        ADD_USER,      // -> AccountEvent
        SUSPEND_USER,  // -> AccountEvent
        RESUME_USER    // -> AccountEvent
    }    
    
    public ExchangeEvent{
      private EventType type;
      private EventResultCode resultCode;
      private long timestamp;
    
      // event type objects
      private OrderEvent orderEvent;
      private MarketEvent marketEvent;
      private AccountEvent accountEvent;
    }
    

    在业务逻辑中,多个处理器消耗并生成多个类型的事件,因此不使用单独的分发程序是我有意识地选择的一种权衡。

    例如;

    • #1使用OrderEvent的引擎
    颜鸿云
    2023-03-14

    本·鲍姆戈尔德,我相信你现在已经找到了解决方案。你的#4(或#3)可以通过创建一个事件持有者来轻松实现。把它想象成对象的枚举。为了加快查找速度,事件应该用枚举类型来丰富。请注意,我在持有者中存储了对原始事件的引用。创建一个复制构造函数或clone()并在插入环形缓冲区时复制事件可能更合适。

    举例说明:

    //这是事件中使用的枚举

    public enum MyEventEnum {
    EVENT_TIMER,
    EVENT_MARKETDATA;
    }
    

    //这是持有人。在任何时候,ringBuffer中的这个实例都只保存一个由数组索引的事件[type.ordinal () ]. 为什么数组应该从代码中显而易见。

    public class RingBufferEventHolder {    
     private MyEventEnum;   
     private EventBase array[];
    
     public RingBufferEventHolder() {
        array=new EventBase[MyEventEnum.values().length]; 
     }
    
     // TODO: null the rest
     public void setEvent(EventBase event) {
        type=event.getType();
        switch( event.getType() ) {
            case EVENT_TIMER:
                array[MyEventEnum.EVENT_TIMER.ordinal()]=event;
                break;
            case EVENT_MARKETDATA:
                array[MyEventEnum.EVENT_MARKETDATA.ordinal()]=event;
                break;
            default:
                throw new RuntimeException("Unknown event type " + event );
        }
    }
    

    //发布事件

       EventBase newEvent=new EventMarketData(....);
       // prepare
       long nextSequence = ringBuffer.next(); 
       RingBufferEventHolder holder = ringBuffer.get(nextSequence);
       holder.setEvent(newEvent);
       // make the event available to EventProcessors 
       ringBuffer.publish(nextSequence);
    
    孟豪
    2023-03-14

    配置中断器以使用包含固定大小html" target="_blank">字节数组的对象(如如何使用中断器(中断器模式)构建现实世界的消息系统?)。在这种情况下,主线程必须将消息编码为字节数组,然后再将它们发布到中断器,并且每个业务逻辑线程必须在收到后将字节数组解码回对象。这种设置的缺点是业务逻辑线程并没有真正共享来自中断器的内存——相反,它们正在从中断器提供的字节数组创建新对象(从而创建垃圾)。这种设置的好处是所有业务逻辑线程都可以从同一个中断器读取多种不同类型的消息。

    这是我的首选方法,但我对我们的用例有点不太了解,几乎我们使用过的每个地方都有干扰器,它要么从某种I/O设备接收,要么发送到某种I/O设备,所以我们的基本货币是字节数组。通过使用flyweight方法编组,可以绕过对象创建。为了查看这方面的示例,我在Devxx上展示的一个示例中使用了Javolution的Struct和Union类(https://github.com/mikeb01/ticketing). 如果您可以在从事件处理程序的OneEvent调用返回之前完全处理该对象,那么这种方法工作得很好。如果事件需要超出该点,则需要对数据进行某种形式的复制,例如将其反序列化为对象。

    将中断器配置为使用单一类型的对象,但创建多个中断器,每个对象类型一个。在上面的情况下,将有两个单独的中断器——一个用于A类型的对象,另一个用于B类型的对象。此设置的好处是主线程不必将对象编码为字节数组,并且业务较少的逻辑线程可以共享与中断器中使用的相同对象(不创建垃圾)。此设置的缺点是,每个业务逻辑线程将不得不订阅来自多个中断器的消息。

    如果没有尝试这种方法,您可能需要一个可以从多个环形缓冲区轮询的自定义EventProcessor。

    将disruptor配置为使用一种类型的“super”对象,该对象包含消息a和消息B的所有字段。这与OO样式非常不一致,但允许在选项1和选项2之间进行折衷。将中断器配置为使用对象引用。然而,在这种情况下,我失去了对象预分配和内存排序的性能优势。

    我们已经在一些情况下这样做了,在这些情况下,缺乏预分配是可以容忍的。它工作正常。如果您正在传递对象,那么您需要确保在消费者端完成它们后将它们归零。我们发现对“超级”对象使用双重分派模式可以保持实现相当干净。这样做的一个缺点是,与使用直数组对象的情况相比,您将获得稍长的GC停顿,因为GC在标记阶段有更多活动对象要遍历。

    对于这种情况,您有什么建议?我觉得选项#2是最干净的解决方案,但我不知道消费者是否或如何在技术上订阅来自多个中断者的消息。如果有人能提供一个如何实现选项#2的示例,将不胜感激!

    如果您希望在数据使用方面具有完全的灵活性,另一种选择是不使用环形缓冲区,而是直接与Sequencer对话,并定义您认为最合适的对象布局。

     类似资料:
    • 假设我有一个名为的Kafka主题,它有几个消息类型(每个消息类型都有不同的Avro模式),如、等等。我想了解一下用Spring Cloud Stream发布/接收相同主题的不同类型是否可行(而且有意义)。特别是,拥有几个将非常有用,每个专用于特定类型。根据这篇博文,当需要订购消息时,这是非常有用的,因为它们与同一个实体相关。这种情况下的配置示例是什么?

    • 我有多个制作人,可以向一个Kafka主题发送多种类型的事件。 我有一个消费者,它必须消费所有类型的消息。每种类型的消息都有不同的逻辑。 但在这种情况下,所有消息都指向此方法,不仅是EventOne 如果我实现了两种方法(对于每种类型的消息),那么所有消息都只能使用一种方法。 如果我像这样实现监听器: 然后我得到一个例外:org。springframework。Kafka。KafkaListener

    • 问题内容: 我必须使用(相对)标准的围棋程序go.net/websocket库。我正在尝试从网页中接收和解码消息,这些消息对于每种消息类型都具有不同的结构,即 有什么方法可以对消息进行“部分”解码,仅在继续将实际消息解码为go结构之前检查该字段? 这是否有必要编写一个自定义a’la ,以将其委托给消息本身的JSON编解码器? 问题答案: 使用json.RawMessage延迟解码,例如 是一个别名

    • 我有一个建立在Kafka之上的事件源应用程序。目前,我有一个主题中有多个消息类型。所有序列化/反序列化的JSON。 那么这种方法如何与Kafka流媒体应用程序一起工作呢?在该应用程序中,您需要指定一个键和值serde? 我是不是应该忘了Avro而改用protobuff呢?

    • 从使用RecycerView创建动态列表: 当我们创建时,我们必须指定将与适配器绑定的。 是否可以创建具有多种视图类型的?

    • 问题内容: 我必须完成一个奇怪的特殊情况。其描述如下: 我必须设计一个地图,其中“ 键 ”始终为 字符串 类型。但是,其中一个键的“ 值 ”可以是 字符串或列表 (取决于特定键可以具有的值数。如果该特定键只有一个值,并且必须如果键包含许多值,则为列表)。如何完成这种情况? 例如:在映射中有2个键,分别是“名称”和“电话号码”。一个人只能有一个名字和多个电话号码。因此,此处的第一个键(即“名称”)的