当前位置: 首页 > 知识库问答 >
问题:

本地NServicebus应用程序从Azure ServiceBus队列接收消息

方承弼
2023-03-14

我目前正努力在nServiceBus托管的应用程序上启动并运行一些东西。我有一个第三方正在向其发布消息的azure ServiceBus队列,我希望我的应用程序(目前在本地托管)接收这些消息。

我在谷歌上搜索了如何配置endpoint的答案,但我在有效的配置中没有运气。有人这样做过吗,因为我可以找到如何连接到Azure存储队列但不是服务总线队列的示例。(由于其他原因,我需要Azure servicebus队列)

我的配置如下所示

public void Init()
    {
        Configure.With()
           .DefaultBuilder()
           .XmlSerializer()
           .UnicastBus()
           .AzureServiceBusMessageQueue()
           .IsTransactional(true)
           .MessageForwardingInCaseOfFault()
           .UseInMemoryTimeoutPersister()
           .InMemorySubscriptionStorage();
    }

.Message=启动endpoint时出现异常,已记录错误。原因:输入队列[mytimeoutmanager@sb://[*].服务总线。窗户。net/]必须与此Source=NServiceBus位于同一台计算机上。主办

.

<configuration>
  <configSections>
    <section name="MessageForwardingInCaseOfFaultConfig" type="NServiceBus.Config.MessageForwardingInCaseOfFaultConfig, NServiceBus.Core" />
    <section name="UnicastBusConfig" type="NServiceBus.Config.UnicastBusConfig, NServiceBus.Core" />
    <section name="AzureServiceBusQueueConfig" type="NServiceBus.Config.AzureServiceBusQueueConfig, NServiceBus.Azure" />
    <section name="AzureTimeoutPersisterConfig" type="NServiceBus.Timeout.Hosting.Azure.AzureTimeoutPersisterConfig, NServiceBus.Timeout.Hosting.Azure" />
  </configSections>
  <AzureServiceBusQueueConfig IssuerName="owner" QueueName="testqueue" IssuerKey="[KEY]" ServiceNamespace="[NS]" />
  <MessageForwardingInCaseOfFaultConfig ErrorQueue="error" />
  <!-- Use the following line to explicitly set the Timeout manager address -->
  <UnicastBusConfig TimeoutManagerAddress="MyTimeoutManager" />
  <!-- Use the following line to explicity set the Timeout persisters connectionstring -->
  <AzureTimeoutPersisterConfig ConnectionString="UseDevelopmentStorage=true" />
  <startup useLegacyV2RuntimeActivationPolicy="true">
    <supportedruntime version="v4.0" />
    <requiredruntime version="v4.0.20506" />
    <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.0" />
  </startup>
</configuration>

共有2个答案

吴鸿彩
2023-03-14

我现在也遇到了同样的问题,花了几个小时想办法解决它。基本上,Azure超时持久器仅支持使用NServiceBus的Azure托管endpoint。群众或部队的集合。蔚蓝色的如果您使用NServiceBus。宿主进程要宿主endpoint,它使用NServiceBus。超时。群众或部队的集合。Windows命名空间类。它用MSMQ初始化了一个TransactionalTransport,然后您就得到了这条消息

我用了两种方法来避免它:

  1. 如果必须使用As_服务器endpoint配置,则可以使用。DisableTimeoutManager()在初始化中,它将完全跳过TimeoutDispatcher初始化

可能有一种方法可以以某种方式注入Azure timeout manager,但我还没有找到它,实际上我需要作为客户端的东西,所以它对我来说很好。

沃阳飙
2023-03-14

尝试移动UnicastBus()到呼叫结束,如下所示:

    Configure.With()
       .DefaultBuilder()
       .XmlSerializer()
       .AzureServiceBusMessageQueue()
       .IsTransactional(true)
       .MessageForwardingInCaseOfFault()
       .UseInMemoryTimeoutPersister()
       .InMemorySubscriptionStorage()
       .UnicastBus(); // <- Here

以及那些向队列中发送消息的第三方。请记住,他们需要尊重NServiceBus处理序列化/反序列化的方式。以下是如何在NServiceBus中完成此操作(最重要的部分是使用原始消息初始化BrokeredMessage,这是使用BinaryFormatter进行序列化的结果):

    private void Send(Byte[] rawMessage, QueueClient sender)
    {
        var numRetries = 0;
        var sent = false;

        while(!sent)
        {
            try
            {
                var brokeredMessage = new BrokeredMessage(rawMessage);

                sender.Send(brokeredMessage);

                sent = true;
            }
                // back off when we're being throttled
            catch (ServerBusyException)
            {
                numRetries++;

                if (numRetries >= MaxDeliveryCount) throw;

                Thread.Sleep(TimeSpan.FromSeconds(numRetries * DefaultBackoffTimeInSeconds));
            }
        }

    }

    private static byte[] SerializeMessage(TransportMessage message)
    {
        if (message.Headers == null)
            message.Headers = new Dictionary<string, string>();

        if (!message.Headers.ContainsKey(Idforcorrelation))
            message.Headers.Add(Idforcorrelation, null);

        if (String.IsNullOrEmpty(message.Headers[Idforcorrelation]))
            message.Headers[Idforcorrelation] = message.IdForCorrelation;

        using (var stream = new MemoryStream())
        {
            var formatter = new BinaryFormatter();
            formatter.Serialize(stream, message);
            return stream.ToArray();
        }
    }

如果您希望NServiceBus正确地反序列化消息,请确保您的用户正确地序列化它。

 类似资料:
  • 我们有一个使用NServicebus的本地Web应用程序,我们希望将队列移动到Azure(作为完全基于云的更大迁移路径的一部分)。我正试图建立一个POC,但似乎无法让它工作。 我有以下配置: 在我的应用程序中,我将总线配置为: 当我尝试发布到队列时,我得到以下消息: 我有点不知所措,我尝试了在网上找到的几种不同的示例,但我觉得我对azure知识的缺乏阻碍了我的发展。我从这个问题中提取了大部分配置:

  • 我已经建立了AWS架构体系,以便每次对发电机数据库条目的更新都以启用重复数据删除的SQS先进先出队列结束。我还有一个测试来覆盖这个场景,在那里我清除了队列(队列可以从套装中的其他测试中获得更新。为了避免在收到正确的消息之前必须轮询大量的消息,我在运行测试之前清除队列),更新Dynamo Db,并在轮询队列时检查这些条目是否收到。这个测试是不稳定的,有时会失败,因为我发送的所有更新都没有从队列中收到

  • 几天前,我发布了这个问题,来自Azure IOT中心的短信 我曾尝试实现建议的logic app,我的问题是logic app没有通过服务总线接收任何消息,事实上没有消息到达服务总线。当我尝试在logic应用程序中运行触发器时,它会弹出一个对话框,告诉我“When_a_message_is_received_in_a_queue”。当我运行logic应用程序时,它说工作流程在几分钟后超时。 我复制

  • 当使用JavaSDK(0.4.1)从SB队列检索消息时,即使所讨论的队列为空,也会返回非空。对该结果调用将返回一个非空的。我怎么知道我在一个空队列上打了接听电话?显然,在分布式并发系统中,查询大小不是一个选项。 我使用模式,队列是新创建的,所以我知道它是空的。 根据docs,对API的底层调用应该返回204的状态码。在返回的结果或消息中,我找不到任何方法访问该代码。

  • 因此,我使用Spring integration链接JMS和ActiveMQ,如下所示:- 如何使其工作,以便发送到此队列并从中接收消息?请帮忙。

  • 使用启用会话(消息排序)的Azure ServiceBus队列,我的会话需要持续几分钟到几个小时。 为此,我将QueueClient配置如下: 并按如下方式开始接收消息: 在几次(1到6次之间)成功(几乎是瞬时的)消息接收回调之后——无论是对于新会话还是现有会话,接收处理程序都会停止触发。使用,我可以看到位于servicebus队列上的消息。有趣的是,它们都有一个DeliveryCount=1。过