当前位置: 首页 > 知识库问答 >
问题:

如何从一开始就使用Kafka Consumer API读取数据?

顾斌
2023-03-14

请告诉我,每次我运行消费者时,如何从一开始就使用Kafka消费者API阅读消息

共有3个答案

滕弘新
2023-03-14
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

如果您只是避免保存任何补偿,则消费者将始终在开始时重置。

谯嘉懿
2023-03-14

这样做的一个选择是每次开始时都有一个唯一的组标识,这意味着Kafka会从一开始就向您发送主题中的消息。当您为Kafka消费者设置属性时,请执行以下操作:

properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());

另一个选项是使用consumer.seekToStart(consumer.assignment()),但这将不起作用,除非Kafka首先通过让消费者调用轮询方法从消费者那里获得心跳。因此,如果您希望从一开始就获得所有记录,请调用轮询(),然后执行asikTo开始(),然后再次调用轮询()。这是一个小hackey,但这似乎是最可靠的方式做到这一点,因为0.9版本。

// At this point, there is no heartbeat from consumer so seekToBeinning() wont work
// So call poll()
consumer.poll(0);
// Now there is heartbeat and consumer is "alive"
consumer.seekToBeginning(consumer.assignment());
// Now consume
ConsumerRecords<String, String> records = consumer.poll(0);
况弘新
2023-03-14

这适用于0.9. x消费者。基本上,当您创建消费者时,您需要使用属性消费者配置为该消费者分配消费者组id。GROUP_ID_CONFIG。每次启动消费者执行类似这样的操作时,随机生成消费者组idproperties.putGROUP_ID_CONFIG,UUID.随机UUID(). toString());(属性是java.util.属性的实例,您将传递给构造函数new Kafka消费者(属性))。

随机生成客户端意味着新的消费者组在kafka中没有任何关联的偏移量。所以在这之后我们要做的是为这个场景设置一个策略。作为auto的文档。抵消重置属性显示:

如果Kafka中没有初始偏移量,或者服务器上不再存在当前偏移量(例如,因为该数据已被删除),该怎么办:

  • 最早:自动重置偏移为最早偏移
  • 最新:自动重置偏移为最新偏移
  • 无:如果没有找到以前的偏移量或消费者的组,则向消费者抛出异常
  • 其他:向消费者抛出异常。

因此,从上面列出的选项中,我们需要选择最早的策略,以便新的消费群体每次都从头开始。

您的java代码如下所示:

properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "your_client_id");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumer = new KafkaConsumer(properties);

你现在唯一需要弄清楚的是,当有多个消费者属于同一个消费者组但被分布时,如何生成一个随机标识并在这些实例之间分布,以便它们都属于同一个消费者组。

希望有帮助!

 类似资料:
  • 我正在尝试构建一个基本的服务人员来处理从我的网站发送的所有AJAX请求。 服务工作者已订阅获取事件。我可以处理所有发送的请求,但在第一次安装service worker时,该请求不起作用。 一旦安装了服务工作者,如果我刷新网页,那么获取事件处理程序就会工作。 我希望我的服务人员从一开始就获取所有请求。有可能吗? 这是我的代码: 指数html sw.js

  • 我想用html制作datalist这里是一个例子 但是当我搜索“go”时,它会显示两个结果 null 我必须在MySQL中存储的5000多条记录上实现这一点。

  • 问题内容: 我需要知道如何从python文件中读取行,以便首先读取最后一行并以这种方式继续进行操作,直到光标到达文件的开头为止。有任何想法吗? 问题答案: 通过至少三种方法可以解决此问题的一般方法,即按行反向反向读取文本文件。 普遍的问题是,由于每行的长度可以不同,因此您无法事先知道文件中每行的起始位置,也不知道其中有多少行。这意味着您需要对问题应用一些逻辑。 通用方法#1:将整个文件读入内存 使

  • 问题内容: 我正在尝试将数据从一页传递到另一页。 www.mints.com?name=某物 如何使用JavaScript 阅读? 问题答案: 下面的a代码可以工作,并且在不可用的情况下仍然有用,但是它是在JavaScript中没有本机解决方案的时候编写的。在现代浏览器或Node.js中,更喜欢使用内置功能。 用法如下: 它返回一个像这样的对象: 所以 给

  • 介绍 有时候,浏览器会通过post发送很多数据。在webpy,你可以这样操作。 代码 class RequestHandler(object): def POST(self): data = web.data() # 通过这个方法可以取到数据

  • 因此,我有一个扫描仪,它可以使用while(file.hasNext())读取多行的文本文件,但是在它到达文本文件的末尾之后,我该如何制作它,以便在单独的while循环中重新开始读取行?