当前位置: 首页 > 知识库问答 >
问题:

《Spring的Kafka》中的linger.ms没有按预期工作

严烨
2023-03-14

我有一个用例,我需要使用Kafka进行批处理。假设在1分钟内有大约100个请求,我不想立即发布每个请求,而是想将所有100个请求分批发布到topic一次。

但是使用以下配置,批处理不会发生,一旦发送消息,它就会发布到主题并同时在消费者中接收

生产者配置

public class KafkaProducerConfig {

  @Bean
  public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.ACKS_CONFIG, 1);
    props.put(ProducerConfig.RETRIES_CONFIG, 0);
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
    props.put(ProducerConfig.LINGER_MS_CONFIG, 60000);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 100000);
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
    return props;
  }

  @Bean
  public ProducerFactory<String, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
  }

  @Bean
  public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
  }

}

消费者配置

public class KafkaConfig {

  ConsumerFactory<String, String> kafkaConsumerFactory(Boolean autoCommit) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch");
    return new DefaultKafkaConsumerFactory<>(props);
  }



  @Bean("kafkaListenerContainerFactory")
  public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(kafkaConsumerFactory(Boolean.TRUE));
    factory.setBatchListener(true);
    factory.setConcurrency(1);
    return factory;
  }
  
}

在这里,我设置了 linger.ms = 60000,根据我的理解,如果 linger.ms 设置为某个值,那么即使发送方线程更早变得可用并且未达到批大小,生产者也将至少等待该时间。

但是在我的情况下,一旦发送消息,它就会发布到主题,而无需等待60000毫秒或批大小达到正在设置的值

生产者

  @Autowired
  private KafkaTemplate<String, String> kafka;

  kafka.send("batch-test", message);

消费者

  @KafkaListener(id = "testGroup", topics = {"batch-test"})
  public void test(@Payload List<String> messages, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
      @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    for (int i = 0; i < messages.size(); i++) {
      System.out.println(messages.get(i) + partitions.get(i) + "-" + offsets.get(i) + "");
    }
  }

这里的小更新我检查了Spring kakfa日志,发现ProducerConfig值设置不正确,它们正在退回到默认值。

roducerConfig values: 
acks = -1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = true
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0

为什么 linger.ms,而 batch.size 会回退到默认值?

共有2个答案

酆华皓
2023-03-14

就我的理解而言,BATCH_SIZE_CONFIG (batch.size)将与LINGER_MS_CONFIG (linger.ms)一起完美地工作,只有当多个记录打算发送到同一个分区,而不是跨多个分区时,这意味着,您应该有一些键控机制或更好的分区策略来实现批处理,同时将事件生成到主题中。

来自文档:

当多条记录发送到同一个分区时,生产者将把它们批处理在一起。此参数控制每个批次将使用的字节内存量(不是消息!)。当批次已满时,批次中的所有消息都将被发送。然而,这并不意味着生产者将等待批次已满。生产者将发送半满批次,甚至批次中只有一条单页消息。因此,将批次大小设置得太大不会导致发送消息的延迟;它只会为批次使用更多内存。将批次设置得太小会增加一些开销,因为生产者需要更频繁地发送mesages。

linger.ms 控制在发送当前批处理之前等待其他消息的时间量。KafkaProducer 会在当前租金批次已满或达到 linger.ms 限制时发送一批消息。默认情况下,只要有发送者html" target="_blank">线程可以发送消息,即使批处理中只有一条消息,创建者也会发送消息。通过将 linger.ms 设置为大于 0,我们指示生产者等待几毫秒以向批处理添加其他消息,然后再将其发送到代理。这会增加延迟,但也会增加吞吐量(因为我们一次发送更多消息,因此每条消息的开销更少)。

锺离声
2023-03-14

找到了解决方案,linger。ms没有造成任何影响,因为没有从代码中设置该值(仍不知道原因)。因此,仍然存在。ms的默认值为0,因为没有进行批处理。

稍后,我设置了两个linger的值。ms批处理。应用程序中的size。属性,然后它工作。

spring.kafka.producer.batch-size=1000000
spring.kafka.producer.properties.linger.ms=10000
 类似资料:
  • 我正试图将一个阻塞消费者集成为Reactor铝-SR1中的助焊剂订户。我想使用一个并行调度器,并发地执行阻塞操作。 我实现了一个主类来描述我意图:

  • 我在使用R的group_by和SUMMARY函数时遇到了一些问题,我想知道你们是否可以帮我一些忙。我有一张类似的表格: 我试图使用dplyr的group_by和SUMMARY来找到频率列的平均值。下面是我的示例代码: 我所期望的是,一个表格被吐出来,分解按单个类别分组的平均频率,如下所示: 但是,我收到的是一个按类别分组的表,每个类别接收整个表的平均值,如下所示: 有什么线索吗?我应该说我是初学者

  • 以下switch语句具有奇怪的行为: 我认为,当cat命令在被执行时失败时,“after cat”将被写入,而||之后的部分将被执行。但是,当我查看输出时,似乎在回显“后猫”后会发生中断,因此实际状态不会改变,将再次进入。然后stty也会失败(因为串行适配器丢失)。之后,cat命令againt在开始时失败,但现在进入“catch”块。。。。 下面是相关的输出: 我做错了什么?

  • 我已经配置了log4j2.xml文件,application.log文件将被创建,它应该每天翻转。 但是在JVM中,applicatoin.log文件在10MB之后会翻转,如果翻转三次,第一个文件会被覆盖。也就是说我随时都application.logapplication-2020-10-16.log.zip. 为什么log4j2(v2.13)即使配置为每日,也会每10MB滚动一次文件?任何在l

  • 我目前正在使用Cucumber和Java开发一个基于Selenium的BDD测试自动化框架。我的框架使用Junit,由于Junit不支持软断言,所以我尝试在测试中使用AssertJ断言。然而,这些断言似乎不起作用。让我试着借助下面的代码来解释这一点:

  • 现在,在我的drools项目中,我在单独的DRL文件中有两组规则,它们由议程组分割。对于议程组“preCheck”,我将该议程组中的每个规则的自动聚焦设置为true。例子: 对于另一个议程组-“default规则”-规则没有设置自动焦点属性。示例: 在通过RESTAPI调用规则时,我还试图通过JSON负载将焦点设置为“preCheck”议程组。例子: 然而,在执行规则时,似乎首先要评估“defau