我正在使用spring kafka 2.2.8,并使用以下设置编写一个简单的异步生成器:
producer config key : compression.type and value is : none
producer config key : request.timeout.ms and value is : 10000
producer config key : acks and value is : all
producer config key : batch.size and value is : 33554431
producer config key : delivery.timeout.ms and value is : 1210500
producer config key : retry.backoff.ms and value is : 3000
producer config key : key.serializer and value is : class org.apache.kafka.common.serialization.StringSerializer
producer config key : security.protocol and value is : SSL
producer config key : retries and value is : 3
producer config key : value.serializer and value is : class io.confluent.kafka.serializers.KafkaAvroSerializer
producer config key : max.in.flight.requests.per.connection and value is : 1
producer config key : linger.ms and value is : 1200000
producer config key : client.id and value is : <<my app name>>
我使用下面的代码片段打印了上面的生产者设置:
DefaultKafkaProducerFactory defaultKafkaProducerFactory = (DefaultKafkaProducerFactory) mykafkaProducerFactory;
Set<Entry> set = defaultKafkaProducerFactory.getConfigurationProperties().entrySet();
set.forEach( item ->
System.out.println("producer config key : "+item.getKey()+" and value is : "+item.getValue())
);
现在我通过调用下面的构造函数来创建一个KafkaTemplate,其中autoFlush为false
public KafkaTemplate(mykafkaProducerFactory, boolean autoFlush)
现在我有一个异步生产者在10秒内产生10条消息。然后,令人惊讶的是,我在几秒钟内就把这10条信息全部发布到了这个主题上,我确信这10条信息的总和要比我的批量小得多。尺码:33554431
现在我的问题是
看起来您没有正确设置这些属性;展示你是如何设置它们的。我刚测试了一下
batch.size=1000000
linger.ms=10000
并背靠背发送了10条信息,它们花了整整10秒到达消费者手中。
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.type=batch
spring.kafka.producer.properties.batch.size=1000000
spring.kafka.producer.properties.linger.ms=10000
@SpringBootApplication
public class So62820095Application {
private static final Logger LOG = LoggerFactory.getLogger(So62820095Application.class);
public static void main(String[] args) {
SpringApplication.run(So62820095Application.class, args);
}
@KafkaListener(id = "so62820095", topics = "so62820095")
public void listen(List<String> in) {
LOG.info(in.toString());
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so62820095").partitions(1).replicas(1).build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> IntStream.range(0, 10).forEach(i -> template.send("so62820095", "foo" + i));
}
}
问题内容: 我一直想知道为什么JDBC API提供了自动提交模式()。似乎很吸引人的麻烦只是诱使人们陷入麻烦。我的理论是,仅将其添加到JDBC是为了简化希望创建使用JDBC编辑和运行SQL的工具的供应商的生活。是否有其他原因打开自动提交,还是总是错误? 问题答案: 不幸的是,使用自动提交是特定于数据库的(事务行为也是如此)。我认为,如果您没有全局的程序化交易策略,自动提交可能比仅希望每个人都正确关
我是Java的初学者,刚开始使用Intellij作为我的IDE。 当我使用它时,有时会延迟。 我更改了我的 xms 和 xmx 以获得更大的堆大小(xms = 1024,xmx = 2048),但它抛出了一个错误。 所以,我把它回滚了。 错误消息是这样的:“初始堆大小设置为大于最大堆大小的值”。 有什么问题? 如果可能,如何增加最大堆大小? 我用的是笔记本电脑,它有8GB内存。x64Intelli
我正在使用IntelliJ 2020.1 Ultimate,并且有一个JBoss 7.0.2服务器,我想从IntelliJ运行。 我将其添加为配置: 但是当我尝试启动服务器时,我收到以下错误: IDEA.app/Contents/plugins/Kotlin/lib/jps/kotlin-jps-plugin.jar:/Applications/IntelliJ IDEA.app/Contents
问题内容: 所以我已经在Java编程学了一个学期左右的时间,而且我遇到了几次这个问题,最后才开始提出问题。 如果我做一个然后设置大小,例如。帧实际上并不长。据我所知,它实际上更长。另外,如果您将垂直尺寸设置得非常小(低于30),则框架甚至不会显示,只有操作系统顶部的窗口栏和框架才会变大,直到您将值超过30(这样看起来与)相同。为什么会这样,修复起来并不难,但是很奇怪,我很好奇为什么会这样? 如果您
我想为我的自定义数据源更改光池大小,我使用的是Spring boot 2版本。 我可以设置数据源url,数据源密码等。我将值写入application.properties文件。之后,我用environment.getproperty读取这些值并设置dataSource,但是我不知道池大小的相同过程:(
我遇到了我最近升级到的Intellij 2019.2.1版本的问题。我已经在智能帮助中设置了Xms和Xmx - 现在,当我启动应用程序时,在应用程序的VM参数中,我传递-Xmx800m,然后在运行应用程序时,它抛出一个错误,指出“初始堆大小设置为大于最大堆大小的值”。 有人能告诉我我在这里放了什么错误的配置吗?还是我漏掉了什么?