当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud Stream -在broker中通知和处理错误

诸葛皓
2023-03-14

我对开发带有消息传递的分布式应用程序相当陌生,尤其是对Spring Cloud Stream。我目前想知道如何在代理端处理错误的最佳实践。

消费者方面

对于消费,我们定义了多个类型为 java.util.function.Consumer@Bean。这些配置如下所示:

spring.cloud.stream.bindings.consumeA-in-0.destination=inputA
spring.cloud.stream.bindings.consumeA-in-0.group=$Default
spring.cloud.stream.bindings.consumeB-in-0.destination=inputB
spring.cloud.stream.bindings.consumeB-in-0.group=$Default

这部分工作得很好——当启动应用程序时,交换“输入A”和“输入B”以及具有相应绑定的队列“输入A”。$默认值”和“输入B”。$默认值”在RabbitMQ中自动创建。此外,如果发生错误(例如队列突然不可用),应用程序会立即收到QueuesNotableExc的通知,并不断尝试重新建立连接。

我在这里唯一的问题是:是否有某种方法可以在代码中处理此异常?或者,在代理端处理此类故障的最佳实践是什么?

生产者方面

这个问题更大。生成消息是由一些内部逻辑触发的,我们不能在这里使用函数@Beans。相反,我们目前依靠StreamBridge来发送消息。问题是这种方法不会在启动时触发交换和队列的创建。因此,当我们的代码调用 streamBridge.send(“outputA”, message) 时,消息被发送(结果为 true),但它只是消失在空白中,因为 RabbitMQ 会自动丢弃不可路由的消息。

我发现使用这种配置,我至少可以让RabbitMQ在第一条消息发出后立即创建交换和队列:

spring.cloud.stream.source=produceA;produceB
spring.cloud.stream.default.producer.requiredGroups=$Default
spring.cloud.stream.bindings.produceA-out-0.destination=outputA
spring.cloud.stream.bindings.produceB-out-0.destination=outputB

我需要在代码中使用< code > stream bridge . send(" produceA-out-0 ",message)来使它工作,这并不太好,因为这意味着硬编码显式配置,但至少它工作了。< br >我还尝试按照本回答中描述的反应器风格实现生成器,但是在这种情况下,应用程序启动时也不会创建交换/队列,即使发送方法的返回状态为“OK ”,发送的消息也会消失。

使用这种方法根本不会注册代理端的故障-当我模拟一个失败时,例如通过删除队列或交换,它不会被应用程序注册。只有当发送另一条消息时,我才会进入日志:

ERROR 21804 --- [ 127.0.0.1:32404]o. s. a. r. c. CachingConntion工厂:关机信号:通道错误;协议方法:#方法

但是,在这种情况下,StreamBridge#送的结果是true。但是我们需要知道发送在这一点上确实失败了(我们使用这个布尔返回值持久化发送对象的状态)。有什么方法可以做到这一点吗?

关于如何使这个生产者场景更加健壮,还有其他建议吗?最佳实践?

编辑

我使用相关性找到了一个有趣的生产者问题解决方案:

...
CorrelationData correlation = new CorrelationData(UUID.randomUUID().toString());
messageHeaderAccessor.setHeader(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, correlation);
Message<String> message = MessageBuilder.createMessage(payload, messageHeaderAccessor.getMessageHeaders());

boolean sent = streamBridge.send(channel, message);

try {
    final CorrelationData.Confirm confirm = correlation.getFuture().get(30, TimeUnit.SECONDS);
    if (correlation.getReturned() == null && confirm.isAck()) {
        // success logic
    } else {
        // failed logic
    }

} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    // failed logic
} catch (ExecutionException | TimeoutException e) {
    // failed logic
}

使用以下附加配置:

spring.cloud.stream.rabbit.default.producer.useConfirmHeader=true
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true

这似乎工作得很好,尽管我仍然对StreamBridge#送的返回值一无所知,但它始终是真的,我找不到在哪些情况下它是假的信息。但是其余的都很好,我可以从相关性或确认中获得有关交换或队列问题的信息。

但此解决方案非常关注RabbitMQ,这会导致两个问题:

  • 我们的应用程序应该能够连接到不同的代理(例如 Azure 服务总线)
  • 在测试中,我们使用 Kafka binder,我不知道如何配置应用程序上下文以使其在这种情况下也能工作

任何帮助将不胜感激。


共有1个答案

孙佑运
2023-03-14
匿名用户

在使用者端,您可以侦听诸如< code > ListenerContainerConsumerFailedEvent 之类的事件。

https://docs.spring.io/spring-amqp/docs/current/reference/html/#consumer-events

在生产者方面,生产者只知道交换,而不知道绑定到它们的任何队列;因此< code>requiredGroups属性导致队列被绑定。

您只需要spring.cloud.stream.default.producer.requiredGroups=$Default - 您可以使用StreamBridge发送到任意目的地,然后创建基础架构。

java prettyprint-override">@SpringBootApplication
public class So70769305Application {

    public static void main(String[] args) {
        SpringApplication.run(So70769305Application.class, args);
    }

    @Bean
    ApplicationRunner runner(StreamBridge bridge) {
        return args -> bridge.send("foo", "test");
    }

}
spring.cloud.stream.default.producer.requiredGroups=$Default

 类似资料:
  • 我正在使用创建简单的STOMP代理。当方法中出现时,我希望收到STOMP帧,但默认情况下它不是这样工作的。 另一方面,在中有一种发送帧的机制,它的代码引用了一些与简单代理相关的类:https://github.com/spring-projects/spring-framework/blob/master/spring-websocket/messaging/springframework/web

  • 我有这样基于方面的日志记录: 当我使用RequestBody执行请求时,它工作正常,会触发advice。但是,当我执行没有

  • 我在azurenotification之后添加了azure通知 但是当我加上 如果有错误,请运行应用程序

  • 所以我可以安排这样的通知; 我已在中请求权限,并且通知在我使用通知扩展的自定义视图中显示良好。 我已经在中为通知类别添加了通知操作;这些也出现。 我在通知扩展< code >中设置了相同的类别。plist文件。在通知扩展中,当用户点击一个动作时,我用下面的代码来改变文本。 但是,文本没有改变,也没有调用任何语句; 在应用程序中,我有以下内容; 这两个函数实际上也不会在 中调用。我不确定更新扩展视图

  • iOS消息推送在工作灯,接收通知时,应用程序在后台作为徽章,但当我点击徽章也我得到有效载荷作为警报。我只需要徽章时,应用程序在后台运行。 实际上,我需要在应用程序位于前台时显示警报,如果应用程序位于后台,则显示徽章。在我的push received handler函数中,我正在检查这一点。但在ios中,只有当用户点击badge时,处理器函数get才会触发,而在Android中,当push收到时,处