当前位置: 首页 > 知识库问答 >
问题:

SpringKafka不尊重马克斯·波尔。行为古怪的唱片

郜昊苍
2023-03-14

我正在尝试以下场景:

  1. 正在申请中。属性设置为max.poll。记录是50
  2. 正在申请中。属性设置enable auto commit=false,并将确认模式设置为手动
  3. 在我的方法中添加了@KafkaListener,但不提交任何消息,只需阅读、记录,但不进行确认

事实上,在我的Kafka主题中,我有500条消息要消费,所以我期待以下行为:

  1. Spring Kafka民意测验()50条消息(偏移0到50)。
  2. 如我所说,我没有犯任何事,只是记录了50条消息。
  3. 在下一个Spring Kafka轮询()调用中,获取相同的50条消息(偏移量0到50),如步骤1。在我的理解中,Spring Kafka应该在这个循环中继续(步骤1-3)阅读总是相同的消息。

但实际情况如下:

  1. Spring Kafka民意测验()50条消息(偏移0到50)。
  2. 如我所说,我没有犯任何事,只是记录了50条消息。
  3. 在下一个Spring Kafka轮询()调用中,获取与步骤1不同的下一个50条消息(偏移50到100)。

Spring Kafka读取500条消息,分为50条消息块,但不提交任何内容。如果我关闭应用程序并重新启动,则会再次收到500条消息。

所以,我怀疑:

  1. 如果我将max.poll.recors配置为50,如果我没有提交任何内容,Spring Kafka如何获得下一个50条记录?我理解的投票()方法应该返回相同的记录。
  2. Spring Kafka有一些缓存吗?如果是,这可能是一个问题,如果我在缓存中100万记录没有提交。

共有2个答案

申屠洛华
2023-03-14

消费者未提交补偿仅在以下情况下才会产生影响:

  • 您的消费者在阅读200条消息后崩溃,当您重新启动它时,它将从0重新启动

所以在一个完美的世界里,你根本不需要提交,它会消耗所有的消息,因为消费者首先要求1-50,然后是51-100。

但如果消费者崩溃了,没人知道消费者读到的补偿是多少。如果使用者提交了偏移量,当它重新启动时,它可以检查偏移量主题,查看崩溃的使用者离开的位置,然后从那里开始。

max.poll.records定义了一次获取多少条记录,但并没有定义要获取哪些记录。

田信然
2023-03-14

你的第一个问题:

如果我配置了max.poll。记录到50张,如果我什么都没做,SpringKafka怎么能获得接下来的50张唱片呢?我知道poll()方法应该返回相同的记录。

首先,为了确保您没有提交任何内容,必须确保您理解以下3个参数,我相信您已经理解了。

  • 消费者配置。ENABLE_AUTO_COMMIT_CONFIG,将其设置为false(这也是推荐的默认值)。如果设置为false,请注意auto.commit.interval.ms变得无关紧要。看看这个留档:

因为侦听器容器有自己的提交偏移量的机制,所以它更喜欢Kafka消费者配置。ENABLE_AUTO_COMMIT_CONFIG是假的。从版本2.3开始,它无条件地将其设置为false,除非在消费者工厂中特别设置或容器的消费者属性重写。

>

工厂。getContainerProperties()。setSyncCommits(真/假) 设置是否呼叫消费者。当容器负责提交时,commitSync()commitSync()。默认为true。这负责与Kafka同步,如果设置为true,则该调用将被阻止,直到Kafka响应。

其次,没有消费者投票()将不会返回相同的记录。对于当前运行的消费者,它用一些内部索引跟踪内存中的偏移量,我们不必关心提交偏移量。请参阅@GaryRussell的解释。

简而言之,他解释道:

一旦轮询返回记录(以及未提交的偏移量),它们将不会再次返回,除非重新启动使用者或对使用者执行seek()操作,将偏移量重置为未处理的偏移量。

你的第二个问题:

《SpringKafka》里有什么藏品吗?如果是,如果缓存中有100万条记录而没有提交,这可能是个问题。

没有“缓存”,它只是关于偏移和提交,如上所述的解释。

现在要实现你想做的事情,你可以考虑在获取前50条记录后做两件事,即下一次投票():

  • 或者,以编程方式重新启动容器
  • 或调用consumer.seek(分区,偏移);


另外:
无论您选择什么配置,您都可以通过查看此输出的LAG列来查看结果:

kafka-consumer-groups.bat --bootstrap-server localhost:9091 --describe --group your_group_name

 类似资料:
  • 我有一个很棒的管道脚本: 但在输出中,我看到错误“找不到这样的字段:field java.lang.String size”: 我做错了什么? 编辑:我没有“待定签名批准”

  • 我正在使用cmake Gui构建开罗。当我点击“配置”时,Cmake显示以下错误: 找不到PIXMAN,尝试在系统变量PIXMAN中设置PIXMAN根文件夹的路径(缺少:PIXMAN_LIBRARIESPIXMAN_INCLUDE_DIRS) 找不到ZLIB(缺少:ZLIB_LIBRARYZLIB_INCLUDE_DIR) C:/cmake-3.7.0-rc1-win32-x86/share/cm

  • 如http://spockframework.org/spock/docs/1.3/interaction_based_testing.html#_explicit_interaction_blocks中所述,我在块中使用,如下所示: Java代码: 斯波克试验: null

  • 我在spock和groovy的初始阶段,我试图测试一个简单的Spring启动应用程序,并获得 下面是我的java和groovy代码以及异常详细信息 TestController.java TestControllerSpec。棒极了 我确信我的代码遗漏了什么

  • 我有一个关于SPOCK+Drools测试的问题。事情是这样的, 我正在部署的webapp W/O中运行测试(如果这有任何意义的话:))。我将jar添加到类路径中(而不是WEB-INF/libs),现在运行良好。