据我所知,RabbitMQ等消息代理促进了用不同语言/平台编写的不同应用程序之间的通信。因此,由于celery可以使用RabbitMQ作为消息代理,我相信我们可以将任务从任何应用程序排队到celery,即使生产者不是用Python编写的。
现在,我想知道如何通过RabbitMQ将用C编写的应用程序中的任务排队到芹菜。但我还没有找到这样的例子。
我发现的唯一与此相关的信息就是这个问题
其中,公认的答案建议使用Celery消息格式协议将消息从Java排队到RabbitMQ。然而,答案中给出的链接没有任何示例,只有消息格式。
此外,消息格式显示在该协议中通信需要任务ID(UUID)。我的C#应用程序应该如何知道芹菜任务的任务ID?据我所知,它只能知道任务名称,但不能知道任务ID。
根据这篇文章,芹菜。Net client使用随附的默认TaskScheduler。Net框架。这知道如何为任务生成ID。本文还列举了一些例子。
芹菜配花。Flower提供了一个REST API来管理任务。https://flower.readthedocs.io/en/latest/api.html#post--api任务异步应用-(.)在大多数情况下,与手动创建任务并将其插入MQ相比,这将更加简单和可靠。
我不知道这个问题是否仍然相关,但希望答案能帮助其他人。
下面是我如何成功地将一个任务queening给芹菜示例工人的。
>
您需要在生产者(客户端)和RabbitMQ之间建立连接,如本文所述。
ConnectionFactory factory = new ConnectionFactory();
factory.UserName = username;
factory.Password = password;
factory.VirtualHost = virtualhost;
factory.HostName = hostname;
factory.Port = port;
IConnection connection = factory.CreateConnection();
IModel channel = connection.CreateModel();
在默认RabbitMQ配置中,只有来宾用户只能用于本地连接(来自127.0.0.1)。这个问题的答案解释了如何在RabbitMQ中定义用户。
接下来-创建回调以获取结果。本例使用的是直接回复,因此应答侦听器的外观如下所示:
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var ansBody = ea.Body;
var ansMessage = Encoding.UTF8.GetString(ansBody);
Console.WriteLine(" [x] Received {0}", ansMessage);
Console.WriteLine(" [x] Done");
};
channel.BasicConsume(queue: "amq.rabbitmq.reply-to", noAck: true, consumer: consumer);
创建芹菜将消费的任务消息:
IDictionary<string, object> headers = new Dictionary<string, object>();
headers.Add("task", "tasks.add");
Guid id = Guid.NewGuid();
headers.Add("id", id.ToString());
IBasicProperties props = channel.CreateBasicProperties();
props.Headers = headers;
props.CorrelationId = (string)headers["id"];
props.ContentEncoding = "utf-8";
props.ContentType = "application/json";
props.ReplyTo = "amq.rabbitmq.reply-to";
object[] taskArgs = new object[] { 1, 200 };
object[] arguments = new object[] { taskArgs, new object(), new object()};
MemoryStream stream = new MemoryStream();
DataContractJsonSerializer ser = new DataContractJsonSerializer(typeof(object[]));
ser.WriteObject(stream, arguments);
stream.Position = 0;
StreamReader sr = new StreamReader(stream);
string message = sr.ReadToEnd();
var body = Encoding.UTF8.GetBytes(message);
最后,将消息发布到RabbitMQ:
channel.BasicPublish(exchange: "",
routingKey: "celery",
basicProperties: props,
body: body);
问题内容: 我已经使用JavaFX制作了GUI,并且有三个单选按钮,一旦用户单击“提交”并创建了另一个线程,并且根据检查的单选按钮,该线程将运行所需的输出并将结果输出到控制台。 但是,当线程正在运行时(一个过程大约需要30秒才能完成),我可以检查任何单选按钮。为此,它创建另一个线程,并与其他正在进行的线程长时间输出。所以我的输出盒简直是一团糟!我正在查看 异步 任务,但不确定是否与此相关。 这是我
我使用JavaFX制作了一个GUI,有三个单选按钮,一旦用户单击提交并创建了另一个线程,并且根据检查了什么单选按钮,线程运行所需的输出并将结果输出到控制台。 但是当线程运行时(一个进程需要大约30秒才能完成),我可以检查任何放射性按钮。它创建另一个线程并与另一个正在进行的线程一起输出长时间。所以我的输出框只是一个乱七八糟的东西!我在看异步任务,但我不确定这是否与它有关。 以下是我需要的:如果一个任
我有一个功能应用程序,当blob被上传或更新时触发。但在我的要求中,我的容器可能一次接收数千个blob(客户将批量将blob上传到容器)。在这种情况下,正如我所知,函数app将为容器中的每个blob并行运行。但我需要在队列中处理它们。因此,我想在这里使用队列存储。每当上传blob时,让我通过blob触发功能应用程序将该blob名称作为队列消息添加到队列存储中,然后我将拥有一个队列触发功能应用,它将
我目前正在开发一个Twilio排队系统,但我被困在如何让电话排队并连接到代理上。 我把所有的电话都排在队列中,我的理解是,我们需要呼叫出队列,下面是twilio网站上可用的示例响应。 但是,当有人打电话给twilio时,这可以作为对twilio的回应发送。 那么,特工应该拨打twilio号码吗?如果是这样的话,我们是否需要为这个案例保留一个内部号码,除了我们为来电提供的支持号码? 谢谢你,巴斯卡。
问题内容: 我对ThreadPoolExecutor有一个非常简单的问题。我有以下情况:我必须使用队列中的对象,为它们创建适当的工作程序任务,然后将其提交给ThreadPoolExecutor。这很简单。但是在关闭情况下, 许多 工作人员可能会排队等待执行。由于这些任务之一可能正在运行一个小时,而且我希望相对快速地正常关闭应用程序,因此我想从ThreadPoolExecutor中丢弃所有排队的任务
我找不到任何符合我要求的遗嘱执行人。我想要一个具有corePoolSize、maximumPoolSize和BlockingQueue的ExecutorService; 当执行函数被调用时,像往常一样,使用核心线程,如果核心线程正在使用,则将任务放入队列,如果队列已满,则创建新线程,直到达到最大池大小。这是ThreaPoolExecator的标准行为。线程池执行器 在这部分之前一切都好。我可以使用