我正在尝试以下场景:
事实上,在我的Kafka主题中,我有500条消息要消费,所以我期待以下行为:
但实际情况如下:
Spring Kafka读取500条消息,分为50条消息块,但不提交任何内容。如果我关闭应用程序并重新启动,则会再次收到500条消息。
所以,我怀疑:
消费者未提交补偿仅在以下情况下才会产生影响:
所以在一个完美的世界里,你根本不需要提交,它会消耗所有的消息,因为消费者首先要求1-50,然后是51-100。
但如果消费者崩溃了,没人知道消费者读到的补偿是多少。如果使用者提交了偏移量,当它重新启动时,它可以检查偏移量主题,查看崩溃的使用者离开的位置,然后从那里开始。
max.poll.records
定义了一次获取多少条记录,但并没有定义要获取哪些记录。
你的第一个问题:
如果我配置了max.poll。记录到50张,如果我什么都没做,SpringKafka怎么能获得接下来的50张唱片呢?我知道poll()方法应该返回相同的记录。
首先,为了确保您没有提交任何内容,必须确保您理解以下3个参数,我相信您已经理解了。
消费者配置。ENABLE_AUTO_COMMIT_CONFIG
,将其设置为false(这也是推荐的默认值)。如果设置为false,请注意auto.commit.interval.ms
变得无关紧要。看看这个留档:因为侦听器容器有自己的提交偏移量的机制,所以它更喜欢Kafka消费者配置。ENABLE_AUTO_COMMIT_CONFIG是假的。从版本2.3开始,它无条件地将其设置为false,除非在消费者工厂中特别设置或容器的消费者属性重写。
>
工厂。getContainerProperties()。setSyncCommits(真/假)
设置是否呼叫
消费者。当容器负责提交时,commitSync()
或commitSync()
。默认为true。这负责与Kafka同步,如果设置为true,则该调用将被阻止,直到Kafka响应。
其次,没有消费者投票()将不会返回相同的记录。对于当前运行的消费者,它用一些内部索引跟踪内存中的偏移量,我们不必关心提交偏移量。请参阅@GaryRussell的解释。
简而言之,他解释道:
一旦轮询返回记录(以及未提交的偏移量),它们将不会再次返回,除非重新启动使用者或对使用者执行seek()操作,将偏移量重置为未处理的偏移量。
你的第二个问题:
《SpringKafka》里有什么藏品吗?如果是,如果缓存中有100万条记录而没有提交,这可能是个问题。
没有“缓存”,它只是关于偏移和提交,如上所述的解释。
现在要实现你想做的事情,你可以考虑在获取前50条记录后做两件事,即下一次投票():
或者,以编程方式重新启动容器
- 或调用
consumer.seek(分区,偏移);
另外:
无论您选择什么配置,您都可以通过查看此输出的LAG
列来查看结果:
kafka-consumer-groups.bat --bootstrap-server localhost:9091 --describe --group your_group_name
我有一个很棒的管道脚本: 但在输出中,我看到错误“找不到这样的字段:field java.lang.String size”: 我做错了什么? 编辑:我没有“待定签名批准”
我正在使用cmake Gui构建开罗。当我点击“配置”时,Cmake显示以下错误: 找不到PIXMAN,尝试在系统变量PIXMAN中设置PIXMAN根文件夹的路径(缺少:PIXMAN_LIBRARIESPIXMAN_INCLUDE_DIRS) 找不到ZLIB(缺少:ZLIB_LIBRARYZLIB_INCLUDE_DIR) C:/cmake-3.7.0-rc1-win32-x86/share/cm
如http://spockframework.org/spock/docs/1.3/interaction_based_testing.html#_explicit_interaction_blocks中所述,我在块中使用,如下所示: Java代码: 斯波克试验: null
我在spock和groovy的初始阶段,我试图测试一个简单的Spring启动应用程序,并获得 下面是我的java和groovy代码以及异常详细信息 TestController.java TestControllerSpec。棒极了 我确信我的代码遗漏了什么
我有一个关于SPOCK+Drools测试的问题。事情是这样的, 我正在部署的webapp W/O中运行测试(如果这有任何意义的话:))。我将jar添加到类路径中(而不是WEB-INF/libs),现在运行良好。