我们计划有多个Kafka消费者(Java),它们具有相同的组ID..所以如果它从分配的分区中顺序读取,那么我们如何实现高吞吐量..例如,生产者每秒发布40条消息...消费者每秒处理1条消息...虽然我们可以有多个消费者,但不能有40条RT???如果我错了就纠正我...
在我们的情况下,使用者必须提交偏移量,只有在消息处理成功后...否则消息将被重新处理...有没有更好的解决方法???
根据你的问题澄清。
一个Kafka消费者可以一次读取多条消息。但Kafka使用者并不真正读取消息,更正确的说法是,使用者读取一定数量的字节,然后根据单个消息的大小,决定要读取多少消息。通过阅读Kafka使用者配置,您不允许指定要获取多少消息,您指定使用者可以获取的最大/最小数据大小。无论有多少消息适合在该范围内,您将得到多少消息。您将始终按照您所指出的顺序获得消息。
相关使用者配置(适用于0.9.0.0及更高版本)
在一个Kafka日志中,如果有10条消息(每条2字节)具有以下偏移量,[0,1,2,3,4,5,6,7,8,9]。
如果您读取10个字节,您将得到一个包含偏移[0,1,2,3,4]处的消息的批处理。
如果读取6个字节,则会得到一个包含偏移[0,1,2]处的消息的批处理。
如果您读取6个字节,然后再读取6个字节,您将获得包含消息[0,1,2]和[3,4,5]的两个批处理。
如果您读取8个字节,然后是4个字节,您将获得包含消息[0,1,2,3]和[4,5]的两个批处理。
更新:澄清提交
我不是百分之百确定promise是如何起作用的,我主要是在Storm环境中与Kafka合作的。提供的KafkaSpout自动提交Kafka消息。
使用commitSync()提交上次轮询返回的所有内容,在本例中,它将提交偏移量[0,1,2]。
另一方面,如果选择使用commitSync(java.util.Map offsets),则可以手动指定要提交哪些offsets。如果按顺序处理它们,可以处理偏移量0然后提交,处理偏移量1然后提交,最后处理偏移量2然后提交。
总而言之,Kafka给了你处理信息的自由,你可以选择按顺序处理,也可以完全随意地处理。
高级使用者 API 似乎一次读取一条消息。 如果消费者想要处理这些消息并提交给其他下游消费者(如Solr或Elastic-Search ),这可能会给他们带来很大的问题,因为他们更喜欢批量接收消息,而不是一次接收一条。 在内存中批处理这些消息也并非易事,因为只有当批处理已经提交时,Kafka中的偏移量也需要同步,否则具有未提交下游消息的崩溃的 kafka 使用者(如在Solr或ES中)将已经更新其
问题内容: 我们已经编写了一个Java客户端,用于将消息发布到kafka。代码如下所示 当我们执行此代码时,我们得到以下消息和异常 这发生在无限循环中,并且应用程序挂起…当我们检查kafka代理时,发现该主题已创建…但是我们没有收到消息…我们已经坚持了一段时间。 .. 请帮忙 问题答案: 我们终于解决了这个问题…我们在混合环境中运行kafka,如下文所述- https://medium.com/@
问题内容: 我有一个简单的Java生产者,如下所示 我正在尝试读取以下数据 但是消费者没有阅读来自kafka的任何消息。如果我在下面添加以下内容 然后,消费者开始阅读该主题。但是,每当使用者重新启动时,它都会从我不希望的主题开头读取消息。如果我在启动Consumer时添加以下配置 然后它从主题读取消息,但是如果使用者在处理所有消息之前重新启动,则它不会读取未处理的消息。 有人可以让我知道出了什么问
有一个16个分区的Kafka主题 使用给定的消费者组名称,我们目前正在启动单个消费者来阅读该主题。 > 单个消费者是否从该主题的(仅)读取?如果带有消息为空,消费者是否从下一个分区开始读取(...等等)? 我们可以选择启动多个消费者(使用相同的消费者组名称)来读取相同的主题(有16个分区)。为了并行读取多个分区,可以维护多少消费者?
我有事务性的和正常的生产者在应用程序,是写到主题Kafka-主题如下。 事务性Kafka生产者的配置 普通生产者配置相同,只有ProducerConfig.client_id_config和ProducerConfig.Transactional_id_config未添加。 使用者配置如下 因为我将isolation.level设置为read_committed,所以它应该只使用来自订阅主题的事务
如有任何帮助,我们将不胜感激。