当前位置: 首页 > 知识库问答 >
问题:

如何使用工厂为特定主题配置Spring Kafka Listener?

刘阳荣
2023-03-14

我希望能够通过属性阅读主题,而无需在 Kafka 侦听器注释上指定任何内容。不使用Spring靴。

我尝试通过“topics”键直接从properties对象中读取主题。这将产生一个错误:< code>IllegalStateException:必须提供topics、topicPattern或topicPartitions。

// some class
@KafkaListener
public void listener(List<String> messages) {
  System.out.print(messages);
}

//some other class
@Bean
public ConsumerFactory<String, String> consumerFactory(Properties topicProp) {
  return new DefaultKafkaConsumerFactory(topicProp);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
  ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();

  Properties prop = new Properties();
  prop.setProperty("topics", "my-custom-topic");

  factory.setConsumerFactory(this.consumerFactory(prop));
  return factory;
}

Is this possible?

共有1个答案

马嘉勋
2023-03-14

您可以在主题中引用其他 bean(或有关 bean 的方法)

java prettyprint-override">@Bean
public String topicName() {
    return "my-custom-topic";
}

...

@KafkaListener(topics = "#{@topicName}")
...

或者

@KafkaListener(topics = "#{@someBean.someMethod()}")
 类似资料:
  • 我正在尝试在eclipse中设置Hibernate Tools。问题是它找不到任何映射文件。 我创建了一个控制台配置,指向我的environment.properties文件和hibernate.cfg.xml.问题是hibernate.cfg.xml.中没有映射 看起来它在myproject持久化中使用了spring bean sessionFactory。xml(如下)来查找所需的映射文件。我

  • 此配置适用于所有 基本主题。 常用配置 日间模式 - 夜间模式 classic 主题默认提供了对日间模式和夜间模式的支持,并在导航条上提供了切换开关。 通过以下配置可以自定义各模式:docusaurus.config.js module.exports = { // ... themeConfig: { // ... colorMode: { // "

  • 我试图在Websphere Application Server中添加ActiveMQ作为JMS提供程序。 我已经按照这里描述的说明ActiveMQ5.11和WebSphere Application Server8.5,并根据主题进行了调整。 不幸的是,我不确定需要为主题连接工厂和主题定义在外部JNDI名称中添加什么。 根据IBM文档: “外部JNDI名称用于将队列绑定到应用程序服务器名称空间的

  • 目前,我有一个Maven2项目,在运行时构建一个JAR: 我现在需要调整,以将此JAR()发布到运行于以下位置的ArtFactory服务器: 我尝试添加一个<代码> 关于如何让出版业开始工作,有什么想法吗?为了简单起见,假设这个人工repo经过身份验证,可以接受用户使用和进行的发布/写入。

  • 问题内容: 我有一个在Hudson奴隶上运行的项目。我希望其中之一在Java6而不是默认值(我的环境中为Java5)下运行Ant。 在项目配置视图中,我希望找到以下两者之一: 一个显式选项,允许我设置一个自定义JDK位置以用于该项目。 一种为该项目设置自定义环境变量的方法,这将允许我将JAVA_HOME设置为JDK6位置。这将使Ant能够根据需要在Java6上运行。 有什么办法可以做到以上任一?如

  • 我有这样的拓扑: 拓扑中最后提到的源是每个应用程序实例的特定主题。我希望该主题仅由该实例处理。此主题的数据由前一个处理器推送,基于哪个实例必须处理该消息。 但是一旦流启动,它会尝试将实例特定的主题分区也分配给其他实例。我们可以在Kafka流中实现这个要求吗? 我希望一个主题仅由特定实例处理。