当前位置: 首页 > 知识库问答 >
问题:

如何消费RabbitMQ在WCF?

公良浩邈
2023-03-14

我有一个场景,其中可执行文件是生产者,WCF服务是消费者。

WCF服务工作流程如下:

1) 服务调用可执行文件(producer),该可执行文件是另一个将消息生成RabbitMQ队列的进程。

2) 服务必须使用来自RabbitMQ队列的消息

3)将数据返回给客户端

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.Serialization;
using System.ServiceModel;
using System.Text;

namespace ConnectionServices
{

    public class Connection : IConnection
    {
        public string ConnectSite(string provider, string server, string siteName)
        {
            InvokeProducer(provider, server, siteName);
            string activeInstance = RunRabbitMQ();
            return activeInstance;

        }
        public void InvokeProducer(string provider, string server, string siteName)
        {
            string siteManagerExePath = @"C:\Users\mbmercha\Documents\Visual Studio 2015\Projects\Producer\Producer\bin\Debug\Producer.exe";
            try
            {
                ProcessStartInfo startInfo = new ProcessStartInfo();
                Process siteManagerProcess = new Process();
                startInfo.FileName = siteManagerExePath;
                startInfo.Arguments = string.Format("{0} {1} {2} {3}", "-b ", provider, server, siteName);
                siteManagerProcess.StartInfo = startInfo;
                siteManagerProcess.Start();
                siteManagerProcess.WaitForExit();

            }
            catch (Exception e)
            {

            }
        }
        public string RunRabbitMQ()
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            string activeInstance = null;
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare("DurableQueue", true, false, false, null);
                channel.ExchangeDeclare("DurableExchange", ExchangeType.Topic, true);
                channel.QueueBind("DurableQueue", "DurableExchange", "durable");
                var consumer = new EventingBasicConsumer(channel);

                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    activeInstance = message;
                };
                channel.BasicConsume(queue: "DurableQueue",
                                     autoAck: false,
                                     consumer: consumer);


            }
            return activeInstance;
        }
    }
}

到目前为止,服务能够调用可执行文件并在队列中生成消息。

但服务从第2步开始失败,它将返回null而不是实际消息。有人能告诉我这里缺少什么吗?

提前感谢。

共有1个答案

乜业
2023-03-14

您从未将activeInstance设置为除null之外的任何值。

您似乎正在使用异步API,这意味着您在RunRabbitMQ方法调用完成很久之后从RabbitMQ检索消息。。。或者,如果您在返回时没有立即处理所有的消费类机器,您可能会这样做。

如果您想要同步地检索消息(在本例中是在同步方法调用中),则需要等待消息变为可用。为此,您需要使用“pull API”,即通道。基本集(…)

 类似资料:
  • 我是一个新的学习者,试图理解拉雷维尔的拉比MQ。我已找到驱动程序vyuldashev/laravel队列rabbitmq 我已经配置应用程序/queue.php,并运行驱动程序与此语法"php工匠队列:工作Rabbitmq"。控制器。我不会在我的控制器中调度作业,因为laravel只是监听消息并处理消息。谁能帮我解释一下这是怎么回事?谢啦

  • 在RabbitMQ级别上是否有任何机制允许我使用下一个消息直到上一个消息被加密为止?还是必须在服务器之间开发某种锁定机制?

  • 我有以下场景:有3个rabbitmq队列,生产者根据消息的优先级将消息推送到这些队列。(myqueue_high,myqueue_medium,myqueue_low)我希望有一个可以按顺序或优先级从这些队列中提取的单一使用者,即只要消息在那里,它就一直从高队列中提取。它是从介质中拉出来的。如果medium也是空的,它从Low拉出。 我如何实现这一点?我需要编写自定义组件吗?

  • 我的应用程序使用来自RabbitMQ的一些消息并对其进行处理。我有大约10个队列,每个队列最多有10个消费者(线程)。我有5次预回迁。我正在Heroku中使用CloudAMQP插件(RabbitMQ作为服务)运行安装程序。 我使用默认心跳和连接超时设置(60秒)运行。 我的java应用程序是一个使用sping-Rabbit库的Spring Boot应用程序。 版本: 问题是对于一个特定队列的消费者

  • 我使用RabbitMQ作为不同消息的队列。当我使用来自一个队列的两个不同消费者的消息时,我会处理它们并将处理结果插入数据库: 我想大量使用队列中的消息,这将减少数据库负载。由于RabbitMQ不支持消费者批量读取消息,我将这样做smth: 消息在全部完全处理之前处于队列中 如果消费者跌倒或断开连接 - 消息保持安全 你认为这个解决方案怎么样?如果可以的话,如果消费者摔倒了,我怎样才能重新得到所有未

  • 我编写了一个C(Rabbitmq-c)工作应用程序,它使用由Python脚本(pika)发布的队列。 我有以下奇怪的行为,我似乎无法解决: 在消息发布到队列之前启动所有工作人员按预期工作 队列发布后启动1个工作人员按预期工作 然而:在一个工作人员开始从队列中消费后启动其他工作人员意味着这些工作人员在队列中看不到任何消息(消息计数=0),因此只是等待(即使队列中还有许多消息)。杀死第一个工作人员会突