当前位置: 首页 > 知识库问答 >
问题:

如何将任务从C#排队到Celery?

弘靖琪
2023-03-14

据我所知,RabbitMQ等消息代理促进了用不同语言/平台编写的不同应用程序之间的通信。因此,由于celery可以使用RabbitMQ作为消息代理,我相信我们可以将任务从任何应用程序排队到celery,即使生产者不是用Python编写的。

现在,我想知道如何通过RabbitMQ将用C编写的应用程序中的任务排队到芹菜。但我还没有找到这样的例子。

我发现的唯一与此相关的信息就是这个问题

其中,公认的答案建议使用Celery消息格式协议将消息从Java排队到RabbitMQ。然而,答案中给出的链接没有任何示例,只有消息格式。

此外,消息格式显示在该协议中通信需要任务ID(UUID)。我的C#应用程序应该如何知道芹菜任务的任务ID?据我所知,它只能知道任务名称,但不能知道任务ID。

共有3个答案

郁和通
2023-03-14

根据这篇文章,芹菜。Net client使用随附的默认TaskScheduler。Net框架。这知道如何为任务生成ID。本文还列举了一些例子。

伊锦
2023-03-14

芹菜配花。Flower提供了一个REST API来管理任务。https://flower.readthedocs.io/en/latest/api.html#post--api任务异步应用-(.)在大多数情况下,与手动创建任务并将其插入MQ相比,这将更加简单和可靠。

章学义
2023-03-14

我不知道这个问题是否仍然相关,但希望答案能帮助其他人。

下面是我如何成功地将一个任务queening给芹菜示例工人的。

>

  • 您需要在生产者(客户端)和RabbitMQ之间建立连接,如本文所述。

        ConnectionFactory factory = new ConnectionFactory();
        factory.UserName = username;
        factory.Password = password;
        factory.VirtualHost = virtualhost;
        factory.HostName = hostname;
        factory.Port = port;
    
        IConnection connection = factory.CreateConnection();
        IModel channel = connection.CreateModel();
    

    在默认RabbitMQ配置中,只有来宾用户只能用于本地连接(来自127.0.0.1)。这个问题的答案解释了如何在RabbitMQ中定义用户。

    接下来-创建回调以获取结果。本例使用的是直接回复,因此应答侦听器的外观如下所示:

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var ansBody = ea.Body;
            var ansMessage = Encoding.UTF8.GetString(ansBody);
            Console.WriteLine(" [x] Received {0}", ansMessage);
            Console.WriteLine(" [x] Done");
        };
        channel.BasicConsume(queue: "amq.rabbitmq.reply-to", noAck: true, consumer: consumer);
    

    创建芹菜将消费的任务消息:

        IDictionary<string, object> headers = new Dictionary<string, object>();
        headers.Add("task", "tasks.add");
        Guid id = Guid.NewGuid();
        headers.Add("id", id.ToString());
    
        IBasicProperties props = channel.CreateBasicProperties();
        props.Headers = headers;
        props.CorrelationId = (string)headers["id"];
        props.ContentEncoding = "utf-8";
        props.ContentType = "application/json";
        props.ReplyTo = "amq.rabbitmq.reply-to";
    
        object[] taskArgs = new object[] { 1, 200 };
    
        object[] arguments = new object[] { taskArgs, new object(), new object()};
    
        MemoryStream stream = new MemoryStream();
        DataContractJsonSerializer ser = new DataContractJsonSerializer(typeof(object[]));
        ser.WriteObject(stream, arguments);
        stream.Position = 0;
        StreamReader sr = new StreamReader(stream);
        string message = sr.ReadToEnd();
    
        var body = Encoding.UTF8.GetBytes(message);
    

    最后,将消息发布到RabbitMQ:

            channel.BasicPublish(exchange: "",
                             routingKey: "celery",
                             basicProperties: props,
                             body: body);
    

  •  类似资料:
    • 问题内容: 我已经使用JavaFX制作了GUI,并且有三个单选按钮,一旦用户单击“提交”并创建了另一个线程,并且根据检查的单选按钮,该线程将运行所需的输出并将结果输出到控制台。 但是,当线程正在运行时(一个过程大约需要30秒才能完成),我可以检查任何单选按钮。为此,它创建另一个线程,并与其他正在进行的线程长时间输出。所以我的输出盒简直是一团糟!我正在查看 异步 任务,但不确定是否与此相关。 这是我

    • 我使用JavaFX制作了一个GUI,有三个单选按钮,一旦用户单击提交并创建了另一个线程,并且根据检查了什么单选按钮,线程运行所需的输出并将结果输出到控制台。 但是当线程运行时(一个进程需要大约30秒才能完成),我可以检查任何放射性按钮。它创建另一个线程并与另一个正在进行的线程一起输出长时间。所以我的输出框只是一个乱七八糟的东西!我在看异步任务,但我不确定这是否与它有关。 以下是我需要的:如果一个任

    • 我有一个功能应用程序,当blob被上传或更新时触发。但在我的要求中,我的容器可能一次接收数千个blob(客户将批量将blob上传到容器)。在这种情况下,正如我所知,函数app将为容器中的每个blob并行运行。但我需要在队列中处理它们。因此,我想在这里使用队列存储。每当上传blob时,让我通过blob触发功能应用程序将该blob名称作为队列消息添加到队列存储中,然后我将拥有一个队列触发功能应用,它将

    • 我目前正在开发一个Twilio排队系统,但我被困在如何让电话排队并连接到代理上。 我把所有的电话都排在队列中,我的理解是,我们需要呼叫出队列,下面是twilio网站上可用的示例响应。 但是,当有人打电话给twilio时,这可以作为对twilio的回应发送。 那么,特工应该拨打twilio号码吗?如果是这样的话,我们是否需要为这个案例保留一个内部号码,除了我们为来电提供的支持号码? 谢谢你,巴斯卡。

    • 问题内容: 我对ThreadPoolExecutor有一个非常简单的问题。我有以下情况:我必须使用队列中的对象,为它们创建适当的工作程序任务,然后将其提交给ThreadPoolExecutor。这很简单。但是在关闭情况下, 许多 工作人员可能会排队等待执行。由于这些任务之一可能正在运行一个小时,而且我希望相对快速地正常关闭应用程序,因此我想从ThreadPoolExecutor中丢弃所有排队的任务

    • 我找不到任何符合我要求的遗嘱执行人。我想要一个具有corePoolSize、maximumPoolSize和BlockingQueue的ExecutorService; 当执行函数被调用时,像往常一样,使用核心线程,如果核心线程正在使用,则将任务放入队列,如果队列已满,则创建新线程,直到达到最大池大小。这是ThreaPoolExecator的标准行为。线程池执行器 在这部分之前一切都好。我可以使用