我可以很好地从服务总线订阅接收消息,但当侦听器中发生异常时,似乎最终会将state=Modified和undeliverableHere=true的处置框架发送到服务总线。服务总线的文档说它不支持amqp修改的配置。
消息以服务总线中的延迟状态结束,我不知道如何将消息推回到活动状态。
JMS配置:
@Bean
public ConnectionFactory jmsConnectionFactory(MessageStoreProperties properties) throws UnsupportedEncodingException {
JmsConnectionFactory connectionFactory = new JmsConnectionFactory(properties.getUrlString());
connectionFactory.setClientID(clientId);
connectionFactory.setUsername(properties.getUsername());
connectionFactory.setPassword(properties.getPassword());
connectionFactory.setRedeliveryPolicy(redeliveryPolicy());
return new CachingConnectionFactory(connectionFactory);
}
@Bean
public JmsDefaultRedeliveryPolicy redeliveryPolicy() {
JmsDefaultRedeliveryPolicy policy = new JmsDefaultRedeliveryPolicy();
policy.html" target="_blank">setMaxRedeliveries(50);
return policy;
}
@Bean
public JmsListenerContainerFactory topicContainerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setSubscriptionDurable(true);
factory.setPubSubDomain(true);
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
return factory;
}
听众:
@JmsListener(destination = "crm-customer-event/subscriptions/test-sub", containerFactory = "topicContainerFactory")
public void receiveCustomerEvent(@Payload ExecutionContextDTO dto) {
logger.debug("Got payload: " + dto);
throw new RuntimeException("Oops");
}
以下是我在日志中看到的内容:
消息传输开始
[299309872:1] <- Transfer{handle=0, deliveryId=0, deliveryTag=\x97\x1c\x87\x04\xb7\xea\x86@\xb3n\xbd\x9fg\x81\x00\x11, messageFormat=0, settled=null, more=false, rcvSettleMode=null, state=null, resume=false, aborted=false, batchable=true} (16695) "\x00Sp\xc0\x0a\x05@@p.........
抛出异常,然后JMS在本地重新传递消息50次(为什么这样做?)
之后我看到的下一个amqp协议帧是
[299309872:1] -> Disposition{role=RECEIVER, first=0, last=0, settled=true, state=Modified{deliveryFailed=true, undeliverableHere=true, messageAnnotations=null}, batchable=false}
在我看来,这最后一个处置框架导致消息进入延迟状态。消息上还有一个锁令牌。即使当TTL通过时,消息仍然卡在订阅中,通过RESTAPI戳消息也没有任何帮助。我尝试过解锁它(使用PUT)和删除它(使用DELETE)。我也尝试过用RESTAPI(包括PeekLock和receive和delete变体)来接收它,但看起来它们好像不存在。我在订阅上设置了在过期后自动将消息移动到死信队列的选项,并且它们永远不会移动。
qpid jms中实现ack的代码就在这里,看来库的这一部分不应该被扩展,否则我将创建自己的实现来返回不同的ack。
如何获得qpid/JMS
首先,JMS规范正确地指出,从MessageListener回调引发异常实际上是一个应用程序编程错误,您的应用程序应该自己处理这些错误。
第二,客户端正在使用正确的配置来指示消息未能传递到此客户端,远程应该支持AMQP 1.0规范中概述的所有配置,我会联系微软并要求他们更紧密地实现该规范。
为了解决上述两个问题,您可以在客户端确认模式下接收消息,当您截获抛出的异常时,您可以使用本JIRA问题中描述的机制配置消息确认。
代码看起来像下面这样:
message.setIntProperty("JMS_AMQP_ACK_TYPE", 2);
message.acknowledge();
其中,模式定义为:
ACCEPTED = 1;
REJECTED = 2;
RELEASED = 3;
MODIFIED_FAILED = 4;
MODIFIED_FAILED_UNDELIVERABLE = 5;
如果丢失了序列号,是否有办法恢复或删除Azure服务总线上的延迟消息? 场景是:我想使用来延迟消息。我计划记录序列号,并在以后使用它来检索消息。但是如果出了问题——假设部署了一些错误代码——并且序列号没有正确记录,那么这条消息似乎将以延迟状态保留在服务总线上,直到消息过期,这可能是永远的。 这主要是因为该消息将占用队列或订阅上的空间,除了完全删除队列/订阅之外,我还没有找到任何方法来恢复该空间。
我在Azure中托管了两个云服务辅助角色,一个使用NServiceBus(Azure服务总线传输)消耗消息,另一个生成消息。 昨天,我部署了一个新版本的生产者工作者角色,而队列中仍然有大量消息,因为我们正在处理早上遗留下来的大量消息。当生产者启动时,它似乎已经清空(或者可能重新创建)队列,许多重要的生产消息丢失。这似乎很奇怪,但日志显示,大约在生产者角色启动时,消费者没有处理进一步的消息,我们知道
我正在尝试在Azure中构建一个简单的WebAPI REST服务,后端有一个服务总线队列工作器。我可以从Web API向工作人员发送一条消息。然而,我试图发送更多的信息,只是为了看看一切是如何运作的。因此,我创建了一个简单的控制器,如下所示: 当我呼叫控制器时,我只收到工作人员接收到的大约1/2的消息。其余的似乎都被放弃了。
我已经创建了一个简单的窗口服务来使用来自Azure服务总线队列的消息。我使用TopShelch创建windows服务。下面的代码从这里剪切如下示例:https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues 高频。Run(); ServiceBusHe
我正在使用Microsoft Azure ServiceBus对队列消息进行排队,并使用WCF对订阅进行排队。我正在尝试实现重试逻辑。我使用Peak/Lock查看消息,然后必须对消息进行一些本地处理。如果处理失败,我将解锁消息,以便再次尝试处理它。问题是我需要在处理尝试之间建立一个延迟。当前,它被弹出回队列,然后几乎立即被处理。两次尝试之间需要大约2分钟。
这是我能找到的最接近的前一个问题:Azure Service Bus Subscription OnMessage未接收消息。 同样的事情也发生在我身上。当我改变主题的名称时,它会再次工作一段时间。则该服务总线主题再次损坏。只有65-71%的消息到达。无助于删除子内容,也无助于删除主题。题名似乎过了一段时间不知怎么就被污染了。这是真的真的很糟糕,因为我没有办法告诉什么时候主题是腐败的,除了系统不像