当前位置: 首页 > 知识库问答 >
问题:

Azure服务总线-使用OnMessage()方法接收消息

慕容昊焜
2023-03-14

根据MS文档,从订阅接收消息并不困难。但是,如果我希望我的应用程序在每次发布新消息时都接收一条消息--一个恒定的轮询。因此,使用了SubscriptionClient类的OnMessage()方法。

MS文档说:“...当调用OnMessage时,客户端启动一个内部消息泵,该消息泵不断轮询队列或订阅。该消息泵由发出Receive()调用的无限循环组成。如果调用超时,它发出下一个Receive()调用。...”

但是当应用程序运行时,调用moment OnMessage()方法只接收最新消息。当发布新消息时,持续轮询似乎不起作用。在尝试了许多不同的方法之后,唯一的方法是使用无限循环将代码放入一个单独的任务中,使应用程序在收到新消息时做出反应。在这么多层面上,这似乎是完全错误的!(请参阅下面的代码)。

有没有人可以帮助我修正我的代码或者发布一个工作样例来完成相同的功能而不需要循环?谢谢你!

 public void ReceiveMessageFromSubscription(string topicName, string subscriptionFilter)
        {
            var newMessage = new MessageQueue();
            int i = 0;

            Task listener = Task.Factory.StartNew(() =>
            {
                while (true)
                {
                    SubscriptionClient Client = SubscriptionClient.CreateFromConnectionString(connectionString, topicName, subscriptionFilter);

                    Dictionary<string, string> retrievedMessage = new Dictionary<string, string>();

                    OnMessageOptions options = new OnMessageOptions();
                                     options.AutoComplete = false;
                                     options.AutoRenewTimeout = TimeSpan.FromMinutes(1);

                    Client.OnMessage((message) =>
                    {
                        try
                        {
                            retrievedMessage.Add("messageGuid", message.Properties["MessageGuid"].ToString());
                            retrievedMessage.Add("instanceId", message.Properties["InstanceId"].ToString());
                            retrievedMessage.Add("pId", message.Properties["ProcessId"].ToString());
                            retrievedMessage.Add("processKey", message.Properties["ProcessKey"].ToString());
                            retrievedMessage.Add("message", message.Properties["Message"].ToString());

                            newMessage.AnnounceNewMessage(retrievedMessage); // event ->

                            message.Complete(); // Remove message from subscription.
                        }
                        catch (Exception ex)
                        {
                            string exmes = ex.Message;
                            message.Abandon();
                        }

                    }, options);

                    retrievedMessage.Clear();

                    i++;

                    Thread.Sleep(3000);
                }

            });
        }

共有1个答案

邹海荣
2023-03-14

你的代码有几个问题需要解决-

  • 它会失败,我假设您的应用程序会退出--或者至少正在侦听消息的线程会终止。
  • 您的while循环会不断重复代码以挂起消息处理程序,您只需要执行一次操作。
  • 您需要一种方法来保持调用堆栈活动,并防止应用程序垃圾收集您的对象。

下面的内容将会看到你走向成功的道路。祝你好运.

 ManualResetEvent CompletedResetEvent = new ManualResetEvent(false);
    SubscriptionClient Client;

    public void ReceiveMessagesFromSubscription(string topicName, string subscriptionFilter, string connectionString)
    {
        Task listener = Task.Factory.StartNew(() =>
        {
            // You only need to set up the below once. 
            Client = SubscriptionClient.CreateFromConnectionString(connectionString, topicName, subscriptionFilter);

            OnMessageOptions options = new OnMessageOptions();
            options.AutoComplete = false;
            options.AutoRenewTimeout = TimeSpan.FromMinutes(1);
            options.ExceptionReceived += LogErrors;

            Client.OnMessage((message) =>
            {
                try
                {
                    Trace.WriteLine("Got the message with ID {0}", message.MessageId);
                    message.Complete(); // Remove message from subscription.
                }
                catch (Exception ex)
                {
                    Trace.WriteLine("Exception occurred receiving a message: {0}" + ex.ToString());
                    message.Abandon(); // Failed. Leave the message for retry or max deliveries is exceeded and it dead letters.
                }

            }, options);

            CompletedResetEvent.WaitOne();
        });
    }

    /// <summary>
    /// Added in rudimentary exception handling .
    /// </summary>
    /// <param name="sender">The sender.</param>
    /// <param name="ex">The <see cref="ExceptionReceivedEventArgs"/> instance containing the event data.</param>
    private void LogErrors(object sender, ExceptionReceivedEventArgs ex)
    {
        Trace.WriteLine("Exception occurred in OnMessage: {0}" + ex.ToString());
    }

    /// <summary>
    /// Call this to stop the messages arriving from subscription.
    /// </summary>
    public void StopMessagesFromSubscription()
    {
        Client.Close(); // Close the message pump down gracefully
        CompletedResetEvent.Set(); // Let the execution of the listener complete and terminate gracefully 
    }
var messages = await queueClient.ReceiveBatchAsync(10, TimeSpan.FromSeconds(30),
                                                       cancellationToken);
 类似资料:
  • 参考https://github.com/Azure/azure-service-bus/tree/master/samples/dotnet/gettingstart/microsoft.Azure.servicebus/basicsendreceiveusingtopicsubscriptionclient,我了解Azure服务总线主题的一般工作方式,我的问题更多地是关于它实际上是如何工作的。

  • 我希望以批处理模式接收来自Azure ServiceBus主题的消息。 阅读https://docs.microsoft.com/en-us/Azure/Azure-functions/functions-best-practices时指出: 我有一个方法: 这个方法是有效的,但它一次只需要一个消息。 根据Microsoft文档,我可以将其更改为: 注意:主题和订阅已启用“启用批处理操作”设置。

  • 调试此代码时,应用程序停止。未显示任何错误消息或异常。 项目X: 下面我将通过REST API发布JSON对象。 项目Y: 有办法找到尸体吗?

  • 我有一个超时选项,只想在超时前接收消息。 如果您能解释下面的代码是如何工作的,以及我如何修改下面的代码以在特定的时间框架内接收消息,并且一旦我的超时已经到达就停止接收,这将是很有帮助的。

  • 这是我能找到的最接近的前一个问题:Azure Service Bus Subscription OnMessage未接收消息。 同样的事情也发生在我身上。当我改变主题的名称时,它会再次工作一段时间。则该服务总线主题再次损坏。只有65-71%的消息到达。无助于删除子内容,也无助于删除主题。题名似乎过了一段时间不知怎么就被污染了。这是真的真的很糟糕,因为我没有办法告诉什么时候主题是腐败的,除了系统不像

  • 我正在使用Azure服务总线主题机制。此外,我已经将消息发送到主题,并希望通过编程方式检查是否将消息发送到主题。 代码: 有没有办法获得响应或状态代码?