我有RabbitMQ的工作配置-使用TTL从等待(死信)的主队列中发送消息,并在到期时将其抛出:
@Configuration
@AllArgsConstructor
public class RabbitConfig {
public static final String QUEUE_MESSAGES = "demo-messages-queue";
public static final String QUEUE_MESSAGES_DLQ = QUEUE_MESSAGES + ".dlq";
public static final String EXCHANGE_MESSAGES = "demo-messages-exchange";
@Bean
public DirectExchange exchange() {
return new DirectExchange(EXCHANGE_MESSAGES);
}
@Bean
public Queue mainQueue() {
return QueueBuilder
.durable(QUEUE_MESSAGES)
.withArgument("x-dead-letter-exchange", EXCHANGE_MESSAGES)
.withArgument("x-dead-letter-routing-key", QUEUE_MESSAGES_DLQ)
.build();
}
@Bean
public Queue deadLetterQueue() {
return QueueBuilder
.durable(QUEUE_MESSAGES_DLQ)
.withArgument("x-dead-letter-exchange", EXCHANGE_MESSAGES)
.withArgument("x-dead-letter-routing-key", QUEUE_MESSAGES)
.withArgument("x-message-ttl", 30000)
.build();
}
@Bean
public Binding mainBinding() {
return BindingBuilder
.bind(mainQueue())
.to(exchange())
.with(QUEUE_MESSAGES);
}
@Bean
public Binding deadLetterBinding() {
return BindingBuilder
.bind(deadLetterQueue())
.to(exchange())
.with(QUEUE_MESSAGES_DLQ);
}
}
对于应用程序中的Spring云流,我不能重复相同的方案。yml公司
spring:
cloud:
stream:
binders:
rabbitAll:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: test
username: guest
password: guest
message-ttl: 90000
rabbit:
bindings:
demo-push-output:
exchange-type: direct
destination: demo-message-queue
producer:
binding-routing-key: 'demo-message-queue'
routing-key-expression: '''demo-message-queue'''
demo-push-input:
consumer:
bindingRoutingKey: demo-message-queue
routingKeyExpression: '''demo-message-queue'''
deadLetterRoutingKey: demo-message-queue.dlq
deadLetterQueueName: demo-message-queue.dlq
deadLetterExchange: demo-message-exchange
dlqDeadLetterExchange: demo-message-exchange
dlqDeadLetterRoutingKey: demo-message-queue
autoBindDlq: true
dlqTtl: 10000
bindings:
demo-push-input:
destination: demo-message-queue
group: demo-message-queueGroup
binder: rabbitAll
demo-push-output:
destination: demo-message-queue
group: demo-message-queueGroup
binder: rabbitAll
我将非常感谢您在应用程序中提供的帮助和工作示例。yml公司
工作配置Spring-Cloud-Stream:
spring:
cloud:
stream:
binders:
rabbitAll:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: test
username: guest
password: guest
message-ttl: 90000
rabbit:
bindings:
demo-push-output:
exchangeType: direct
destination: demo-message-queue
producer:
bindingRoutingKey: 'demo-message-queue'
routingKeyExpression: '''demo-message-queue'''
demo-push-input:
consumer:
bindingRoutingKey: demo-message-queue
routingKeyExpression: '''demo-message-queue'''
deadLetterRoutingKey: demo-message-queue.dlq
deadLetterQueueName: demo-message-queue.dlq
deadLetterExchange: demo-message-queue
dlqDeadLetterExchange: demo-message-exchange
dlqDeadLetterRoutingKey: demo-message-queue
autoBindDlq: true
dlqTtl: 30000
bindings:
demo-push-input:
destination: demo-message-exchange
group: demo-message-exchangeGroup
binder: rabbitAll
consumer:
max-attempts: 1
demo-push-output:
destination: demo-message-exchange
group: demo-message-exchangeGroup
binder: rabbitAll
界面:
public interface DemoPushEventsSource {
String PUSH_INPUT = "demo-push-input";
String PUSH_OUTPUT = "demo-push-output";
@Input(PUSH_INPUT)
SubscribableChannel demoPushInput();
@Output(PUSH_OUTPUT)
MessageChannel demoPushOutput();
}
绑定:
@EnableBinding({DemoPushEventsSource.class})
public class StreamBindingConfiguration {
}
生产商:
public void sendMessage(User user) {
source.demoPushOutput().send(buildMessageWithHeaders(user, headers));
}
public Message buildMessageWithHeaders(Object request, Map<String, Integer> headers) {
return MessageBuilder.withPayload(request).copyHeaders(headers).build();
}
消费者:
@StreamListener(DemoPushEventsSource.PUSH_INPUT)
public void process(@Payload User model, @Headers Map<String, String> headers) throws Exception {
throw new Exception("error exception");
}
请参阅文档。
具体来说dlqDeadLetterExchange
和dlqDeadLetterRoutingKey
。
您已经获得了dlqTtl。
/** 设置APP被手动杀后的延迟时长(用于直接杀死应用时上报推送状态,若设置为0,会影响杀死应用的消息推送) @param time 时长 */ [Ntalker ntalker_setDelayKillAPP:5];
在SNS话题上设置死信队列和在Lambda功能上设置死信队列有什么区别? 我在想,因为如果您在SNS订阅上设置DLQ,那么当Lambda(订阅者)失败时,订阅消息将故障转移到DLQ,对吗?那么在这种情况下,在这两个地方设置DLQ会有相同的效果吗? 我在SNS主题订阅上设置了一个DLQ,它并没有“自动”地显示为Lambda屏幕设置上的DLQ,所以我想可能会有一些不同? SNS死信队列ref:http
问题内容: 我的ActiveMQ服务器中目前有一个名为的队列。每当消息处理失败时,ActiveMQ都会创建一个默认目录。是否可以将该名称更改为类似的名称?原因是将来我可能会有几个队列,而我希望它像 问题答案: 您要查找的东西称为,在此过程中,ActiveMQ为每个队列/主题创建特定的DLQ, 您可以按如下,通过调整你实现它有点 此配置将创建名称为的DLQ ,如果您不需要前缀,则可以删除属性。 希望
下面是我正在使用的配置。消息没有错误,从exchange到队列都可以正常工作,并且侦听器可以进行转换,这很好。对于错误消息,我希望发生的是,当我抛出AmqpRejectAndDontRequeueException时,“rabbitQueue”将消息转发到死信交换,并最终进入“rabbitErrorQueue”但死信交换或队列上没有任何活动。有人知道我做错了什么吗?
我试图通过JMS管理API为队列设置死信地址。通过阅读最新的Artemis文档,我似乎可以使用queueControl.setdeadletterAddress(...)来完成此操作方法。请参见https://activemq.apache.org/artemis/docs/latest/management.html并搜索“setdeadletteraddress”。 多谢!
我有一个使用Spring Cloud Streams-RabbitMQ在微服务中交换消息的项目。对我的项目至关重要的一件事是,我不能丢失任何信息。 null 我是这些框架的新手,我希望你能帮助配置我的...