当前位置: 首页 > 知识库问答 >
问题:

MessageConsumer.completion处理程序是EventBus.local消费者所必需的吗?

艾雪风
2023-03-14

以下代码可以是自定义请求/响应实现的一部分。我没有使用EventBus。请求,因为我需要更多的自由,比如对一个请求发送多个响应。

String replyAddress = generateUniqueAddress();
replyConsumer = eventBus.localConsumer(replyAddress, this::handleReply);
replyConsumer.completionHandler(result -> {
  if (result.succeeded()) {
    eventBus.send("someAddress", new SomeMessageWhichContainsMyReplyAddress(replyAddress));
  }
  else {
    ...
  }
});

问题是:我真的必须等待完成处理程序,还是可以像这样简化代码?

String replyAddress = generateUniqueAddress();
replyConsumer = eventBus.localConsumer(replyAddress, this::handleReply);
eventBus.send("someAddress", new SomeMessageWhichContainsMyReplyAddress(replyAddress));

MessageConsumer的Javadoc。completionHandler描述了此方法在“注册已在集群中传播”时通知给定的处理程序。因为没有使用EventBus的集群。localConsumer,是否可以假定注册始终立即在本地事件总线上传播?我检查了Vert的实现。x 3.9.3,至少对于这个版本,注册似乎是同步的。

共有1个答案

莫选
2023-03-14

在侦听器注册之后,完成处理程序只被调用一次。如果您想在发生这种情况后向某个地址发送消息,那么您的初始代码是正确的。

由于Vert. x的异步性质,您的第二个示例将不一致。很可能在您发送消息时,您的本地消费者尚未注册,因此消息丢失。

如果您的目的是在收到每个新消息后向someAddress发送新消息,那么这两段代码都是错误的。

 类似资料:
  • 我对RabbitMQ很陌生,所以如果我的问题听起来很琐碎,请原谅。我想在RabbitMQ上发布消息,它将由RabbitMQ消费者处理。 我的消费者机器是一个多核机器(最好是azure上的工作者角色)。但QueueBasicConsumer一次推送一条消息。我如何编程来利用我可以同时处理多个消息的所有核心。 一种解决方案是在多个线程中打开多个通道,然后在那里处理消息。但在这种情况下,我将如何决定线程

  • 关于破坏者,我有以下问题: 消费者(事件处理器)没有实现他们实现EventHandler的任何可调用或可运行接口,那么他们如何能够并行运行,因此,例如,我有一个disruptor实现,其中有这样一个菱形模式 其中c1到c3可以在p1之后并联工作,C4和C5在p1之后工作。 所以通常我会有这样的东西(P1和C1-C5是可运行/可调用的) 但是在Disruptor的情况下,我的事件处理程序都没有实现R

  • 我试图做一个简单的poc与Spring启动与版本(2.3.7发布)的SpringKafka,以实现消费者批处理的工作原理,以及如何再平衡工作,如果消费者需要更多的流转时长,因为我是全新的这个消息系统。 现在我看到kafka重新平衡单个消费者(不允许并发)的问题。 这些是我设置的max.poll.interval属性。ms=50000和factory.getContanerProperties。se

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 我有代码: 处理消费者异常的最佳方法是什么?现在,如果异常引发,它将被吞没。。。

  • 启动使用者接收消息 根据我的理解,consumer直接使用来自broker的消息,但在上面的consumer命令中,我们没有提到broker,而只提到zookeeper。消费者是否会连接到zookeeper(而不是broker)来消费消息?