当前位置: 首页 > 知识库问答 >
问题:

如何通过代码在KafkaListener中设置主题名称?

黄聪
2023-03-14

我正在为Kafka主题编写侦听器,并使用@kafkaListener注释来做同样的事情。到目前为止,我硬编码了主题名(topic1),效果很好。这是一个有效的代码:-

@Component
public class KafkaConsumer {

@Autowired
private KafkaProperties kafkaProps;


@Autowired
@Qualifier("CustomObjectMapper")
private ObjectMapper objectMapper;

@KafkaListener(topics = "topic1", containerFactory = "createPokafkaListenerContainerFactory")
public void CreatePoListener(PurchaseOrder po, Acknowledgment ack)
    throws JsonProcessingException {

    LOG.info("Received message for po create from kafka topic {} is {}",
        kafkaProps.getOmsCreateTopicName(), objectMapper
            .writerWithDefaultPrettyPrinter().writeValueAsString(po));

    ack.acknowledge();


}

}

现在,当我试图更改代码以从代码中获取主题名称时,它不起作用。代码,我试图从代码中获取值后,在堆栈溢出(如何将动态主题名称传递给@kafkalistener(主题从环境变量)是:-

@Component
public class KafkaConsumer {

@Autowired
private KafkaProperties kafkaProps;

public KafkaProperties getKafkaProps() {
    return kafkaProps;
}

@Autowired
@Qualifier("CustomObjectMapper")
private ObjectMapper objectMapper;

@KafkaListener(topics = "#{__listener.kafkaProps.getOmsCreateTopicName()}", containerFactory = "omsCreatePokafkaListenerContainerFactory")
    public void CreatePoListener(PurchaseOrder po, Acknowledgment ack)
    throws JsonProcessingException {

    LOG.info("Received message for po create from kafka topic {} is {}",
        kafkaProps.getOmsCreateTopicName(), objectMapper
            .writerWithDefaultPrettyPrinter().writeValueAsString(po));

    ack.acknowledge();


}

}

当我试图启动服务器时,它抛出错误:-

bean初始化失败;嵌套的异常是org。springframework。豆。工厂BeanExpressionException:表达式解析失败;嵌套的异常是org。springframework。表示斯佩尔。SpelEvaluationException:EL1008E:在类型为“org”的对象上找不到属性或字段“_listener”。springframework。豆。工厂配置。BeanExpressionContext’——可能不公开?

谁能帮帮我我做错了什么?

共有2个答案

阳勇
2023-03-14

在消费者类别中,您需要进行以下更改:

@Autowired
public KafkaProperties kafkaProps;

@KafkaListener(topics = "#{kafkaConsumer.kafkaProps.getOmsCreateTopicName()}"

这对我很管用。

董品
2023-03-14

1.1.x不再受支持;如果无法升级到Spring Framework 4.3之外,则应升级到1.3.10,这是与Spring 4.3一起使用的最新版本;它有一个比1.1更简单、更可靠的线程模型。x、 多亏了KIP-62。

虽然您不能在1.3. x中使用__listener,但您可以使用SpEL表达式直接从KafkaPropertiesbean获取主题:

@KafkaListener(topics = "#{kafkaProps.omsCreateTopicName}" ...)
 类似资料:
  • 我已经为我的应用程序实现了材质设计,我希望用户能够自定义colorPrimary、colorPrimaryDark和colorAccent的颜色。怎么做? 我想有一个设置活动,用户可以选择自己的颜色,更改将应用于我的所有活动。谢谢

  • 问题内容: 我想强制Chrome调试器 通过代码 或使用某种注释标签(例如)来中断一行 代码。 问题答案: 您可以在代码中使用。如果开发者控制台已打开,则执行将中断。它也可以在萤火虫中使用。

  • 我在Vaadin7做一个项目。因为我需要改变一个页面的主题。 但在Vaadin 7里,我找不到这样的。 我知道会有办法做到的。 以及当我更改主题时如何在UI上应用更改?

  • 我想设置我的应用程序主题使用设置首选项我已经创建。这是我的密码。告诉我这之后怎么编码。mainactivity.java SettingsActivity.java SettingsFragment.java

  • 问题内容: 在我的文件中,具有以下内容。但是,容器不会选择主机名值。有任何想法吗? 当我检查容器中的主机名时,它没有启动。 问题答案: 我发现使用时主机名对其他容器不可见。事实证明这是一个已知问题(也许是一个已知功能),其中一部分讨论是: 我们可能应该在文档中添加有关使用主机名的警告。我认为它很少有用。 就容器网络而言,分配主机名的正确方法是定义一个别名,如下所示: 不幸的是,这 仍然 无法使用。

  • 在我的文件中,我有以下内容。但是容器没有选择主机名值。有什么想法吗? 当我检查容器中的主机名时,它不会拾取。