当前位置: 首页 > 知识库问答 >
问题:

如何使用AMQP在"PeekLock"模式下从Azure服务总线队列获取消息?

厉永宁
2023-03-14

我们试图在节点应用程序中使用Azure服务总线。我们的要求是从一个队列中获取多个消息。

由于Azure SDK for Node不支持批量检索,我们决定使用AMQP。而我们可以使用这里描述的Peek消息获取消息(https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-amqp-request-response#message-操作)。

我们注意到的是,一旦获取消息,它们就会从队列中删除。我想知道,如果我在使用AMQPAM模式时使用AMQPAM来获取消息,我将如何锁定任何人。对于AMQP,我们使用amqp10节点包(https://www.npmjs.com/package/amqp10).

这是我们偷看信息的代码:

const AMQPClient = require('amqp10/lib').Client,
Policy = require('amqp10/lib').Policy;

const protocol = 'amqps';
const keyName = 'RootManageSharedAccessKey';
const sasKey = 'My Shared Access Key'
const serviceBusHost = 'account-name.servicebus.windows.net';
const uri = protocol + '://' + encodeURIComponent(keyName) + ':' + encodeURIComponent(sasKey) + '@' + serviceBusHost;
const queueName = 'test1';
var client = new AMQPClient(Policy.ServiceBusQueue);
client.connect(uri)
.then(function () {
    return Promise.all([
        client.createReceiver(queueName),
        client.createSender(queueName)
    ]);
})
.spread(function(receiver, sender) {
    console.log(receiver);
    console.log(sender);
    console.log('--------------------------------------------------------------------------');
    receiver.on('errorReceived', function(err) {
        // check for errors
        console.log(err);
    });
    receiver.on('message', function(message) {
        console.log('Received message');
        console.log(message);
        console.log('------------------------------------');
    });

    return sender.send([], {
        operation: 'com.microsoft:peek-message',
        'message-count': 5
    });
})
.error(function (e) {
    console.warn('connection error: ', e);
});

共有1个答案

束雅达
2023-03-14

默认情况下,接收方在自动结算模式下工作,您必须将其更改为处置结算:

const { Constants } = require('amqp10')

// 
// ...create client, connect, etc...
//

// Second parameter of createReceiver method enables overwriting policy parameters
const receiver = client.createReceiver(queueName, {
  attach: {
    rcvSettleMode: Constants.receiverSettleMode.settleOnDisposition
  }
})

不要忘记在处理消息后接受/拒绝/释放消息:

receiver.on('message', msg => {
  //
  // ...do something smart with a message...
  //

  receiver.accept(msg) // <- manually settle a message
})
 类似资料:
  • 我正在使用Azure服务总线队列。但是我不能使用“获取所有队列消息(peek Lock):微软内置于api”从队列中获取所有消息。 有没有办法获取所有队列消息? {"$连接":{"值":{"servicebus_1":{"连接ID":"/订阅/c776fex3-6aec-4722-b099-b054c267b240/资源组/Plugin-Resources/提供者/Microsoft.网络/连接/

  • 如何轮询azure服务总线以持续检查消息?下面是我从队列接收消息的方式。 我想不断地寻找信息,然后处理它。

  • 我有一个应用程序,在这个应用程序中,我可以在进程的一部分中以JSON格式将消息写入Azure服务总线队列。我有一个下游进程,我想将该消息从队列中弹出,将json转换为一个对象,然后处理该对象。 我没有问题将消息推送到队列上,但我还没有找到任何将消息从队列中逐一或循环弹出的示例。我在微软或Github上看到的每一个例子都是一个控制台应用程序(在网络应用程序中毫无用处),它设置了某种侦听器,可以抓取队

  • 我们在应用程序中对Azure服务总线队列使用HTTPs模式。 但我们不确定Azure服务总线如何在HTTP模式下交付消息,如果服务总线客户端使用轮询,则轮询到Azure服务总线队列的频率。 我们使用以下软件包:

  • 如何获取Azure服务总线队列中死信消息的数量? 我可以像这样得到队列中的计数。。。 但这看起来既包括队列中的消息,也包括关联死信队列中的消息 我如何区分它们?

  • 我有一个windows服务,它侦听Azure服务总线队列消息,以便从我的WebApi应用程序分发处理。此外,我还需要处理重复性任务(每晚/每周),我认为最好使用相同的系统来处理这些任务。 例如,假设我有一个“CleanupDb”队列,每天午夜删除过时的DB节点: 理论上这应该行得通,但我觉得我错过了一个更明显的处理方法。有没有更好的办法?