当前位置: 首页 > 知识库问答 >
问题:

为什么async producer使用linger生成消息。ms和batch。大小设置为大值,自动刷新设置为false?

邴越彬
2023-03-14

我正在使用spring kafka 2.2.8,并使用以下设置编写一个简单的异步生成器:

producer config key : compression.type  and value is : none
producer config key : request.timeout.ms  and value is : 10000
producer config key : acks  and value is : all
producer config key : batch.size  and value is : 33554431
producer config key : delivery.timeout.ms  and value is : 1210500
producer config key : retry.backoff.ms  and value is : 3000
producer config key : key.serializer  and value is : class org.apache.kafka.common.serialization.StringSerializer
producer config key : security.protocol  and value is : SSL
producer config key : retries  and value is : 3
producer config key : value.serializer  and value is : class io.confluent.kafka.serializers.KafkaAvroSerializer
producer config key : max.in.flight.requests.per.connection  and value is : 1
producer config key : linger.ms  and value is : 1200000
producer config key : client.id  and value is : <<my app name>>

我使用下面的代码片段打印了上面的生产者设置:

DefaultKafkaProducerFactory defaultKafkaProducerFactory = (DefaultKafkaProducerFactory) mykafkaProducerFactory;
        Set<Entry> set  = defaultKafkaProducerFactory.getConfigurationProperties().entrySet();
        set.forEach( item ->
                System.out.println("producer config key : "+item.getKey()+"  and value is : "+item.getValue())
        );

现在我通过调用下面的构造函数来创建一个KafkaTemplate,其中autoFlush为false

public KafkaTemplate(mykafkaProducerFactory, boolean autoFlush) 

现在我有一个异步生产者在10秒内产生10条消息。然后,令人惊讶的是,我在几秒钟内就把这10条信息全部发布到了这个主题上,我确信这10条信息的总和要比我的批量小得多。尺码:33554431

现在我的问题是

  1. 为什么要发布消息而不是等待linger.ms或batch.size才生成消息?

共有1个答案

施永宁
2023-03-14

看起来您没有正确设置这些属性;展示你是如何设置它们的。我刚测试了一下

batch.size=1000000
linger.ms=10000

并背靠背发送了10条信息,它们花了整整10秒到达消费者手中。

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.type=batch

spring.kafka.producer.properties.batch.size=1000000
spring.kafka.producer.properties.linger.ms=10000
@SpringBootApplication
public class So62820095Application {


    private static final Logger LOG = LoggerFactory.getLogger(So62820095Application.class);


    public static void main(String[] args) {
        SpringApplication.run(So62820095Application.class, args);
    }

    @KafkaListener(id = "so62820095", topics = "so62820095")
    public void listen(List<String> in) {
        LOG.info(in.toString());
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so62820095").partitions(1).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> IntStream.range(0,  10).forEach(i -> template.send("so62820095", "foo" + i));
    }

}
 类似资料:
  • 问题内容: 我一直想知道为什么JDBC API提供了自动提交模式()。似乎很吸引人的麻烦只是诱使人们陷入麻烦。我的理论是,仅将其添加到JDBC是为了简化希望创建使用JDBC编辑和运行SQL的工具的供应商的生活。是否有其他原因打开自动提交,还是总是错误? 问题答案: 不幸的是,使用自动提交是特定于数据库的(事务行为也是如此)。我认为,如果您没有全局的程序化交易策略,自动提交可能比仅希望每个人都正确关

  • 我是Java的初学者,刚开始使用Intellij作为我的IDE。 当我使用它时,有时会延迟。 我更改了我的 xms 和 xmx 以获得更大的堆大小(xms = 1024,xmx = 2048),但它抛出了一个错误。 所以,我把它回滚了。 错误消息是这样的:“初始堆大小设置为大于最大堆大小的值”。 有什么问题? 如果可能,如何增加最大堆大小? 我用的是笔记本电脑,它有8GB内存。x64Intelli

  • 我正在使用IntelliJ 2020.1 Ultimate,并且有一个JBoss 7.0.2服务器,我想从IntelliJ运行。 我将其添加为配置: 但是当我尝试启动服务器时,我收到以下错误: IDEA.app/Contents/plugins/Kotlin/lib/jps/kotlin-jps-plugin.jar:/Applications/IntelliJ IDEA.app/Contents

  • 问题内容: 所以我已经在Java编程学了一个学期左右的时间,而且我遇到了几次这个问题,最后才开始提出问题。 如果我做一个然后设置大小,例如。帧实际上并不长。据我所知,它实际上更长。另外,如果您将垂直尺寸设置得非常小(低于30),则框架甚至不会显示,只有操作系统顶部的窗口栏和框架才会变大,直到您将值超过30(这样看起来与)相同。为什么会这样,修复起来并不难,但是很奇怪,我很好奇为什么会这样? 如果您

  • 我想为我的自定义数据源更改光池大小,我使用的是Spring boot 2版本。 我可以设置数据源url,数据源密码等。我将值写入application.properties文件。之后,我用environment.getproperty读取这些值并设置dataSource,但是我不知道池大小的相同过程:(

  • 我遇到了我最近升级到的Intellij 2019.2.1版本的问题。我已经在智能帮助中设置了Xms和Xmx - 现在,当我启动应用程序时,在应用程序的VM参数中,我传递-Xmx800m,然后在运行应用程序时,它抛出一个错误,指出“初始堆大小设置为大于最大堆大小的值”。 有人能告诉我我在这里放了什么错误的配置吗?还是我漏掉了什么?