当前位置: 首页 > 知识库问答 >
问题:

Apache Kafka用户未按顺序接收消息

杨景山
2023-03-14
public class Kafta
{
    private Dictionary<string, object> config;
    private string topicName;

    public Kafta(string topic)
    {
        config = new Dictionary<string, object>
        {
            {"bootstrap.servers","192.168.60.173:9092" }
        };
        topicName = topic;
    }
    public async void SendMessageAsync(string message)
    {
        using (var producer = new Producer<string, string>(config, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8)))
        {
            var msg = await producer.ProduceAsync(topicName, "userid", message);

            //producer.ProduceAsync("console", null, message);
            producer.Flush(500);
        }
    }
}
static void Main(string[] args)
    {
        string topic = "tester2";
        long count = 1;
        Console.WriteLine("Starting to send message");
        Console.WriteLine("Write the message here: ");

        if(args.Length == 2)
        {
            topic = args[0];
            count = long.Parse(args[1]);
        }
        try
        {
            Console.WriteLine("Topic name " + topic);
            var message = Console.ReadLine();            
            var service = new Kafta(topic);

            for(var i = 0; i<count;i++)
            {
                var msg = message + " number " + i.ToString();
                Console.WriteLine("Message to Kafta: " + msg);
                service.SendMessageAsync(msg);
            }

        }
        catch (Exception ex)
        {
            Console.WriteLine("Exception occured " + ex.Message);
        }
        finally
        {
            Console.WriteLine("Press any key to exit");
            Console.Read();
        }
    }
static void Main(string[] args)
    {
        var config = new Dictionary<string, object>
        {
          { "group.id", "sample-consumer" },
          { "bootstrap.servers", "192.168.60.173:9092" },
          { "enable.auto.commit", "false"}
        };
        string topic = "tester2";
        if (args.Length == 1)
            topic = args[0];
        using (var consumer = new Consumer<string, string>(config, new StringDeserializer(Encoding.UTF8), new StringDeserializer(Encoding.UTF8)))
        {
            consumer.Subscribe(new string[] { topic });                
            consumer.OnMessage += (_, msg) =>
            {
                Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
                consumer.CommitAsync(msg);

            };

            while (true)
            {
                consumer.Poll(100);
            }
        }
    }
Message to Kafta: message number 0
Message to Kafta: message number 1
Message to Kafta: message number 2
Message to Kafta: message number 3
Message to Kafta: message number 4
Message to Kafta: message number 5
Message to Kafta: message number 6
Message to Kafta: message number 7
Message to Kafta: message number 8
Message to Kafta: message number 9

消费者输出:

message number 4
message number 7
message number 0
message number 1
message number 2
message number 3
message number 5
message number 6
message number 8
message number 9

我对Kafka是新手,不知道我错过了什么来让它正确地工作。根据Kafka文档,消息的顺序是为我的用例保证的,所以一定有一些愚蠢的错误,我正在做,无法找出它。

除了Kafka,我还有别的选择吗?

谢谢

共有1个答案

江瀚昂
2023-03-14

根据Kafka文档,保证消息的顺序

只有每个分区。从您的问题来看,您没有提到您的主题有多少个分区。您正在打印主题:{msg.topic}分区:{msg.partition},但这不是您的文章的输出。

在生成器中,使用SendMessageAsync执行“触发并忘记”操作,而不验证代理是否实际收到了带有该方法返回值的消息。所以这是一种可能性--您的打印语句将是有序的,但消息不一定是以这种方式到达代理的。

 类似资料:
  • 我正在使用azure service bus主题,我已经为它订阅启用了会话。 在my logic应用程序中,我使用来自主题的sql事务插入数据,我使用主题订阅(peek-lock)并在订阅服务器级别将并发设置为默认,如下所示 根据我的理解,我的逻辑应用程序(订阅者)应该读取所有的消息,并且必须在FIFO中处理 我的逻辑应用程序像

  • 概述 为了能够让轻应用订阅号的开发者接收到用户在消息窗口的留言消息,开发者可以在管理后台设置消息服务器并开启接收用户对话消息模式。 设置消息服务器时接需要提供可用的接收消息的回调URL地址,为了让通信更加安全,建议使用https。 设置成功并开启了接收对话消息模式后,用户在轻应用或订阅号窗口里发送的消息会推送给设置的URL,服务器接收到消息后,可以通过异步发送消息接口给用户回复消息。 设置消息服务

  • 我从以下链接获得所有信息: https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/kafkaConsumer.html 当我们运行consumer时,我们没有从consumer端得到任何通知。请给我一点主意。

  • 我一直在努力通过Smack和Openfire服务器与XMPP聊天。我的问题如下: 每当一个用户向另一个用户发送消息时,该消息就会在另一个用户处正确接收。但是任何回复都不会出现在第一条消息的发件人处。因此,用户1成功地发送给用户2。然后,用户2无法向用户1发送任何回复。另一方面,如果我重新启动并让用户再次登录,则用户2可以发送给用户1,但反之亦然。 我想说的是,只有聊天的发起者才能发送消息,接收者不

  • 我有一个调查多个主题的消费者。对于这个问题,我限制了每个主题一个分区。假设当消费者开始轮询时,每个主题都有一些数据。阅读的顺序是什么? 是循环赛吗?它是从第一个读到下一个吗?我使用进行轮询。

  • 我每个websocket接收几十条消息,这些消息可能只差几毫秒就能到达。我需要用操作来处理这些数据,这些操作有时会花费一些时间(例如,在DB中的插入)。为了处理接收到的新消息,必须完成对前一个消息的处理。 我的第一个想法是用Node.js Bull(用Redis)准备一个队列,但恐怕太长了,无法运行。这些消息的处理必须保持快速。 我尝试使用JS迭代器/生成器(直到现在我还从未使用过),我测试了如下