如何从C#将任务排队到Celery?

Joe*_*Joe 8 c# python rabbitmq task-queue celery

据我所知,像RabbitMQ这样的消息代理可以帮助用不同语言/平台编写的不同应用程序相互通信.因为芹菜可以使用RabbitMQ作为消息代理,我相信我们可以将任务从任何应用程序排队到Celery,即使生产者不是用Python编写的.

现在我试图找出如何通过RabbitMQ从C#编写的应用程序将任务排队到Celery.但我还没有找到任何这样的例子.

我发现的唯一信息就是这个问题

接受的答案建议使用Celery消息格式协议将消息从Java排队到RabbitMQ.但是,答案中给出的链接没有任何示例,只有消息格式.

此外,消息格式表示在此协议中需要任务ID(UUID)进行通信.我的C#应用​​程序应该如何知道芹菜任务的任务ID?据我所知,它只能知道任务名称,而不是任务ID.

小智 6

我不知道这个问题是否仍然有意义,但希望答案会对其他人有所帮助。

这就是我成功完成了向Celery示例工作者执行任务的方式。

  1. 您需要按照此处所述生产者(客户端)与RabbitMQ之间建立连接。

        ConnectionFactory factory = new ConnectionFactory();
        factory.UserName = username;
        factory.Password = password;
        factory.VirtualHost = virtualhost;
        factory.HostName = hostname;
        factory.Port = port;
    
        IConnection connection = factory.CreateConnection();
        IModel channel = connection.CreateModel();
    
    Run Code Online (Sandbox Code Playgroud)

    在默认的RabbitMQ配置中,只有来宾用户只能用于本地连接(从127.0.0.1开始)。问题的答案说明了如何在RabbitMQ中定义用户。

  2. 接下来-创建一个回调以获取结果。本示例使用Direct Reply-to,因此答案侦听器将如下所示:

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var ansBody = ea.Body;
            var ansMessage = Encoding.UTF8.GetString(ansBody);
            Console.WriteLine(" [x] Received {0}", ansMessage);
            Console.WriteLine(" [x] Done");
        };
        channel.BasicConsume(queue: "amq.rabbitmq.reply-to", noAck: true, consumer: consumer);
    
    Run Code Online (Sandbox Code Playgroud)
  3. 创建Celery将消耗的任务消息:

        IDictionary<string, object> headers = new Dictionary<string, object>();
        headers.Add("task", "tasks.add");
        Guid id = Guid.NewGuid();
        headers.Add("id", id.ToString());
    
        IBasicProperties props = channel.CreateBasicProperties();
        props.Headers = headers;
        props.CorrelationId = (string)headers["id"];
        props.ContentEncoding = "utf-8";
        props.ContentType = "application/json";
        props.ReplyTo = "amq.rabbitmq.reply-to";
    
        object[] taskArgs = new object[] { 1, 200 };
    
        object[] arguments = new object[] { taskArgs, new object(), new object()};
    
        MemoryStream stream = new MemoryStream();
        DataContractJsonSerializer ser = new DataContractJsonSerializer(typeof(object[]));
        ser.WriteObject(stream, arguments);
        stream.Position = 0;
        StreamReader sr = new StreamReader(stream);
        string message = sr.ReadToEnd();
    
        var body = Encoding.UTF8.GetBytes(message);
    
    Run Code Online (Sandbox Code Playgroud)
  4. 最后,将消息发布到RabbitMQ:

            channel.BasicPublish(exchange: "",
                             routingKey: "celery",
                             basicProperties: props,
                             body: body);
    
    Run Code Online (Sandbox Code Playgroud)


Gia*_*nis 6

芹菜随花而来。Flower 提供 REST API 来管理任务。https://flower.readthedocs.io/en/latest/api.html#post--api-task-async-apply-(.+) 在大多数情况下,这比手动创建任务和使用起来更加简单和强大将它们插入到 MQ 上。


小智 3

根据这篇文章,celery .Net客户端使用.Net Framework附带的默认TaskScheduler。这知道如何为您的任务生成 ID。本文还在这里举了一些例子。