当前位置: 首页 > 知识库问答 >
问题:

使用AMQP从Azure Eventhub接收事件

贝德辉
2023-03-14

目标:使用python从EventHub接收消息。

设置:

  • Python 3.5
  • AMQP 1.0
  • 在新门户中创建的EventHub。
  • Ubuntu 16.04和Mac OS X。
  • 质子==0.8.8和python-qpid-质子==0.17.0

几天来,我一直在尝试连接AMQP和Azure EventHub。我可以用Java客户端很好地做到这一点(作为确保EventHub工作的测试),但我需要能够用Python做到这一点。

在网上阅读了几个例子后,我在过去几天尝试了很多事情。我正在使用<code>RootManageSharedAccessKey以及正常报价。

目前我的字符串如下所示:

am qp s://根管理共享访问密钥:

如果我将这些相同的参数复制到Java客户机中,它就可以正常工作。如果我使用python SDK发送,它也可以正常工作(它通过HTTPS发送)。我尝试过使用amqpamqp如果我为amqp打开调试参数,我会得到错误:

condition=:"amqp:unauthorized-access",
description="Unauthorized access. 'Send' claim(s) are required to perform this operation.
Resource: 'sb://<namespace>.servicebus.windows.net/<internal event hub>

这听起来是一个权限问题,但相同的密钥和所有内容在Java和Python中都可以通过HTTPS完美运行,因此它必须是我的代码与密钥中的内容。也许我没有正确编码它们(我已经读过你不需要编码它们)?一旦我让它工作,我将为其他人制作GIST和更新。

我的当前代码如下发送应该更容易:

from proton import Messenger, Message

messenger = Messenger()
message = Message()
key = "key"
message.address = "amqps://RootManageSharedAccessKey:"+key+"@<namespace>.servicebus.windows.net/<hubname>"

message.body = u"This is a text string"
messenger.put(message)
messenger.send()`

我尝试过的事情:

  1. 使事件中心和命名空间上的发送和接收策略分开。
  2. 重新创建集线器
  3. 安全与不安全
  4. 将端口添加到防火墙
  5. Python 2.7质子vs 3.5
  6. Linuxvs Mac
  7. 主键vs副键
  8. 新命名空间
  9. 键中有和没有/的新密钥
  10. URL编码部分和整个字符串
  11. UTF-8编码
  12. JSON消息与短信。

共有1个答案

姚麒
2023-03-14

这对您的项目来说可能太晚了,但是这里有一个新的用于Python的AMQP 1.0库正在开发中:

http://github.com/Azure/azure-uamqp-python

尽管它仍处于早期阶段(尚未准备好生产),但repo中有一些用于Azure事件中心的示例:

https://github.com/Azure/azure-uamqp-python/tree/master/samples

关于您在使用Proton时看到的错误-看起来它可能试图使用错误的身份验证机制。您尝试过使用事件中心Python SDK吗?他们在这里使用了一些可能感兴趣的设置

 类似资料:
  • 这里是我的application.yml的相关部分 从文档中,我发现应该创建一个SpringAMQPMessageSource bean: 如果我从rabbitmq管理面板向队列发送消息,我会看到日志: SpringAMQPMessageSource的JavaDoc是这样说的: 但到现在我都找不到在哪里注册,也找不到怎么注册。 配置中的Axon.EventHandling条目和聚合中的@Proce

  • 我浏览了RabbitTemplate的API。它只提供从队列中获取消息的receive方法。但是,无法获得具有特定关联ID的消息。你能帮我理解一下我在这里错过了什么吗。 目前,我正在使用ActiveMQ的JMS API接收消息,使用以下代码创建带有消息选择器的Consumer。希望对Spring AMQP和RabbitMQ执行同样的操作:

  • 启动日志的尾端如下所示: 因此,PolicyService似乎成功连接到message Broker。 Turbine AMQP服务器的日志结束: 编辑:下面的异常是当我停止收听涡轮流时抛出的,而不是当我尝试用仪表板收听时抛出的。 我对Turbane-AMQP的依赖关系如下:

  • 问题内容: 这是代码,但出现错误: 可以从插座接收使用吗? 问题答案: 不。当您有字节数组时,要使用该数组,并且要像文件一样从数组中读取,就可以使用。如果只想从套接字读取字节数组,请执行以下操作: 该变量将包含实际读取的字节数,并且数据当然将在array中。

  • 我尝试将Azure服务总线与ApacheQPID和Spring与事务集成。 但Azure服务总线AMQP实现似乎不支持事务。这是真的吗?我没有找到相关信息。 这是我的JMS配置 这是我的spring集成片段: 它与session transact=“false”配合使用,但与session transact=“true”配合使用时会产生错误: QPID跟踪

  • 我想知道是否有任何方法可以在我的Udp客户端接收数据时以某种方式触发事件?(System. Net. Socket. Udp客户端) 我尝试覆盖它的 UdpClient.Receive,但我不能根据这个,因为可覆盖的成员必须标记为虚拟或抽象。 我目前的做法是在Threading.Timer上运行一个TimerCallback,以特定的时间间隔来获取我的数据,但我更愿意在传入数据到达时接收它,而不是