RabbitMQ Binder
用法
对于使用RabbitMQ绑定器,您只需要使用以下Maven坐标将其添加到您的Spring Cloud Stream应用程序:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
或者,您也可以使用Spring Cloud Stream RabbitMQ入门。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
RabbitMQ Binder概述
以下可以看到RabbitMQ活页夹的操作简化图。
图14. RabbitMQ BinderRabbitMQ Binder实现将每个目的地映射到TopicExchange
。对于每个消费者组,Queue
将绑定到该TopicExchange
。每个消费者实例对其组的Queue
具有相应的RabbitMQ Consumer
实例。对于分区生成器/消费者,队列后缀为分区索引,并使用分区索引作为路由密钥。
使用autoBindDlq
选项,您可以选择配置绑定器来创建和配置死信队列(DLQ)(以及死信交换DLX
)。死信队列具有目标名称,附有.dlq
。如果重试启用(maxAttempts > 1
),则会将失败的消息传递到DLQ。如果禁用重试(maxAttempts = 1
),则应将requeueRejected
设置为false
(默认),以使失败的消息将路由到DLQ,而不是重新排队。此外,republishToDlq
导致绑定器向DLQ发布失败的消息(而不是拒绝它); 这使得能够将标题中的附加信息添加到消息中,例如x-exception-stacktrace
头中的堆栈跟踪。此选项不需要重试启用; 一次尝试后,您可以重新发布失败的消息。从版本1.2开始,您可以配置重新发布的消息传递模式; 见财产republishDeliveryMode
。
重要 | 将requeueRejected 设置为true 将导致消息被重新排序并重新发送,这可能不是您想要的,除非故障问题是短暂的。一般来说,最好通过将maxAttempts 设置为大于1,或将republishToDlq 设置为true 来启用binder内的重试。 |
有关这些属性的更多信息,请参阅RabbitMQ Binder Properties。
框架不提供消耗死信消息(或重新路由到主队列)的任何标准机制。Dead-Letter队列处理中描述了一些选项。
注意 | 在Spring Cloud Stream应用程序中使用多个 RabbitMQ绑定器时,禁用“RabbitAutoConfiguration”以避免将RabbitAutoConfiguration应用于两个绑定器的相同配置很重要。 |
配置选项
本节包含特定于RabbitMQ Binder和绑定频道的设置。
有关通用绑定配置选项和属性,请参阅Spring Cloud Stream核心文档。
RabbitMQ Binder Properties
默认情况下,RabbitMQ binder使用Spring Boot的ConnectionFactory
,因此它支持RabbitMQ的所有Spring Boot配置选项。(有关参考,请参阅Spring Boot文档。)RabbitMQ配置选项使用spring.rabbitmq
前缀。
除Spring Boot选项之外,RabbitMQ binder还支持以下属性:
- spring.cloud.stream.rabbit.binder.adminAddresses
RabbitMQ管理插件网址的逗号分隔列表。仅在
nodes
包含多个条目时使用。此列表中的每个条目必须在spring.rabbitmq.addresses
中具有相应的条目。默认值:空。
- spring.cloud.stream.rabbit.binder.nodes
RabbitMQ节点名称的逗号分隔列表。当多个条目用于查找队列所在的服务器地址时。此列表中的每个条目必须在
spring.rabbitmq.addresses
中具有相应的条目。默认值:空。
- spring.cloud.stream.rabbit.binder.compressionLevel
压缩绑定的压缩级别。见
java.util.zip.Deflater
。默认值:
1
(BEST_LEVEL)。
RabbitMQ消费者Properties
以下属性仅适用于Rabbit消费者,并且必须以spring.cloud.stream.rabbit.bindings.<channelName>.consumer.
为前缀。
- acknowledgeMode
确认模式。
默认值:
AUTO
。- autoBindDlq
是否自动声明DLQ并将其绑定到绑定器DLX。
默认值:
false
。- bindingRoutingKey
将队列绑定到交换机的路由密钥(如果
bindQueue
为true
)。将附加分区目的地-<instanceIndex>
。默认值:
#
。- bindQueue
是否将队列绑定到目的地交换机?如果您已经设置了自己的基础设施并且先前已经创建/绑定了队列,请设置为
false
。默认值:
true
。- deadLetterQueueName
DLQ的名称
默认值:
prefix+destination.dlq
- deadLetterExchange
分配给队列的DLX; 如果autoBindDlq为true
默认值:'prefix + DLX'
- deadLetterRoutingKey
一个死信路由密钥分配给队列; 如果autoBindDlq为true
默认值:
destination
- declareExchange
是否为目的地申报交换。
默认值:
true
。- delayedExchange
是否将交换声明为
Delayed Message Exchange
- 需要在代理上延迟的消息交换插件。x-delayed-type
参数设置为exchangeType
。默认值:
false
。- dlqDeadLetterExchange
如果DLQ被声明,则将DLX分配给该队列
默认值:
none
- dlqDeadLetterRoutingKey
如果DLQ被声明,则会将一个死信路由密钥分配给该队列; 默认无
默认值:
none
- dlqExpires
未使用的死信队列被删除多久(ms)
默认值:
no expiration
- dlqMaxLength
死信队列中的最大消息数
默认值:
no limit
- dlqMaxLengthBytes
来自所有消息的死信队列中的最大字节数
默认值:
no limit
- dlqMaxPriority
死信队列中消息的最大优先级(0-255)
默认值:
none
- dlqTtl
声明(ms)时默认适用于死信队列的时间
默认值:
no limit
- durableSubscription
订阅是否应该耐用。仅当
group
也被设置时才有效。默认值:
true
。- exchangeAutoDelete
如果
declareExchange
为真,则交换机是否应该自动删除(删除最后一个队列后删除)。默认值:
true
。- exchangeDurable
如果
declareExchange
为真,则交换应该是否持久(经纪人重新启动)。默认值:
true
。- exchangeType
交换类型; 非分区目的地的
direct
,fanout
或topic
direct
或topic
分区目的地。默认值:
topic
。- 到期
未使用的队列被删除多久(ms)
默认值:
no expiration
- headerPatterns
要从入站邮件映射的头文件。
默认值:
['*']
(所有标题)。- maxConcurrency
最大消费者人数
默认值:
1
。- 最长长度
队列中最大消息数
默认值:
no limit
- maxLengthBytes
来自所有消息的队列中最大字节数
默认:
no limit
- maxPriority
队列中消息的最大优先级(0-255)
- 默认
none
- 预取
预取计数。
默认值:
1
。- 字首
要添加到
destination
和队列名称的前缀。默认值:“”。
- recoveryInterval
连接恢复尝试之间的间隔,以毫秒为单位。
默认值:
5000
。- requeueRejected
在重试禁用或重新发布ToDlq是否为false时,是否应重新发送传递失败。
默认值:
false
。- republishDeliveryMode
当
republishToDlq
为true
时,指定重新发布的邮件的传递模式。默认值:
DeliveryMode.PERSISTENT
- republishToDlq
默认情况下,尝试重试后失败的消息将被拒绝。如果配置了死信队列(DLQ),则RabbitMQ将将失败的消息(未更改)路由到DLQ。如果设置为
true
,则绑定器将重新发布具有附加头的DLQ的失败消息,包括最终失败的原因的异常消息和堆栈跟踪。默认值:false
- 交易
是否使用交易渠道。
默认值:
false
。- TTL
声明(ms)时默认适用于队列的时间
默认值:
no limit
- txSize
阿克斯之间的交付次数。
默认值:
1
。
兔子生产者Properties
以下属性仅适用于Rabbit生产者,必须以spring.cloud.stream.rabbit.bindings.<channelName>.producer.
为前缀。
- autoBindDlq
是否自动声明DLQ并将其绑定到绑定器DLX。
默认值:
false
。- batchingEnabled
是否启用生产者的消息批处理。
默认值:
false
。- BATCHSIZE
批量启动时要缓冲的消息数。
默认值:
100
。- batchBufferLimit
默认值:
10000
。- batchTimeout
默认值:
5000
。- bindingRoutingKey
将队列绑定到交换机的路由密钥(如果
bindQueue
为true
)。仅适用于非分区目的地。仅适用于requiredGroups
,然后仅提供给这些组。默认值:
#
。- bindQueue
是否将队列绑定到目的地交换机?如果您已经设置了自己的基础架构并且先前已经创建/绑定了队列,请设置为
false
。仅适用于requiredGroups
,然后仅提供给这些组。默认值:
true
。- 压缩
发送时是否应压缩数据。
默认值:
false
。- deadLetterQueueName
DLQ的名称仅适用于
requiredGroups
,仅适用于这些组。默认值:
prefix+destination.dlq
- deadLetterExchange
分配给队列的DLX; 如果autoBindDlq为true只适用于
requiredGroups
,然后只提供给这些组。默认值:'prefix + DLX'
- deadLetterRoutingKey
一个死信路由密钥分配给队列; 如果autoBindDlq为true只适用于
requiredGroups
,然后只提供给这些组。默认值:
destination
- declareExchange
是否为目的地申报交换。
默认值:
true
。- 延迟
评估应用于消息(
x-delay
头)的延迟的Spel表达式 - 如果交换不是延迟的消息交换,则不起作用。默认值:No
x-delay
头设置。- delayedExchange
是否将交换声明为
Delayed Message Exchange
- 需要经纪人上的延迟消息交换插件。x-delayed-type
参数设置为exchangeType
。默认值:
false
。- deliveryMode
交货方式。
默认值:
PERSISTENT
。- dlqDeadLetterExchange
如果DLQ被声明,则分配给该队列的DLX只适用于
requiredGroups
,然后仅提供给这些组。默认值:
none
- dlqDeadLetterRoutingKey
如果DLQ被声明,则会将一个死信路由密钥分配给该队列; 默认值none仅在提供
requiredGroups
时才适用,然后仅适用于这些组。默认值:
none
- dlqExpires
未使用的死信队列被删除之前多久(ms)仅适用于
requiredGroups
,然后仅提供给这些组。默认值:
no expiration
- dlqMaxLength
死信队列中的最大消息数仅适用于
requiredGroups
,仅适用于这些组。默认值:
no limit
- dlqMaxLengthBytes
来自所有消息的死信队列中的最大字节数仅适用于
requiredGroups
,然后仅提供给这些组。默认值:
no limit
- dlqMaxPriority
死信队列中消息的最大优先级(0-255)仅适用于
requiredGroups
,然后仅提供给这些组。默认值:
none
- dlqTtl
声明(ms)的默认时间适用于死信队列仅适用于
requiredGroups
,然后仅提供给这些组。默认值:
no limit
- exchangeAutoDelete
如果
declareExchange
为真,则交换机是否应该自动删除(删除最后一个队列后删除)。默认值:
true
。- exchangeDurable
如果
declareExchange
为真,则交换应该是持久的(经纪人重新启动)。默认值:
true
。- exchangeType
交换类型;
direct
,fanout
或topic
;direct
或topic
。默认值:
topic
。- 到期
在未使用的队列被删除之前多久(ms)仅适用于
requiredGroups
,然后只提供给这些组。默认值:
no expiration
- headerPatterns
要将标头映射到出站邮件的模式。
默认值:
['*']
(所有标题)。- 最长长度
队列中最大消息数仅适用于
requiredGroups
,仅适用于这些组。默认值:
no limit
- maxLengthBytes
来自所有消息的队列中最大字节数仅适用于
requiredGroups
,仅适用于这些组。默认值:
no limit
- maxPriority
队列中消息的最大优先级(0-255)仅适用于
requiredGroups
,仅适用于这些组。- 默认
none
- 字首
要添加到
destination
交换机名称的前缀。默认值:“”。
- routingKeyExpression
一个SpEL表达式来确定在发布消息时使用的路由密钥。
默认值:
destination
或destination-<partition>
分区目的地。- 交易
是否使用交易渠道。
默认值:
false
。- TTL
声明时默认适用于队列的时间(ms)仅适用于
requiredGroups
,然后仅适用于这些组。默认值:
no limit
注意 | 在RabbitMQ的情况下,内容类型头可以由外部应用程序设置。Spring Cloud Stream支持它们作为用于任何类型传输(包括通常不支持头文件的Kafka)的传输的扩展内部协议的一部分)。 |
重试RabbitMQ Binder
概观
在绑定器中启用重试时,侦听器容器线程将被挂起,以配置任何后退时段。在单个消费者需要严格排序时,这可能很重要,但是对于其他用例,它可以防止在该线程上处理其他消息。使用绑定器重试的另一种方法是设置死机字符随着时间生活在死信队列(DLQ)上,以及DLQ本身的死信配置。有关这里讨论的属性的更多信息,请参阅RabbitMQ Binder Properties。启用此功能的示例配置:
- 将
autoBindDlq
设置为true
- 绑定器将创建一个DLQ; 您可以选择在deadLetterQueueName
中指定一个名称 - 将
dlqTtl
设置为您要在重新投递之间等待的退出时间 - 将
dlqDeadLetterExchange
设置为默认交换 - DLQ的过期消息将被路由到原始队列,因为默认deadLetterRoutingKey
是队列名称(destination.group
)
要强制一个消息被填字,抛出一个AmqpRejectAndDontRequeueException
,或设置requeueRejected
到true
并抛出任何异常。
循环将继续没有结束,这对于短暂的问题是很好的,但是您可能想在一些尝试后放弃。幸运的是,RabbitMQ提供了x-death
标题,允许您确定发生了多少个周期。
在放弃之后确认一则消息,抛出一个ImmediateAcknowledgeAmqpException
。
把它放在一起
---
spring.cloud.stream.bindings.input.destination=myDestination
spring.cloud.stream.bindings.input.group=consumerGroup
#disable binder retries
spring.cloud.stream.bindings.input.consumer.max-attempts=1
#dlx/dlq setup
spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=5000
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-dead-letter-exchange=
---
此配置创建一个与通配符路由密钥#
交换主题的队列myDestination.consumerGroup
的交换myDestination
。它创建一个绑定到具有路由密钥myDestination.consumerGroup
的直接交换DLX
的DLQ。当消息被拒绝时,它们被路由到DLQ。5秒钟后,消息过期,并使用队列名称作为路由密钥路由到原始队列。
@SpringBootApplication
@EnableBinding(Sink.class)
public class XDeathApplication {
public static void main(String[] args) {
SpringApplication.run(XDeathApplication.class, args);
}
@StreamListener(Sink.INPUT)
public void listen(String in, @Header(name = "x-death", required = false) Map<?,?> death) {
if (death != null && death.get("count").equals(3L)) {
// giving up - don't send to DLX
throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts");
}
throw new AmqpRejectAndDontRequeueException("failed");
}
}
请注意,x-death
标题中的count属性是Long
。
Dead-Letter队列处理
因为不可能预料到用户如何处理死信消息,所以框架不提供任何标准的机制来处理它们。如果死刑的原因是暂时的,您可能希望将邮件路由到原始队列。但是,如果问题是一个永久性的问题,那可能会导致无限循环。以下spring-boot
应用程序是如何将这些消息路由到原始队列的示例,但是在三次尝试之后将其移动到第三个“停车场”队列。第二个例子使用RabbitMQ延迟消息交换来向被重新排序的消息引入延迟。在这个例子中,每次尝试的延迟都会增加。这些示例使用@RabbitListener
从DLQ接收消息,您也可以在批处理过程中使用RabbitTemplate.receive()
。
这些示例假定原始目的地是so8400in
,消费者组是so8400
。
非分区目的地
前两个示例是目的地未分区。
@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Hit enter to terminate");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + 1);
this.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_RETRIES_HEADER = "x-retries";
private static final String DELAY_EXCHANGE = "dlqReRouter";
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Hit enter to terminate");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
headers.put("x-delay", 5000 * retriesHeader);
this.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public DirectExchange delayExchange() {
DirectExchange exchange = new DirectExchange(DELAY_EXCHANGE);
exchange.setDelayed(true);
return exchange;
}
@Bean
public Binding bindOriginalToDelay() {
return BindingBuilder.bind(new Queue(ORIGINAL_QUEUE)).to(delayExchange()).with(ORIGINAL_QUEUE);
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
分区目的地
对于分区目的地,所有分区都有一个DLQ,我们从头部确定原始队列。
republishToDlq = FALSE
当republishToDlq
为false
时,RabbitMQ将消息发布到DLX / DLQ,其中包含有关原始目的地信息的x-death
标题。
@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_DEATH_HEADER = "x-death";
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Hit enter to terminate");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@SuppressWarnings("unchecked")
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
List<Map<String, ?>> xDeath = (List<Map<String, ?>>) headers.get(X_DEATH_HEADER);
String exchange = (String) xDeath.get(0).get("exchange");
List<String> routingKeys = (List<String>) xDeath.get(0).get("routing-keys");
this.rabbitTemplate.send(exchange, routingKeys.get(0), failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
republishToDlq =真
当republishToDlq
为true
时,重新发布恢复器将原始交换和路由密钥添加到标题。
@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_RETRIES_HEADER = "x-retries";
private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE;
private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY;
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Hit enter to terminate");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}