当前位置: 首页 > 知识库问答 >
问题:

Azure服务总线IQueueClient。registerSessionHandler:无法连续使用来自队列/主题的消息

鲍永春
2023-03-14

要求:我需要从队列中获取所有会话中的所有消息registerSessionHandler应该在消息出现在队列中时立即使用它们。

问题:代码接受与会话id(xyz)之一相关的消息,然后等待。即使使用该会话id(xyz)推送更多消息,它也无法使用它。

任何建议-我在这里错过了什么明显的东西。

我有registerSessionHandler,用于从队列连续接收会话消息。每当我开始新会话时,我只收到一个sessionID的消息。

测试队列:在这个队列中,有20条消息可用,有4个不同的会话ID。每当我运行Java应用程序(QueueSessionReceiveTest.java)时,我只得到5条与单个会话ID相关的消息。

MAVEN-azure服务总线-1.1.1

接收代码:

import java.time.Duration;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.IQueueClient;
import com.microsoft.azure.servicebus.QueueClient;
import com.microsoft.azure.servicebus.ReceiveMode;
import com.microsoft.azure.servicebus.SessionHandlerOptions;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
public class QueueSessionReceiveTest {
    private static final String connectionString = "Endpoint=sb://XXXXXX";
    private static final String queueName = "test";
    private static IQueueClient queueClient;
    public static void main(String[] args) throws Exception {
        queueClient = new QueueClient(new ConnectionStringBuilder(connectionString, queueName), ReceiveMode.PEEKLOCK);
        queueClient.registerSessionHandler(new QueueMessageSessionHandler(), new SessionHandlerOptions(1, false, Duration.ofMinutes(1)));           
        }}

会话处理程序代码

import java.util.concurrent.CompletableFuture;
import com.microsoft.azure.servicebus.ExceptionPhase;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.IMessageSession;
import com.microsoft.azure.servicebus.IQueueClient;
import com.microsoft.azure.servicebus.ISessionHandler;
public class QueueMessageSessionHandler implements ISessionHandler {
    @Override
    public CompletableFuture<Void> onMessageAsync(IMessageSession session, IMessage iMessage) {
        return session.completeAsync(iMessage.getLockToken()).thenRunAsync(() -> Logger.debug("some log") );
    }
    @Override
    public CompletableFuture<Void> OnCloseSessionAsync(IMessageSession session) {
        return null;
    }
    @Override
    public void notifyException(Throwable exception, ExceptionPhase exceptionPhase) {
        // Do nothing
    }}

共有1个答案

董飞航
2023-03-14

请参考下面的链接了解解决方案。

https://github.com/Azure/azure-service-bus-java/issues/187

 类似资料:
  • 我已经创建了一个简单的窗口服务来使用来自Azure服务总线队列的消息。我使用TopShelch创建windows服务。下面的代码从这里剪切如下示例:https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues 高频。Run(); ServiceBusHe

  • 是否可以通过REST连续监听队列中的消息?

  • 我正在构建Windows Phone应用程序,无法使用Microsoft。服务总线。信息。QueueClient类。 然后,我尝试使用Azure Service Bus REST API进行发送,但这需要我构建一个SAS令牌。但要构建SAS令牌,我需要使用Windows。安全密码学。果心MacAlgorithmNames。HmacSha256。此类显示在前面的类型中,但在编译时它不存在。 如何使用

  • 问题是一旦解决了异常的来源,如何处理错误。一旦消息出错并最终出现在_error队列中,我希望在消息和/或服务修复后将消息移回处理。我无法将消息从_error队列移动到主题,因为该主题上的每个服务都将再次获得该消息。 我试图使用ReceiveEndpoint方法创建第二个队列,该方法名为_errorRecovery,但这样做会导致队列订阅主题,这意味着_errorRecovery队列获取发布到该主题

  • 我有一个windows服务,它侦听Azure服务总线队列消息,以便从我的WebApi应用程序分发处理。此外,我还需要处理重复性任务(每晚/每周),我认为最好使用相同的系统来处理这些任务。 例如,假设我有一个“CleanupDb”队列,每天午夜删除过时的DB节点: 理论上这应该行得通,但我觉得我错过了一个更明显的处理方法。有没有更好的办法?

  • 我在同一命名空间中有2个Azure Service Bus队列。我使用ScheduledQueueTimeUTC将计划消息放置在其中一个队列(“计划队列”)中,并将要立即处理的消息放置在另一个队列(“现在队列”)中。 我想做的是设置“预定队列”的自动转发属性,以便在消息到达它们的“预定队列”时将消息转发到“现在队列”。队列转发会这样工作吗?还是队列自动转发会立即发送已排队和未排队的消息? 我在任何