当前位置: 首页 > 知识库问答 >
问题:

Spring Kafka轮询,将@kafkalistener和listener ack-mode设置为记录

庄弘业
2023-03-14
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(30);
        factory.getContainerProperties().setSyncCommits(true);
        return factory;
    }
    @KafkaListener(topics = "topic1", containerFactory="kafkaListenerContainerFactory")
    public void handleMessage(final ConsumerRecord<Object, String> arg0) throws Exception {
    }
    @KafkaListener(topics = "topic2", containerFactory="kafkaListenerContainerFactory")
    public void handleMessage(final ConsumerRecord<Object, String> arg0) throws Exception {
    }
    @KafkaListener(topics = "topic3", containerFactory="kafkaListenerContainerFactory")
    public void handleMessage(final ConsumerRecord<Object, String> arg0) throws Exception {
    }

my Listener.AckMode为return并Enable.Auto.Commit设置为false,Partition.Assignment.Strategy:org.apache.kafka.clients.consumer.RoundroBinAssignor

1)我对并发的理解是,由于我将并发(在工厂级别)设置为30,并且我总共有30个分区(用于所有三个主题)可供读取,每个线程将被分配一个分区。我的理解正确吗?如果我在@KafKalistener注释中再次覆盖并发,会有什么影响?

2)当spring调用poll()方法时,它是否从所有三个主题进行轮询?

3)由于我将listener.ackmode设置为return,它是否会等到在一个poll()中返回的所有记录都完成后再发出下一个poll()?如果我的记录处理的时间比max.poll.interval.ms长,又会发生什么?假设在一个poll()调用中返回1-100个偏移量,而我的代码在max.poll.interval.ms被命中之前只能处理50个偏移量,此时spring会发出另一个poll吗,因为它已经命中max.poll.interval.ms?如果是的话,下一个poll()会从偏移量51返回记录吗?

真的很感谢你的时间和帮助

共有1个答案

柳鸿信
2023-03-14

我的Listener.AckMode已返回

没有这样的ACKMODE;由于您没有在工厂中设置它,所以您的实际ack模式是批处理(默认)。要使用ack模式记录(如果您的意思是这样的话),您必须这样配置工厂容器属性。

我对并发的理解是...

spring会不会在这个时候再发布一次民意调查

Spring无法控制--如果花费的时间太长,使用者线程就在侦听器中--使用者不是线程安全的,所以我们不能发出异步轮询。

必须在max.poll.interval.ms内处理max.poll.records以避免Kafka重新平衡分区。

 类似资料:
  • 在我的侦听器中,在使用消息后,如果发生任何异常,我将抛出一个异常。如果它成功了,那么我承认。但是,即使抛出异常,偏移量也不会后退。i、 e重试没有按预期进行。错误事件不会再次出现。 此外,我发现我没有消费所有预期的消息。我做错什么了吗? 监听器类 我正在做

  • 有没有办法设置(即减少)套接字的轮询频率。使用长轮询传输时的IO (v4.4)?在服务器上还是在客户机上? 文档中描述了长轮询传输,通常在无法建立 WebSocket 连接时充当回退(但可以显式设置): 套接字之间的双向通道。IO服务器(Node.js)和套接字。IO客户端(browser、Node.js或其他编程语言)尽可能使用WebSocket连接建立,并将使用HTTP长轮询作为后备。

  • 问题内容: 我真的很喜欢将sequelize用作我的节点应用程序的ORM,但是现在,当默认情况下在查询时它们传递DAO对象时,我有点恼火。如何始终将raw选项设置为true? 问题答案: 根据文档: 如果您不提供SQL以外的其他参数,则将raw假定为true,并且sequelize将不会尝试对查询结果进行任何格式化。 话虽如此 : Sequelize对象具有[options.query = {}]

  • 在这个例子中,我们展示了舍入模式。 IOTester.java import java.math.RoundingMode; import java.text.NumberFormat; import java.util.Locale; public class I18NTester { public static void main(String[] args) { Local

  • 问题内容: 我有一个JComboBox,其中的项目是查询的结果。该组合显示了来自查询的所有类别名称,对吗?好的,可以。现在,我需要为每个项目赋予一个值,该值就是产品的ID。 到目前为止,这是我得到的: 正如您在代码中看到的那样,每个项目的“标签”都是它的名称。现在,如何设置每个项目的ID,以便以后进行操作? 谢谢,并尝试轻松回答,我正尽最大努力尝试获取Java东西!哈! 问题答案: 默认情况下使用

  • 问题内容: 更新 :对于最新的Angular版本,该问题已过时,请参阅tsh对此职位的评论 我已经将复选框绑定到一个值: 控制器中复选框的值设置为1: 但是,最初该复选框未显示为选中状态。如果我将的初始值更改为,则可以。(jsfiddle演示) 我尝试了各种变化: 他们都不工作。如何使angular将参数视为数字? 问题答案: 您可以使用 ngChecked ,如果表达式为真,则将在元素上设置特殊