当前位置: 首页 > 知识库问答 >
问题:

我是否有可能在SpringKafka向制作者和消费者提供运行时的配置细节?

仰英发
2023-03-14

我们正在使用Spring boot将我们的项目迁移到微服务。我有一个要求,我应该能够使用配置详细信息(例如用户在运行时提供的引导服务器和键值服务器)运行生产者和消费者。

在之前的项目中,我可以使用java Apache Kafka API实现这一点,但我看不到使用spring Kafka API实现这一点的任何方法,因为它只允许通过spring java配置类或应用程序定义与生产者或消费者相关的配置。财产。

共有1个答案

丰智
2023-03-14

您可以在运行时为消费者和生产者覆盖您想要的任何属性。

使用ConcurrentKafkaListenerContainerFactory创建侦听器容器:

@Autowired
ConcurrentKafkaListenerContainerFactory<String, String> factory;

...

    ConcurrentMessageListenerContainer<String, String> container = factory.createContainer("someTopic");
    container.getContainerProperties()
            .getKafkaConsumerProperties()
            .setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "...");
    ...
    container.getContainerProperties().setMessageListener(someListener);
    container.start();

在生产者方面,使用以下构造函数:

/**
 * Create an instance using the supplied producer factory and properties, with
 * autoFlush false. If the configOverrides is not null or empty, a new
 * {@link DefaultKafkaProducerFactory} will be created with merged producer properties
 * with the overrides being applied after the supplied factory's properties.
 * @param producerFactory the producer factory.
 * @param configOverrides producer configuration properties to override.
 * @since 2.5
 */
public KafkaTemplate(ProducerFactory<K, V> producerFactory, @Nullable Map<String, Object> configOverrides) {
    this(producerFactory, false, configOverrides);
}
 类似资料:
  • 我正在使用Spring Kafka consumer,它从主题中获取消息并将其保存到数据库中。如果满足故障条件,例如db不可用,kafka消费者库是否提供重试机制?如果是,是否有方法设置不同的重试间隔,如第1次重试应在5分钟后进行,第2次重试应在30分钟后进行,第3次重试应在1小时后进行等。

  • 在使用Spring Kafka Consumer时,我有时会收到以下错误消息。如代码片段所示,我至少实现了一次语义 1)我的疑问是,我是否错过了来自消费者的任何信息? 2) 我需要处理这个错误吗。由于 org.apache.kafka.clients.consumer.提交失败异常:无法完成偏移提交,因为消费者不是自动分区分配的活动组的一部分;消费者很可能被踢出组。 我的SpringKafka消费

  • 我在站点1(3个代理)有两个集群设置cluster-1,在站点2(3个代理)有两个集群设置cluster-2。使用spring kafka(1.3.6)消费者(一台机器)并通过@KafkaListener注释收听消息。我们如何为每个集群(c1和c2)实例化多个KafkaListenerContainerFactory,并同时监听来自这两个集群的数据。 我的侦听器应该同时使用来自这两个集群的消息。

  • 我需要在一个代码块中使用流中的'n'项,然后完成,本质上是: 在我的情况下,我不能将签名更改为返回而简单地;实际上,我必须抛开流中的一些元素(而不是简单的逻辑)--以便为下游消费者做好准备,而下游消费者不需要知道这是如何发生的,甚至不需要知道这是如何发生的。 这个问题是关于“不做任何事”lambda的。 JDK中是否存在“Do Nothing”使用者,如“Do Nothing”函数?

  • 我需要使用consume process Product模式来处理Kafka消息,并已使用Kafka事务管理器配置了Spring Kafka侦听器容器,还设置了事务id前缀以启用Kafka事务。我正在使用批处理的ack模式,并试图了解在这种模式下,在事务中何时提交偏移量。文档似乎表明,一旦使用了轮询中的所有记录,ack模式批提交偏移量——在事务上下文中也是这样吗,即每个轮询1个事务? 或者,在使用

  • 我在Ubuntu服务器上设置了Apache Kafka,并按照https://kafka.apache.org/quickstart中提到的前五个步骤进行了测试,一切正常。 然后,我继续安装kafka python 1.4.6以在python中进行测试,并编写了简单的生产者和消费者脚本。 我的侦听器 侦听器配置=纯文本://本地主机:9092 播发.侦听器=纯文本://本地主机:9092 这是脚本