当前位置: 首页 > 知识库问答 >
问题:

如何使用spring boot设置kafka使用者并发性

董同
2023-03-14

我正在编写一个基于Java的Kafka消费者应用程序。我正在为我的应用程序使用kafka-clients、Spring Kafka和Spring boot。虽然Spring boot让我可以轻松地编写Kafka消费者(无需真正编写ConcurrentKafkaListenerContainerFactory、ConsumerFactory等),但我希望能够为这些消费者定义/定制一些属性。然而,我无法找到一个简单的方法来做它使用Spring Boot。我有兴趣设置的一些属性是-

consumerconfig.max_partition_fetch_bytes_configconsumerconfig.partition_assignment_strategy_config

我在这里查看了Spring Boot的预定义属性。

版本-

  • Kafka-客户机-0.10.0.0-sasl
  • Spring-Kafka-1.1.0.发布
  • Spring Boot-1.5.10.发布

共有1个答案

单于高逸
2023-03-14

在您引用的URL处,向下滚动到

spring.kafka.listener.concurrency= # Number of threads to run in the listener containers.

Spring-Kafka-1.1.0.发行版

我建议至少升级到1.3.5;它有一个简单得多的线程模型,这要归功于KIP-62。

spring.kafka.consumer.properties.heartbeat.interval.ms
@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
    Map<String, Object> consumerProps = properties.buildConsumerProperties();
    consumerProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5_000);
    return new DefaultKafkaConsumerFactory<Object, Object>(consumerProps);
}
 类似资料:
  • 我试图设置本地DynamoDB实例与SpringBoot。我跟着这个,但是格拉德尔。 当我尝试运行我的应用程序时,会出现以下异常: 我知道这是由于歧义导致的依赖注入失败,但我的是一个无参数构造函数。不确定歧义在哪里。 以下是我的代码: 格雷德尔锉刀 发电机配置 代理(实体) @DynamoDBTable(tableName="Agent")公共类Agent{私有字符串代理号;私有整数id;私有企业

  • 轻应用的使用者支持按照组织机构或单人进行添加,同时可以设置添加范围和移除权限。 设置组织机构:轻应用-设置使用组织结构(勾选组织机构节点后,所有自组织机构及成员都将订阅此应用) 设置使用者:轻应用-新增使用者/删除使用者 设置添加范围和移除权限

  • 例如,我有一个消费者,最初在时间t1发送100条消息,然后我的消费者在t1+30秒启动并运行,那么我的消费者会使用t1+30秒之后发布的消息,还是会使用t1之后发布的消息?

  • 我创建了一个带有三个分区的Kafka主题。使用Spring Kafka中的ProducerFactory,我可以创建一个producer实例。但是,我想创建三个生产者实例,因为我有三个分区。类似地,我想要三个consumer的实例。我该怎么做?请帮忙。

  • 我创建了一个接收器Kafka连接,将数据转换为其他存储;我想在使用 创建新连接器时将 设置为;我已经设置了配置; 但是当任务开始时,kafka 消费者仍然最早轮询记录;任何其他将 设置为最新版本的方法也是如此;

  • 我正在用Springboot做一个简单的Kafka示例项目,我遇到了一个错误,制作人没有创建,但其余的工作正常。 我遇到的错误似乎引发了异常,因为制作人没有创建,但没有解释原因,我也不知道: 这是我的kafka配置: 这里是控制器,endpoint“/api/kafka”: