我有一个用例,其中数据将从kafkaTopic1流入程序(我们称之为P1),经过处理,然后持久化到数据库。P1将在一个多节点集群上,因此每个节点将处理大量的kafka分区(假设本主题有5个节点和50个kafka分区)。如果其中一个节点由于任何原因完全失败,并且有数据正在处理,那么该数据将丢失。
例如,如果kafkaTopic1上有500条消息,node2拉出了10条消息(因此根据偏移量要拉出的下一条消息是Message11),但只有其中8条被完全处理并在节点失败时持久保存到数据库,则仍在处理的2条将丢失。当节点被恢复时,它将开始从消息11读取,跳过丢失的两条消息(从技术上讲,kafka分区将开始将其消息发送到另一个要处理的节点,因此该分区的偏移量将移动,我们不一定知道节点死亡时下一个要处理的消息是什么)。
(注意:当节点死亡时,假设用户注意到并完全关闭P1,因此此时暂时不会处理更多数据)。
如果在flink作业中启用了检查点,那么您不应该丢失消息,因为flink也在内部维护偏移量,并且从失败中恢复后,它应该读取flink最后提交的偏移量。
现在,如果您仍然希望找到偏移量并重新开始从偏移量中读取,这将变得很棘手,因为您需要根据给定的使用者组为给定的主题找到所有分区的偏移量。
我不知道如何从开箱即用的Flink-kafka-Consumer API中做到这一点,但您可以将kafka依赖项添加到项目中,并从kafka API创建一个kafkaconsumer。一旦有了消费者,就可以调用
consumer.position(partition)
consumer.committed(partition)
请注意,您仍然需要遍历所有分区以获得所有当前偏移量
在这里阅读差异:Kafka Javadoc
一旦有了要从中读取的偏移量,就可以在flink作业中手动指定使用者偏移量,方法如下:
Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);
myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
我是Kafka的新手。我看了一眼Kafka文档。似乎分派给订阅消费者组的消息是通过将分区与消费者实例绑定来实现的。 在使用Apache Kafka时,我们应该记住一件重要的事情,即同一消费者组中的消费者数量应该小于或等于所使用主题中的分区数量。否则,将不会收到来自主题的任何消息。 在非prod环境中,我没有配置主题分区。在这种情况下,Kafka是否只有一个分区。如果我启动共享同一组的多个消费者并向
我在使用Kafka时遇到了一些问题。非常感谢任何帮助!我在docker swell中分别有zookeeper和kafka集群3个节点。您可以在下面看到Kafka代理配置。 我的情况: < li > 20x位制片人不断向Kafka主题传达信息 < li>1x消费者读取和记录消息 < li >终止kafka节点(docker容器停止),因此现在群集有2个Kafka代理节点(第3个节点将自动启动并加入群
Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka
我正在阅读Kafka常见问题解答,他们如下所示。 •每个分区不会被每个使用者组中的多个使用者线程/进程使用。这允许每个进程以单线程方式使用,以保证分区内的使用者的顺序(如果我们将有序消息分割成一个分区并将它们传递给多个使用者,即使这些消息是按顺序存储的,它们有时也会被无序地处理)。 有没有可能,
我有一个主题列表(目前是10个),其大小可以在未来增加。我知道我们可以产生多个线程(每个主题)来消耗每个主题,但在我的例子中,如果主题的数量增加,那么消耗主题的线程数量也会增加,这是我不希望的,因为主题不会太频繁地获取数据,所以线程将是理想的。 有没有办法让单个消费者从所有话题中消费?如果是的话,我们怎样才能做到呢?另外,Kafka将如何维护抵消?请建议答案。
然而,当在我的环境中测试此示例时,我得到了一个异常。