当前位置: 首页 > 知识库问答 >
问题:

spring集成amqp拒绝消息(如果未处理)

盖和泰
2023-03-14

我需要解决这个场景。我有两个amqp消费者设置来获取一条消息。

@Bean
public IntegrationFlow jmsPrimaryFlow() {
    return IntegrationFlows.from(
        Amqp.inboundGateway(
            taskManager().getPrimaryMessageListenerContainer()).errorChannel(errorChannel())
        )
        .channel(taskChannel())
        .get();
}

@Bean
public IntegrationFlow jmsSecondaryFlow() {
    return IntegrationFlows.from(
        Amqp.inboundGateway(
            taskManager().getSecondaryMessageListenerContainer()).errorChannel(errorChannel())
            .autoStartup(false)
        )
        .channel(taskChannel())
        .get();
}

taskChannel是queuechannel,但一次只允许使用一条消息,因此没有并行处理。如果另一条消息花了太长时间才继续,我如何在超时后拒绝一条消息。那么这个消息将返回到队列,由另一个节点继续?我的意思是,这两个消费者预取了两条消息,但一次只能处理一条,所以如果第一条预取消息需要很长时间才能处理,那么如何释放第二条预取消息呢。

共有1个答案

鲁鹤轩
2023-03-14

你的问题不清楚。您可以在队列通道上设置容量限制(例如1),并在网关上设置发送超时。然后,如果队列已满,则在超时后添加消息的尝试将失败。然而,在这种情况下使用队列通道是危险的——如果服务器出现故障,您可能会丢失消息,因为消息一经放入队列就会被确认。

如果您使用RendezvousChannel替代,生产者将阻止等待消费者接收消息

但请记住,如果服务器在切换后崩溃,即使这一条消息也可能丢失。

 类似资料:
  • 当重新排队到原始队列时,消息可以再次返回到死信队列,并看到x-death报头计数不断增长。 由于某些原因,我们希望处理count>=5的死信消息(例如),并将其他消息重新排入死信队列。 我需要首先对消息进行基本的ack以检查X死亡计数头,然后将其发送到原始队列,如果计数足够大,否则在死信队列中重新排队。 我无法重新排队到死信队列,因为基本的get不在侦听器内部:抛出AmqpRejectAndDon

  • 我们在RHEL 7.0 VM上部署了一个Java/spring/Tomcat应用程序,它使用AlejandRorivera/Embedded-RabbitMQ,一旦部署了war,它就启动Rabbitmq服务器,并连接到它。我们有多个队列用来处理和过滤事件。 流程如下所示: 我们接收到的事件->发布事件队列->侦听器类筛选事件->发布到另一个队列进行处理->我们发布到另一个队列进行日志记录。 问题是

  • 我有一个入站RabbitMQ通道适配器,每天成功处理3000条消息,但是偶尔我会在RabbitMQ管理控制台看到1条未包装的消息。这似乎仍然是这样。 我确实有一个重试建议链,可以重试3次,然后通过死信路由密钥移动到DLQ,这对大多数例外情况都很有效。 在过去的几周里,unacked已经发生了两次,有一次我能够进行线程转储,并看到int-http:outbound-gateway调用在等待http响

  • 当RabbitMq消息到达队列时,我目前正在使用IntegrationFlow来触发作业执行。IntegrationFlow的AmqpInFronChannelAdapter和作业的第一步的ItemReader都配置为从同一队列中读取消息。 我遇到的问题是IntegrationFlow的AmqpInboundChannelAdapter读取RabbitMQ消息,然后ItemReader再也找不到该

  • 我正在尝试使用spring integration设置我的应用程序,作为一名新手,需要以下用例的建议- 有一个队列,来自另一个应用程序的消息将被推送到该队列。我的应用程序使用队列中的消息,进行一些数据处理,然后将其推送到另一个出站队列。目标是以并发方式处理消息。 根据我的理解,我们可以有两种方法- 1.使用#轮询器 2.使用#调度器 从基于轮询器的配置来看,池中似乎有多个可用线程,可以同时获取消息

  • 我正在开发一个Spring应用程序,它每分钟将接收大约500条xml消息。下面的xml配置只允许每分钟处理大约60条消息,其余消息存储在队列中(持久化在DB中),并以每分钟60条消息的速率检索。 尝试从多个来源阅读文档,但仍然不清楚轮询器和任务执行器的角色。我对当前每分钟处理60条消息的理解是因为轮询器配置中的“固定延迟”值设置为10(因此它将在1分钟内轮询6次),“每轮询最大消息数”设置为10,