当前位置: 首页 > 知识库问答 >
问题:

Kafka-慢速消费情况下的最佳实践

阎成天
2023-03-14

我有一个用例,在这个用例中,我有3个Kafka消费者向一个主题写作,每个消费者中的消息都需要按顺序处理。在这种情况下,如果某个消费者中存在延迟,则需要更早处理的消息将被丢弃(写入条件)。那么,有没有一种方法可以维持这些消息的顺序呢。

共有1个答案

鲁博雅
2023-03-14

消息总是在Kafka分区中排序。通常,属于某个键的所有消息都位于某个分区中(通过分区逻辑)。

我有一个用例,其中我有3个Kafka消费者编写一个主题

我想,你是说你有三个消费者从一个主题阅读

这里有两种情况:

  1. 每个消费者都会获得主题中的所有消息
  2. 每个使用者仅获取主题中的一部分消息(分区)

如果#1

您可以有3个消费者,每个消费者都有不同的group.id,这样每个消费者都会消费所有的消息集。在这里,较慢的消费者不会减慢其他消费者的速度。因为每个消费者通常都在自己的线程或进程中运行。

如果#2

您可以有3个消费者具有相同的组。id,这样每个使用者都将获得自己的分区份额。一个消费者消费的消息不会被另一个消费者消费。在这里,较慢的消费者也不会减慢其他消费者的速度。因为每个使用者将只使用自己的一组分区。

在这种情况下,如果其中一个消费者存在延迟,则需要更早处理的消息将被删除(写入条件)

Kafka中没有隐式删除,您必须在轮询消息后自己删除消息。

我认为,要检查滞后,可以从消费者开始。endOffsets()和消费者。position(),差应该会给您带来滞后。根据延迟情况,您可以选择删除邮件。

consumer.assignment().forEach( topicPartition -> {
      long currentPos = consumer.position(topicPartition);
      long endOfPartition = consumer.endOffsets(Collections.singletonList(topicPartition)).get(topicPartition);
});
 类似资料:
  • 我只是在试用这里提到的kafka-storm喷口https://github.com/nathanmarz/storm-contrib/tree/master/storm-kafka,我使用的配置如下所述。 但是,上面的喷子从Kafka主题中获取消息的速度大约是每秒7000条,但我预计每秒大约有50000条消息。我尝试了在spoutConfig中增加提取缓冲区大小的各种选项,但没有看到任何结果。

  • 我知道一个消费者组中活动消费者的最大数量是一个主题的分区数。 对于处理速度较慢的消费者,最佳做法是什么?如何实现更多的并行性? 例如:一个主题有6个分区,生产者每秒生成数千条消息。所以我在这个群体中最多有6个消费者。考虑到处理这些消息很复杂,而且消费者比生产者慢得多。结果是,消费者总是落后于最后一个补偿,而滞后正在增加。 在传统的MQ系统中,我们只需添加越来越多的使用者以保持最新。 如何使用Kaf

  • 我将ActiveMQ与Apache Camel一起使用。现在我遇到了这个问题,在ActiveMQ中有大量挂起的消息。消息处于挂起状态,出列过程非常缓慢。 我的理解正确吗?通常情况下,为了有那么多待处理的消息,每个消费者的调度队列的大小应该已经接近默认的预取限制(即1000)?但每个消费者只有20-80美元? 我对ActiveMq了解不多。那么我应该从哪里了解如何解决这个问题呢? 连接配置01是活动

  • 我有一种在shell中执行此任务的方法:如何使kafka消费者从上次消耗的偏移量读取,而不是从开始读取 但是,我愿意在Python中这样做,使用 我找不到任何关于这种情况的api。 http://kafka-python.readthedocs.io/en/latest/apidoc/KafkaConsumer.html

  • 问题内容: 有时,当我看到自己的日志记录代码时,我想知道自己是否做对了。可能没有确切的答案,但是我有以下担忧: 图书馆课 我有几个库类可能记录一些消息。致命错误被报告为例外。当前,我的类中有一个静态记录器实例,其类名称为记录名称。(Log4j的:) 这是正确的方法吗?也许该库类的用户不需要我的实现中的任何消息,或者想要将它们重定向到应用程序特定的日志。我是否应该允许用户从“外部世界”设置记录器?您

  • 我使用的是运行在AWS中的spark独立集群(spark and spark-streaming-kafka version 1.6.1),并对检查点目录使用S3桶,每个工作节点上没有调度延迟和足够的磁盘空间。 没有更改任何Kafka客户端初始化参数,非常肯定Kafka的结构没有更改: 也不明白为什么当直接使用者描述说时,我仍然需要在创建流上下文时使用检查点目录?