当前位置: 首页 > 知识库问答 >
问题:

Spring cloud stream rabbit:当rabbit服务器不可用时创建队列

弓明亮
2023-03-14

我正在使用
spring-cloud d-stream: 3.1.4
spring-cloud d-stream-binder-兔子:3.1.4

我在“属性”下的此处配置了一个消费者。我的问题是,当使用者在rabbitmq服务器可用之前启动时,我可以看到使用者会重新启动,直到连接可用为止。然而,DLX和DLQ之间创建的绑定并不相同。

  • 如果消费者启动时Rabbitmq可用:DLQ绑定到DLX,路由密钥为worker.request.queue.name和worker.request.dlq.name
  • 如果消费者在重试后启动时Rabbitmq不可用:DLQ仅使用例程Key'worker.request.dlq.name'绑定到DLX。

问题是我需要两者都有约束力。任何人都可以帮助我理解我做错了什么?

谢谢

# 1. consumer queue configuration to listen for worker requests
# Exchange name
spring.cloud.stream.bindings.listenvalidateprocesssend-in-0.destination=${worker.request.exchange.name}
# Queue name
spring.cloud.stream.bindings.listenvalidateprocesssend-in-0.group=${worker.request.queue.name}
# Force creation of queue if doesn't exists.
spring.cloud.stream.bindings.listenvalidateprocesssend-in-0.consumer.requiredGroups=${worker.request.queue.name}
# Disable retry of error messages
spring.cloud.stream.bindings.listenvalidateprocesssend-in-0.consumer.maxAttempts=1

# 2. producer : to send worker responses to manager
spring.cloud.stream.bindings.listenvalidateprocesssend-out-0.destination=${worker.response.exchange.name}
spring.cloud.stream.bindings.listenvalidateprocesssend-out-0.group=${worker.response.queue.name}
# Do not create associated queue. Queue is created by the manager which subscribe to the exchange
# spring.cloud.stream.bindings.listenvalidateprocesssend-out-0.producer.requiredGroups=${worker.response.queue.name}

###############################
# Rabbit binder configuration #
###############################

# 1. consumer queue configuration to listen for worker requests
# Allows naming created queues with only group property. Default is destination.group.
spring.cloud.stream.rabbit.bindings.listenvalidateprocesssend-in-0.consumer.queueNameGroupOnly=true
# enable transaction of consumed messages. NOTE : the index must not be present in the binding name !!!!!
spring.cloud.stream.rabbit.bindings.listenvalidateprocesssend-in.consumer.transacted=true
# Queue and Exchange for request can be created by manager.
spring.cloud.stream.rabbit.bindings.listenvalidateprocesssend-in-0.consumer.maxPriority=255
spring.cloud.stream.rabbit.bindings.listenvalidateprocesssend-in-0.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.listenvalidateprocesssend-in-0.consumer.deadLetterQueueName=${worker.request.dlq.name}
spring.cloud.stream.rabbit.bindings.listenvalidateprocesssend-in-0.consumer.deadLetterExchange=${worker.request.dlx.name}
spring.cloud.stream.rabbit.bindings.listenvalidateprocesssend-in-0.consumer.deadLetterRoutingKey=${worker.request.dlq.name}
spring.cloud.stream.rabbit.bindings.listenvalidateprocesssend-in-0.consumer.deadLetterExchangeType=topic
spring.cloud.stream.rabbit.bindings.listenvalidateprocesssend-in-0.consumer.dlqMaxPriority=255

编辑1:

查看sprint cloud stream rabbit投标人代码后,我可以在以下位置看到问题:https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit/blob/97c6f79fff15d985434e24c6b48c85caa962a4e6/spring-cloud-stream-binder-rabbit-core/src/main/java/org/springframework/cloud/stream/binder/rabbit/provisioning/RabbitExchangeQueueProvisioner.java#L659

每个队列/Exhcnage/绑定声明初始化一次,并在每次尝试后重试。但是声明列表是一个带有字符串键的bean映射。如果在同一队列上进行双重绑定,则密钥相同,因此只保存第一个声明。请参见此处:https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit/blob/97c6f79fff15d985434e24c6b48c85caa962a4e6/spring-cloud-stream-binder-rabbit-core/src/main/java/org/springframework/cloud/stream/binder/rabbit/provisioning/RabbitExchangeQueueProvisioner.java#L629

共有1个答案

皇甫学海
2023-03-14

这是一只虫子;请在此处打开问题https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit/issues

 类似资料:
  • 我将Spring Boot应用程序与sping-Rabbit(版本2.2.2)一起使用。由于我的应用程序的本质是非常动态的,因此队列和绑定是使用方法动态声明的,因此它们不会声明为Spring Beans。 根据我的理解(和测试),的自动恢复拓扑的功能在Rabbitmq服务器重启时仅适用于声明为Spring Beans的交换/队列/绑定(我说得对吗?)。 我尝试使用以下方法使用底层Rabbit客户端

  • 创建 http 服务器 package main   import ( "net/http"   "github.com/hprose/hprose-golang/rpc" )   func hello(name string) string { return "Hello " + name + "!" }   func main() { service := rp

  • 我想创建一个Apache服务器,该服务器可以通过其IP地址访问,以托管应用程序的资源,并且我使用的是XAMPPV3。2.2这样做。当我通过可通过控制面板访问的服务器本地IP访问从连接到同一网络(或本地网络,即服务器)的任何计算机托管的网页时,服务器主机正常- 编辑: 以下是我的XAMPP Apache的配置: httpd.conf:

  • 然后我尝试启动微服务,但我得到以下错误:

  • 我已经在我的Windows7机器上安装了WebLogic12C(12.1.3)。在创建管理服务器并登录之后,我试图创建一个新的托管服务器。但是当我按下create按钮时,显示加载符号需要花费大量时间,最后给出等待完成的错误超时:Activate state:STATE_DISTRIBUTED Target Servers states:AdminServer STATE_DISTRIBUTED。早

  • 我在我们的应用程序中使用eclipse milo java OPCUA客户端SDK来实现OPC。我能够创建会话、订阅和监视。它工作正常,但当opc服务器重新启动时,我尝试删除上一个会话并创建一个具有相同参数的新会话,我得到以下异常: UaException:状态=错误超时,消息=等待确认超时 抛出错误的行是: EndpointDescription[]endpoint=客户端。getEndpoin