props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
如果您只是避免保存任何补偿,则消费者将始终在开始时重置。
这样做的一个选择是每次开始时都有一个唯一的组标识,这意味着Kafka会从一开始就向您发送主题中的消息。当您为Kafka消费者设置属性时,请执行以下操作:
properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
另一个选项是使用consumer.seekToStart(consumer.assignment())
,但这将不起作用,除非Kafka首先通过让消费者调用轮询方法从消费者那里获得心跳。因此,如果您希望从一开始就获得所有记录,请调用轮询()
,然后执行asikTo开始()
,然后再次调用轮询()
。这是一个小hackey,但这似乎是最可靠的方式做到这一点,因为0.9版本。
// At this point, there is no heartbeat from consumer so seekToBeinning() wont work
// So call poll()
consumer.poll(0);
// Now there is heartbeat and consumer is "alive"
consumer.seekToBeginning(consumer.assignment());
// Now consume
ConsumerRecords<String, String> records = consumer.poll(0);
这适用于0.9. x消费者。基本上,当您创建消费者时,您需要使用属性消费者配置为该消费者分配消费者组id。GROUP_ID_CONFIG
。每次启动消费者执行类似这样的操作时,随机生成消费者组idproperties.putGROUP_ID_CONFIG,UUID.随机UUID(). toString());(属性是java.util.属性的实例,您将传递给构造函数new Kafka消费者(属性)
)。
随机生成客户端意味着新的消费者组在kafka中没有任何关联的偏移量。所以在这之后我们要做的是为这个场景设置一个策略。作为auto的文档。抵消重置
属性显示:
如果Kafka中没有初始偏移量,或者服务器上不再存在当前偏移量(例如,因为该数据已被删除),该怎么办:
因此,从上面列出的选项中,我们需要选择最早的
策略,以便新的消费群体每次都从头开始。
您的java代码如下所示:
properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "your_client_id");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumer = new KafkaConsumer(properties);
你现在唯一需要弄清楚的是,当有多个消费者属于同一个消费者组但被分布时,如何生成一个随机标识并在这些实例之间分布,以便它们都属于同一个消费者组。
希望有帮助!
我正在尝试构建一个基本的服务人员来处理从我的网站发送的所有AJAX请求。 服务工作者已订阅获取事件。我可以处理所有发送的请求,但在第一次安装service worker时,该请求不起作用。 一旦安装了服务工作者,如果我刷新网页,那么获取事件处理程序就会工作。 我希望我的服务人员从一开始就获取所有请求。有可能吗? 这是我的代码: 指数html sw.js
我想用html制作datalist这里是一个例子 但是当我搜索“go”时,它会显示两个结果 null 我必须在MySQL中存储的5000多条记录上实现这一点。
问题内容: 我需要知道如何从python文件中读取行,以便首先读取最后一行并以这种方式继续进行操作,直到光标到达文件的开头为止。有任何想法吗? 问题答案: 通过至少三种方法可以解决此问题的一般方法,即按行反向反向读取文本文件。 普遍的问题是,由于每行的长度可以不同,因此您无法事先知道文件中每行的起始位置,也不知道其中有多少行。这意味着您需要对问题应用一些逻辑。 通用方法#1:将整个文件读入内存 使
问题内容: 我正在尝试将数据从一页传递到另一页。 www.mints.com?name=某物 如何使用JavaScript 阅读? 问题答案: 下面的a代码可以工作,并且在不可用的情况下仍然有用,但是它是在JavaScript中没有本机解决方案的时候编写的。在现代浏览器或Node.js中,更喜欢使用内置功能。 用法如下: 它返回一个像这样的对象: 所以 给
介绍 有时候,浏览器会通过post发送很多数据。在webpy,你可以这样操作。 代码 class RequestHandler(object): def POST(self): data = web.data() # 通过这个方法可以取到数据
因此,我有一个扫描仪,它可以使用while(file.hasNext())读取多行的文本文件,但是在它到达文本文件的末尾之后,我该如何制作它,以便在单独的while循环中重新开始读取行?