当前位置: 首页 > 知识库问答 >
问题:

对Kafka感到困惑

暨承平
2023-03-14

所以我一直在读Kafka的语义学,我对它的工作原理有点困惑。

我理解生产者如何避免发送重复的消息(以防代理的ack失败),但我不明白的是,在消费者处理消息但在提交偏移量之前崩溃的情况下,一次是如何工作的。Kafka不会在这种情况下重试吗?

共有2个答案

乔丁雨
2023-03-14

Radal在回答中很好地解释了这一点,关于在一个孤立的Kafka集群中只有一次。

在处理外部数据库(至少是事务性的)时,一个简单的方法是使用业务价值和它来自的分区/偏移量更新一行(在sgbd事务中)。这样,如果您的消费者在提交Kafka之前崩溃,您将能够获得它已处理的最后一个Kafka偏移量(通过使用consumer.seek())

在sgbd中,这可能是一个相当大的数据开销(为所有行保留偏移量/分区),但您可能能够进行一些优化。

扬尼克

陶温书
2023-03-14

以下是我认为你的意思:

  1. 消费者X看到记录Y,并在其上“行动”,但不提交其偏移量
  2. 消费者X崩溃(仍然没有提交补偿)
  3. 消费者X启动备份,重新分配相同的分区(不保证),最终再次看到记录Y

这是完全可能的。然而,要让Kafka只“工作”一次,你的所有副作用(状态、输出)也必须进入同一个Kafka集群。接下来会发生什么:

  1. 消费者X开始交易

如果你在同一个Kafka集群之外有副作用(比如说,在mysql中插入一行而不是record Z),那么没有通用的方法可以让Kafka一次对你有效。你需要依靠老式的重复数据消除和幂等。

 类似资料:
  • 我正在尝试提出一种解决方案,它涉及在连接操作之后应用一些逻辑,从多个中的中选择一个事件。这类似于reduce函数,但它只返回1个元素,而不是递增地返回。因此最终结果将是单个(,对,而不是一个 每个键保证只到达一次。 假设像上面这样的连接操作,它用4个生成了1个,成功地连接并收集在。现在,我想做的是,立即访问这些值,并执行一些逻辑以将正确匹配到一个。例如,对于上面的数据集,我需要(,和)。 将为每个

  • 问题内容: 我已经在eclipse中创建了一个项目,并添加了Maven依赖项。在Eclipse中,它表示我正在使用JRE 1.5。一切在Eclipse中都可以正常运行,例如,我可以运行测试。 当我尝试从终端运行时,出现以下错误。 …在-source 1.3中不支持泛型(使用-source 5或更高版本来启用泛型)… 看来,Maven认为我正在使用JRE 1.3,并且无法识别泛型或for-each循

  • 问题内容: 在碰到此链接http://www.javacodegeeks.com/2013/01/java-thread-pool-example-using- executors-and-threadpoolexecutor 之后,这是我第一次为新项目使用Java线程池。 .html ,我对此更加困惑,这是页面中的代码, 在代码中,创建了一个固定大小的池并创建了10个工作线程,对吗? 线程池应该

  • 问题内容: 与此代码有点混淆。 我在pg-go 仓库中找到了这段代码,不知道为什么这样声明。请解释一下用这种方式声明变量的用例是什么。 问题答案: 这在运行时不会执行任何操作,但是除非类型满足接口要求,否则编译将失败。这是一种静态断言。

  • 问题内容: 我可以理解以下定义: 每个对象都有一个标识,一个类型和一个值。一旦创建了对象,其身份就永远不会改变。您可能会认为它是对象在内存中的地址。所述操作者比较两个对象的身份; 该函数返回一个表示其身份的整数。 我认为上面的定义在创建“某物”时起作用,例如: 但是我不理解: 我还没有创建任何东西。那么整数“ 1”如何具有ID?这是否意味着只要我在Python Shell中“提及” 1,便立即将其

  • 问题内容: 映射为: 运行以上查询将返回文档。我想了解松紧带在幕后做什么?通过查看默认分析器的输出,它不会标记癌症,使其返回“可以”,那么为什么返回带有“可以”一词的文档,又是什么原因导致该文档被返回呢?换句话说,搜索查询“癌症”正在发生什么其他处理。 更新 我是否可以在我的机器上运行一个命令,该命令将清除所有索引和所有内容,因此我的表盘整洁?我执行了删除/ *的操作,但成功了,但仍然匹配了。 问