当前位置: 首页 > 知识库问答 >
问题:

如何使用Java将错误消息移动到Azure死信队列(主题-订阅)?

微生啸
2023-03-14

我需要将我的消息发送到死信队列从Azure主题订阅在任何错误的情况下,而阅读和处理从主题的消息。所以我尝试直接将消息推送到DLQ。

我的样本代码会像

static void sendMessage()
{
    // create a Service Bus Sender client for the queue 
    ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
            .connectionString(connectionString)
            .sender()
            .topicName(topicName)
            
            .buildClient();

    // send one message to the topic
    
    
    senderClient.sendMessage(new ServiceBusMessage("Hello, World!"));    


}
static void resceiveAsync() {
    ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
            .connectionString(connectionString)
            .receiver()
            .topicName(topicName)
            .subscriptionName(subName)
            .buildAsyncClient();

        // receive() operation continuously fetches messages until the subscription is disposed.
        // The stream is infinite, and completes when the subscription or receiver is closed.
        Disposable subscription = receiver.receiveMessages().subscribe(message -> {

            System.out.printf("Id: %s%n", message.getMessageId());
            System.out.printf("Contents: %s%n", message.getBody().toString());
        }, error -> {
                System.err.println("Error occurred while receiving messages: " + error);
            }, () -> {
                System.out.println("Finished receiving messages.");
            });

        // Continue application processing. When you are finished receiving messages, dispose of the subscription.
        subscription.dispose();

        // When you are done using the receiver, dispose of it.
        receiver.close();
    
    
    
}

我试着获取死信队列路径

    String dlq = EntityNameHelper.formatDeadLetterPath(topicName);

我得到了死信队列的路径,如=“mytopic/$deadletterqueue”

但它在将路径作为主题名传递时不起作用。它引发实体主题未找到异常。

有谁能告诉我这件事吗

参考:如何使用Java将错误消息移动到Azure死信队列?

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dead-letter-queues#moving-messages-to-the-dlq

如何在SpringBootJava中将失败消息推送到Azure服务总线死信队列?

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-java-how-to-use-topics-subscriptions-legacy#receive-messages-from-a-subscription

共有1个答案

燕玉堂
2023-03-14

您可能知道,如果在处理过程中抛出异常,并且超过了最大延迟计数,则消息将自动移动到死信队列。如果要显式地将消息移动到DLQ,也可以这样做。一种常见的情况是,如果您知道消息由于其内容而永远不会成功。

您不能直接向DLQ发送新消息,因为这样系统中就会有两条消息。需要对父实体调用特殊操作。还有,

<queue path>/$deadletterqueue
<topic path>/Subscriptions/<subscription path>/$deadletterqueue

https://github.com/Azure/azure-service-bus/blob/master/samples/Java/azure-servicebus/DeadletterQueue/src/main/java/com/microsoft/azure/servicebus/samples/deadletterqueue/DeadletterQueue.java

此示例代码适用于队列,但您应该能够非常轻松地将其适应主题:

    // register the RegisterMessageHandler callback
    receiver.registerMessageHandler(
            new IMessageHandler() {
                // callback invoked when the message handler loop has obtained a message
                public CompletableFuture<Void> onMessageAsync(IMessage message) {
                    // receives message is passed to callback
                    if (message.getLabel() != null &&
                            message.getContentType() != null &&
                            message.getLabel().contentEquals("Scientist") &&
                            message.getContentType().contentEquals("application/json")) {

                        // ...
                    } else {
                        return receiver.deadLetterAsync(message.getLockToken());
                    }
                    return receiver.completeAsync(message.getLockToken());
                }

                // callback invoked when the message handler has an exception to report
                public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
                    System.out.printf(exceptionPhase + "-" + throwable.getMessage());
                }
            },
            // 1 concurrent call, messages are auto-completed, auto-renew duration
            new MessageHandlerOptions(1, false, Duration.ofMinutes(1)),
            executorService);

 类似资料:
  • 我正在使用AWS SQS和死信队列。 这可能吗?我是不是缺少了一个配置选项? 问候你,伊多

  • 在Azure服务总线主题中,消息如何从死信队列移动到主题? 它是否会自动移动到主题,或者我们需要在门户中配置主题的属性,或者是否有任何其他方法可以做到这一点?(我不喜欢在这里使用任何代码,我希望只做配置更改)

  • 我已经设置了Apache camel,在其中我使用来自一个队列的消息并对其进行某种操作,然后将其传输到其他队列。 现在,如果异常来了,我希望它应该回滚,然后在6次尝试后,它发送到死信队列,目前回滚发生5-6次,但我的消息没有转移到死信队列。 这里会发生什么-->Queue1->>(消耗)-->Operation(引发异常)-->Rollback-->Queue1->>(消耗)-->Operatio

  • 如果消息中毒,Azure服务总线队列触发函数将执行默认策略,将消息发送到死信队列。但是,我们是否可以手动将消息发送到死信队列?有几次,由于某种内部或业务异常,我们不希望Azure Function执行其默认策略,我们希望将其他信息绑定到消息,并手动将其发送到Azure服务总线截止日期队列。 在以前使用BrokeredMessage对象的版本中,有一个名为 然而,在最新的Functino 2中。十、

  • 问题是一旦解决了异常的来源,如何处理错误。一旦消息出错并最终出现在_error队列中,我希望在消息和/或服务修复后将消息移回处理。我无法将消息从_error队列移动到主题,因为该主题上的每个服务都将再次获得该消息。 我试图使用ReceiveEndpoint方法创建第二个队列,该方法名为_errorRecovery,但这样做会导致队列订阅主题,这意味着_errorRecovery队列获取发布到该主题