我正在用java编写代码(使用Azure SDK for java),我有一个包含会话消息的服务总线队列。我想接收这些消息并将它们处理到另一个地方。
我使用QueueClient连接到队列,然后使用registerSessionHandler处理消息(下面的代码)。
问题是,每当收到消息时,我都可以打印有关它的所有详细信息,包括内容,但它会打印10次,每次之后都会打印一个异常。(打印10次:我理解这是因为在将消息抛出死信队列并转到下一条消息之前,存在10次重试策略。)
例外说
> USERCALLBACK-Receiver not created. Registering a MessageHandler creates a receiver.
带有异常的输出
但我确信SessionHandler与MessageHandler做的事情相同,但它包含对会话的支持,因此它应该创建一个接收方,因为它接收消息。我曾尝试使用MessageHandler,但它甚至无法工作,并停止整个程序,因为它不支持会话消息,而且我收到的消息都有会话。
我的问题是理解异常希望我做什么,以及如何修复代码以使它不会给我任何异常?有人对如何改进代码有什么建议吗?或者其他做同样事情的方法?
QueueClient qc = new QueueClient(
new ConnectionStringBuilder(connectionString),
ReceiveMode.PEEKLOCK);
qc.registerSessionHandler(
new ISessionHandler() {
@Override
public CompletableFuture<Void> onMessageAsync(IMessageSession messageSession, IMessage message) {
System.out.printf(
"\nMessage received: " +
"\n --> MessageId = %s " +
"\n --> SessionId = %s" +
"\n --> Content Type = %s" +
"\n --> Content = \n\t\t %s",
message.getMessageId(),
messageSession.getSessionId(),
message.getContentType(),
getMessageContent(message)
);
return qc.completeAsync(message.getLockToken());
}
@Override
public CompletableFuture<Void> OnCloseSessionAsync(IMessageSession iMessageSession) {
return CompletableFuture.completedFuture(null);
}
@Override
public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
System.out.println("\n Exception " + exceptionPhase + "-" + throwable.getMessage());
}
},
new SessionHandlerOptions(1, true, Duration.ofMinutes(1)),
Executors.newSingleThreadExecutor()
);
(对于感兴趣的人来说,getMessageContent(message)方法是一个单独的方法:)
public String getMessageContent(IMessage message){
List<byte[]> content = message.getMessageBody().getBinaryData();
StringBuilder sb = new StringBuilder();
for (byte[] b : content) {
sb.append(new String(b)
);
}
return sb.toString();
}
对于那些想知道的人,我设法解决了这个问题!
它只是通过使用Azure函数ServiceBusQueueTrigger来完成的,然后它将侦听服务总线队列并处理消息。通过将isSessionsEnabled设置为true,它将按照我的要求接受会话消息:)
因此,不再编写超过100行的代码,代码现在看起来是这样的:
public class Function {
@FunctionName("QueueFunction")
public void run(
@ServiceBusQueueTrigger(
name = "TriggerName", //Any name you choose
queueName = "queueName", //QueueName from the portal
connection = "ConnectionString", //ConnectionString from the portal
isSessionsEnabled = true
) String message,
ExecutionContext context
) {
// Write the code you want to do with the message here
// Using the variable messsage which contains the messageContent, messageId, sessionId etc.
}
}
我们使用Azure服务总线队列,并使用AcceptMessage会话(代码如下)接收消息。 接受消息会话成功,但当我在消息会话上调用接收时,返回null。 这种情况不会一直发生,而是在创建队列一段时间后发生。发生这种情况时,我验证了队列不是空的。当我关闭消息会话我看到我的队列缩短(在正确的消息数量!)但是我没有收到我的信息... 我使用的代码是: 更新: 我怀疑,虽然某些会话的所有消息都已过期,但
我需要一起处理相同的消息集,为此,我尝试了Azure服务总线会话启用功能。为了测试这一点,我创建了一个非常简单的应用程序,一个消息在队列中成功提交,然而,当试图在“ReceiveSessionMessage”函数中接收消息时,消息会话不会返回,程序会在这一行之后退出。 我无法找出确切的根本原因,任何帮助都将不胜感激。谢谢 等待会话客户端。AcceptMessageSessionAsync();]
我的设置: 我创建了一个没有启用会话的Azure服务总线队列 我有一个使用触发器 触发器间隔为 我的结果: 我正在以超过15秒的速度将消息放入队列 我的Azure逻辑应用触发器一次拾取多个队列消息,比每15秒拾取一条消息快得多 我的问题: 此连接器的触发器是否设计为一次拾取多条排队消息(在?)
我正在使用Azure服务总线队列。但是我不能使用“获取所有队列消息(peek Lock):微软内置于api”从队列中获取所有消息。 有没有办法获取所有队列消息? {"$连接":{"值":{"servicebus_1":{"连接ID":"/订阅/c776fex3-6aec-4722-b099-b054c267b240/资源组/Plugin-Resources/提供者/Microsoft.网络/连接/
我正在使用带有.NET核心的Azure服务总线 在我们的应用程序中,我们正在向服务总线发送会话消息。每当我们收到带有session-Id的取消请求时,我们需要删除/删除/完成带有该特定sessionId的消息,而不需要进行任何进一步的处理 但我得到了错误-请求的会话'session-name'不能被接受。它可能被另一个接收器锁定。
我有一个windows服务,它侦听Azure服务总线队列消息,以便从我的WebApi应用程序分发处理。此外,我还需要处理重复性任务(每晚/每周),我认为最好使用相同的系统来处理这些任务。 例如,假设我有一个“CleanupDb”队列,每天午夜删除过时的DB节点: 理论上这应该行得通,但我觉得我错过了一个更明显的处理方法。有没有更好的办法?