当前位置: 首页 > 知识库问答 >
问题:

Java:Azure服务总线队列接收带有会话的消息

上官波鸿
2023-03-14

我正在用java编写代码(使用Azure SDK for java),我有一个包含会话消息的服务总线队列。我想接收这些消息并将它们处理到另一个地方。

我使用QueueClient连接到队列,然后使用registerSessionHandler处理消息(下面的代码)。

问题是,每当收到消息时,我都可以打印有关它的所有详细信息,包括内容,但它会打印10次,每次之后都会打印一个异常。(打印10次:我理解这是因为在将消息抛出死信队列并转到下一条消息之前,存在10次重试策略。)

例外说

> USERCALLBACK-Receiver not created. Registering a MessageHandler creates a receiver.

带有异常的输出

但我确信SessionHandler与MessageHandler做的事情相同,但它包含对会话的支持,因此它应该创建一个接收方,因为它接收消息。我曾尝试使用MessageHandler,但它甚至无法工作,并停止整个程序,因为它不支持会话消息,而且我收到的消息都有会话。

我的问题是理解异常希望我做什么,以及如何修复代码以使它不会给我任何异常?有人对如何改进代码有什么建议吗?或者其他做同样事情的方法?

QueueClient qc = new QueueClient(
            new ConnectionStringBuilder(connectionString),
            ReceiveMode.PEEKLOCK);

qc.registerSessionHandler(
            new ISessionHandler() {
                @Override
                public CompletableFuture<Void> onMessageAsync(IMessageSession messageSession, IMessage message) {
                    System.out.printf(
                            "\nMessage received: " +
                                    "\n --> MessageId = %s " +
                                    "\n --> SessionId = %s" +
                                    "\n --> Content Type = %s" +
                                    "\n --> Content = \n\t\t %s",
                            message.getMessageId(),
                            messageSession.getSessionId(),
                            message.getContentType(),
                            getMessageContent(message)
                    );

                    return qc.completeAsync(message.getLockToken());
            }
                @Override
                public CompletableFuture<Void> OnCloseSessionAsync(IMessageSession iMessageSession) {
                    return CompletableFuture.completedFuture(null);
                }

                @Override
                public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
                    System.out.println("\n Exception " + exceptionPhase + "-" + throwable.getMessage());
                }
            },
            new SessionHandlerOptions(1, true, Duration.ofMinutes(1)),
            Executors.newSingleThreadExecutor()
);

(对于感兴趣的人来说,getMessageContent(message)方法是一个单独的方法:)

public String getMessageContent(IMessage message){
    List<byte[]> content = message.getMessageBody().getBinaryData();
    StringBuilder sb = new StringBuilder();
    for (byte[] b : content) {
        sb.append(new String(b)
        );
    }
    return sb.toString();
}

共有1个答案

曹君墨
2023-03-14

对于那些想知道的人,我设法解决了这个问题!

它只是通过使用Azure函数ServiceBusQueueTrigger来完成的,然后它将侦听服务总线队列并处理消息。通过将isSessionsEnabled设置为true,它将按照我的要求接受会话消息:)

因此,不再编写超过100行的代码,代码现在看起来是这样的:

public class Function {

@FunctionName("QueueFunction")
public void run(
        @ServiceBusQueueTrigger(
               name = "TriggerName", //Any name you choose
               queueName = "queueName", //QueueName from the portal
               connection = "ConnectionString", //ConnectionString from the portal
               isSessionsEnabled = true
        ) String message,
        ExecutionContext context
) {
    // Write the code you want to do with the message here
    // Using the variable messsage which contains the messageContent, messageId, sessionId etc.
  }
}
 类似资料:
  • 我们使用Azure服务总线队列,并使用AcceptMessage会话(代码如下)接收消息。 接受消息会话成功,但当我在消息会话上调用接收时,返回null。 这种情况不会一直发生,而是在创建队列一段时间后发生。发生这种情况时,我验证了队列不是空的。当我关闭消息会话我看到我的队列缩短(在正确的消息数量!)但是我没有收到我的信息... 我使用的代码是: 更新: 我怀疑,虽然某些会话的所有消息都已过期,但

  • 我需要一起处理相同的消息集,为此,我尝试了Azure服务总线会话启用功能。为了测试这一点,我创建了一个非常简单的应用程序,一个消息在队列中成功提交,然而,当试图在“ReceiveSessionMessage”函数中接收消息时,消息会话不会返回,程序会在这一行之后退出。 我无法找出确切的根本原因,任何帮助都将不胜感激。谢谢 等待会话客户端。AcceptMessageSessionAsync();]

  • 我的设置: 我创建了一个没有启用会话的Azure服务总线队列 我有一个使用触发器 触发器间隔为 我的结果: 我正在以超过15秒的速度将消息放入队列 我的Azure逻辑应用触发器一次拾取多个队列消息,比每15秒拾取一条消息快得多 我的问题: 此连接器的触发器是否设计为一次拾取多条排队消息(在?)

  • 我正在使用Azure服务总线队列。但是我不能使用“获取所有队列消息(peek Lock):微软内置于api”从队列中获取所有消息。 有没有办法获取所有队列消息? {"$连接":{"值":{"servicebus_1":{"连接ID":"/订阅/c776fex3-6aec-4722-b099-b054c267b240/资源组/Plugin-Resources/提供者/Microsoft.网络/连接/

  • 我正在使用带有.NET核心的Azure服务总线 在我们的应用程序中,我们正在向服务总线发送会话消息。每当我们收到带有session-Id的取消请求时,我们需要删除/删除/完成带有该特定sessionId的消息,而不需要进行任何进一步的处理 但我得到了错误-请求的会话'session-name'不能被接受。它可能被另一个接收器锁定。

  • 我有一个windows服务,它侦听Azure服务总线队列消息,以便从我的WebApi应用程序分发处理。此外,我还需要处理重复性任务(每晚/每周),我认为最好使用相同的系统来处理这些任务。 例如,假设我有一个“CleanupDb”队列,每天午夜删除过时的DB节点: 理论上这应该行得通,但我觉得我错过了一个更明显的处理方法。有没有更好的办法?