我有一个用例,在这个用例中,我有3个Kafka消费者向一个主题写作,每个消费者中的消息都需要按顺序处理。在这种情况下,如果某个消费者中存在延迟,则需要更早处理的消息将被丢弃(写入条件)。那么,有没有一种方法可以维持这些消息的顺序呢。
消息总是在Kafka分区中排序。通常,属于某个键的所有消息都位于某个分区中(通过分区逻辑)。
我有一个用例,其中我有3个Kafka消费者编写一个主题
我想,你是说你有三个消费者从一个主题阅读
这里有两种情况:
如果#1
您可以有3个消费者,每个消费者都有不同的group.id
,这样每个消费者都会消费所有的消息集。在这里,较慢的消费者不会减慢其他消费者的速度。因为每个消费者通常都在自己的线程或进程中运行。
如果#2
您可以有3个消费者具有相同的组。id,这样每个使用者都将获得自己的分区份额。一个消费者消费的消息不会被另一个消费者消费。在这里,较慢的消费者也不会减慢其他消费者的速度。因为每个使用者将只使用自己的一组分区。
在这种情况下,如果其中一个消费者存在延迟,则需要更早处理的消息将被删除(写入条件)
Kafka中没有隐式删除,您必须在轮询消息后自己删除消息。
我认为,要检查滞后,可以从消费者开始。endOffsets()和消费者。position()
,差应该会给您带来滞后。根据延迟情况,您可以选择删除邮件。
consumer.assignment().forEach( topicPartition -> {
long currentPos = consumer.position(topicPartition);
long endOfPartition = consumer.endOffsets(Collections.singletonList(topicPartition)).get(topicPartition);
});
我知道一个消费者组中活动消费者的最大数量是一个主题的分区数。 对于处理速度较慢的消费者,最佳做法是什么?如何实现更多的并行性? 例如:一个主题有6个分区,生产者每秒生成数千条消息。所以我在这个群体中最多有6个消费者。考虑到处理这些消息很复杂,而且消费者比生产者慢得多。结果是,消费者总是落后于最后一个补偿,而滞后正在增加。 在传统的MQ系统中,我们只需添加越来越多的使用者以保持最新。 如何使用Kaf
我只是在试用这里提到的kafka-storm喷口https://github.com/nathanmarz/storm-contrib/tree/master/storm-kafka,我使用的配置如下所述。 但是,上面的喷子从Kafka主题中获取消息的速度大约是每秒7000条,但我预计每秒大约有50000条消息。我尝试了在spoutConfig中增加提取缓冲区大小的各种选项,但没有看到任何结果。
我将ActiveMQ与Apache Camel一起使用。现在我遇到了这个问题,在ActiveMQ中有大量挂起的消息。消息处于挂起状态,出列过程非常缓慢。 我的理解正确吗?通常情况下,为了有那么多待处理的消息,每个消费者的调度队列的大小应该已经接近默认的预取限制(即1000)?但每个消费者只有20-80美元? 我对ActiveMq了解不多。那么我应该从哪里了解如何解决这个问题呢? 连接配置01是活动
我有一种在shell中执行此任务的方法:如何使kafka消费者从上次消耗的偏移量读取,而不是从开始读取 但是,我愿意在Python中这样做,使用 我找不到任何关于这种情况的api。 http://kafka-python.readthedocs.io/en/latest/apidoc/KafkaConsumer.html
问题内容: 有时,当我看到自己的日志记录代码时,我想知道自己是否做对了。可能没有确切的答案,但是我有以下担忧: 图书馆课 我有几个库类可能记录一些消息。致命错误被报告为例外。当前,我的类中有一个静态记录器实例,其类名称为记录名称。(Log4j的:) 这是正确的方法吗?也许该库类的用户不需要我的实现中的任何消息,或者想要将它们重定向到应用程序特定的日志。我是否应该允许用户从“外部世界”设置记录器?您
我使用的是运行在AWS中的spark独立集群(spark and spark-streaming-kafka version 1.6.1),并对检查点目录使用S3桶,每个工作节点上没有调度延迟和足够的磁盘空间。 没有更改任何Kafka客户端初始化参数,非常肯定Kafka的结构没有更改: 也不明白为什么当直接使用者描述说时,我仍然需要在创建流上下文时使用检查点目录?