当前位置: 首页 > 知识库问答 >
问题:

Apache Kafka消费者组和简单消费者

黄成荫
2023-03-14

我是Kafka的新手,我对消费者的理解是,基本上有两种类型的实现
1)高级消费者/消费者群体
2)简单消费者

高级抽象最重要的部分是当Kafka不关心处理偏移量,而Simple消费者对偏移量管理提供了更好的控制时使用它。让我困惑的是,如果我想在多线程环境中运行consumer,并且还想控制偏移量,该怎么办。如果我使用消费者组,这是否意味着我必须读取存储在zookeeper中的最后一个偏移量?这是我唯一的选择。

共有2个答案

秦皓君
2023-03-14

在Apache Kafka 0.9和0.10中,消费者组管理完全由Broker(用于协调)和主题(用于状态存储)在Kafka应用程序中处理。

当使用者组首次订阅主题时,< code>auto.offset.reset的设置决定了使用者从何处开始使用消息(http://Kafka . Apache . org/documentation . html # newconsumerconfigs)

您可以注册消费者RebalanceListener以在为特定消费者分配主题/分区时接收通知。

使用者运行后,可以使用“搜索到开始”和“查找到结束”从特定偏移量获取消息。seek会影响该使用者的下一次轮询,并存储在下一次提交中(例如,提交同步提交异步或当 auto.commit.interval 经过时(如果启用)。

消费者javadocs提到了更具体的情况:http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

您可以将Kafka提供的组管理与通过seek(..)一旦分配了分区。

傅旺
2023-03-14

在大多数情况下,高级消费者API不允许您直接控制偏移量。

首次创建消费者组时,您可以使用auto.offset.reset属性告诉它是从kafka存储的最旧消息还是最新消息开始。

您还可以通过设置<code>auto.commit来控制高级消费者何时向zookeeper提交新的偏移量。将启用为false。

由于高级消费者将偏移量存储在 zookeeper 中,因此你的应用可以直接访问 zookeeper 并操纵偏移量 - 但它不在高级使用者 API 之外。

你的问题有点令人困惑,但是你可以在多线程环境中使用简单的消费者。这就是高级消费者所做的。

 类似资料:
  • 本文向大家介绍消费者和消费者组有什么关系?相关面试题,主要包含被问及消费者和消费者组有什么关系?时的应答技巧和注意事项,需要的朋友参考一下 每个消费者从属于消费组。具体关系如下:

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 本文向大家介绍简述消费者与消费组之间的关系相关面试题,主要包含被问及简述消费者与消费组之间的关系时的应答技巧和注意事项,需要的朋友参考一下 消费者从属与消费组,消费偏移以消费组为单位。每个消费组可以独立消费主题的所有数据,同一消费组内消费者共同消费主题数据,每个分区只能被同一消费组内一个消费者消费。

  • 我是Kafka的新手,运行一个简单的Kafka消费者/生产者的例子,就像在Kafka消费者和KafkaProducer上给出的那样。当我从终端运行消费者时,消费者正在接收消息,但我不能使用Java代码监听。我也在StackoverFlow上搜索了类似的问题(链接: Link1,Link2),并尝试了解决方案,但似乎没有什么对我有用。kafka版本:和相应的maven依赖在pom中使用。 Java生

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka

  • 我花了几个小时想弄清楚发生了什么,但没能找到解决办法。 这是我在一台机器上的设置: 1名zookeeper跑步 我正在使用kafka控制台生成器插入消息。如果我检查复制偏移量(