当前位置: 首页 > 知识库问答 >
问题:

如何轮询azure服务总线队列以获取消息?

淳于开畅
2023-03-14

如何轮询azure服务总线以持续检查消息?下面是我从队列接收消息的方式。

   from azure.servicebus import QueueClient

   client = QueueClient.from_connection_string(
       q_string,
       q_name)

   msg = None

   with client.get_receiver() as queue_receiver:
     messages = queue_receiver.fetch_next(max_batch_size=1, timeout=3)
     if len(messages) > 0:
        msg = messages[0]
        print(f"Received {msg.message}")

  return msg

我想不断地寻找信息,然后处理它。

共有2个答案

阮雅达
2023-03-14

您可以在v7.0.0中使用ServiceBusRecector持久地接收消息

from azure.servicebus import ServiceBusClient

import os
connstr = os.environ['SERVICE_BUS_CONNECTION_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']

with ServiceBusClient.from_connection_string(connstr) as client:
    # max_wait_time specifies how long the receiver should wait with no incoming 
    # messages before stopping receipt.  
    # Default is None; to receive forever.
    with client.get_queue_receiver(queue_name, max_wait_time=30) as receiver:
        msg = receiver.next() # it's a generator
            # If it is desired to halt receiving early, one can break out of the loop here safely.
西门磊
2023-03-14

正如乔治·陈所说,我认为服务总线队列触发器符合您的要求。

\u init\u。py:

import logging

import azure.functions as func


def main(msg: func.ServiceBusMessage):
    logging.info('Python ServiceBus queue trigger processed message: %s',
                 msg.get_body().decode('utf-8'))

函数。json:

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "msg",
      "type": "serviceBusTrigger",
      "direction": "in",
      "queueName": "test",
      "connection": "str"
    }
  ]
}

env var:

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "DefaultEndpointsProtocol=https;AccountName=0730bowmanwindow;AccountKey=lti/ThmF+mw9BebOacp9gVazIh76Q39ecikHSCkaTcGK5hmInspX+EkjzpNmvCPWsnvapWziHQHL+kKt2V+lZw==;EndpointSuffix=core.windows.net",
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "str": "Endpoint=sb://bowmantest.servicebus.xxxxxxx"
  }
}

如果在azure上,则环境变量位于配置设置上,而不是本地设置上。设置。json。

当您这样做时,触发器将帮助您捕获服务总线队列中的信息。(

 类似资料:
  • 我正在使用Azure服务总线队列。但是我不能使用“获取所有队列消息(peek Lock):微软内置于api”从队列中获取所有消息。 有没有办法获取所有队列消息? {"$连接":{"值":{"servicebus_1":{"连接ID":"/订阅/c776fex3-6aec-4722-b099-b054c267b240/资源组/Plugin-Resources/提供者/Microsoft.网络/连接/

  • 我有一个应用程序,在这个应用程序中,我可以在进程的一部分中以JSON格式将消息写入Azure服务总线队列。我有一个下游进程,我想将该消息从队列中弹出,将json转换为一个对象,然后处理该对象。 我没有问题将消息推送到队列上,但我还没有找到任何将消息从队列中逐一或循环弹出的示例。我在微软或Github上看到的每一个例子都是一个控制台应用程序(在网络应用程序中毫无用处),它设置了某种侦听器,可以抓取队

  • 如何获取Azure服务总线队列中死信消息的数量? 我可以像这样得到队列中的计数。。。 但这看起来既包括队列中的消息,也包括关联死信队列中的消息 我如何区分它们?

  • 我有一个windows服务,它侦听Azure服务总线队列消息,以便从我的WebApi应用程序分发处理。此外,我还需要处理重复性任务(每晚/每周),我认为最好使用相同的系统来处理这些任务。 例如,假设我有一个“CleanupDb”队列,每天午夜删除过时的DB节点: 理论上这应该行得通,但我觉得我错过了一个更明显的处理方法。有没有更好的办法?

  • 我们目前正在利用Azure服务总线来处理来自应用程序的各种消息。 我想知道实时处理这些消息的最佳方式是什么? 有没有一种方法可以在消息放入队列时自动执行脚本? 我只是在想,一定有比让一个单独的应用程序每分钟/30秒检查一次队列更好的方法。 谢谢各位

  • 我试图找出队列中可以调度的消息数量是否有限制。我需要在未来的7到21天内安排数百万条消息。看了这个,但它没有说任何关于预定消息的内容。https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-quotas#messaging-配额