当前位置: 首页 > 知识库问答 >
问题:

默认的kafkaListenerContainerFactory是如何工作的

申屠鹏
2023-03-14

我有一个kafka消费java应用程序。我正在阅读该应用程序中两个独立的Kafka主题。

对于主题1,有一个名为kafkaListenerContainerFactory的KafkaListenerContainerFactory,下面是代码片段。消息是avro格式的。Pojo1是使用avro模式构建的pojo类。

 @Bean
public ConcurrentKafkaListenerContainerFactory<String, Pojo1> kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, Pojo1> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    return factory;
}

对于使用来自topic1的消息,我们有以下方法

@kafkaListener(topics=“topic1”)公共布尔readTopic1(ConsumerRecord

现在这个应用程序也在从topic2读取消息,topic2也有avro消息,Pojo2是使用各自的avro模式构建的pojo类。

阅读主题2,我们有以下方法

@kafkaListener(topics=“topic2”)公共布尔readTopic2(ConsumerRecord

现在,我不明白topic2的消息是如何按预期工作的。

kafkaListenerContainerFactory配置了Pojo1,那么Pojo2如何从topic2读取消息呢。据我所知,kafka只在缺少名为“kafkaListenerContainerFactory”的bean时创建默认容器工厂,但在我的应用程序中,我们已经为topic1构建了一个kafkaListenerContainerFactory。

根据规定,应该为Pojo2创建另一个KafkaListenerContainerFactory,并且在使用@kafkaListener(topics=“topic2”,containerFactory=“factoryForTopic2”)

有人能帮我理解KafkaListenerContainerFactory是如何使用不同的消息模式为不同的主题工作的吗?


共有2个答案

焦信鸥
2023-03-14

这只适用于您的情况,因为反序列化程序能够将字符串反序列化为两个对象。但是想象一下,你使用Avro,你仍然有Pojo1和Pojo2。在这种情况下,你需要两个集装箱工厂。如果不为Pojo2提供一个异常,将引发异常-MessageConversionException:无法将字符串转换为GenericMessage的Pojo2。解决方案是定义@kafkaListener(topics=“topic2”,containerFactory=“factoryForTopic2”)

杨无尘
2023-03-14

这叫做类型擦除;泛型类型仅在编译时适用。在运行时,只要反序列化程序创建正确的类型,它就会工作。

它可能是更干净的使用ConAutomtKafkaListenerContainerFactory

也就是说,您的bean被称为importDecisionMessageKafkaListenerContainerFactory。对于要使用非标准工厂的侦听器,必须在侦听器的containerFactory属性中指定其名称;否则将使用kafkaListenerContainerFactory

 类似资料:
  • 我已经看了下面的帖子。我仍然无法理解重定向URI的概念。 https://www.baeldung.com/spring-webclient-oauth2 https://docs.spring.io/spring-security/site/docs/5.0.7.RELEASE/reference/html/oauth2login-advanced.html#oauth2login-高级重定向终

  • 问题内容: 是否可以在Eclipse 3.4.1中设置默认的工作目录?默认情况下是: $ {workspace_loc :(项目名称)} 但我希望它像 $ {custom_var} 对于每个类,我可以将运行配置->参数->工作目录更改为“其他”,但最好更改默认值。我有很多需要从该特定目录运行的类。 问题答案: 您可以做的一件事是设置一个启动配置,然后右键单击它,然后选择“重复”。这将保留所有参数。

  • 问题内容: 我有一个JRE,那是我安装Eclipse时唯一的一个。随后,我在其中安装了完整的JDK 并将其环境变量更改为该变量。但是,每次我启动一个新的Eclipse工作区时,它只会拾取旧的JRE,我必须手动将其删除并添加新的JRE。 如何将Eclipse安装绑定到新的JDK,以便每个新的工作区都仅指向该文件夹?我检查了一下,但是那里没有提到要使用的JRE。 更新: 我进入s,添加了新位置,将其标

  • 我通过指定分区的数量从文本文件创建RDD。但它给我的分区数与指定的分区数不同。 文件 /home/pvikash/data/test.txt的内容是: 我试图理解为什么这里的分区数量在变化,如果我们有小数据(可以容纳一个分区),那么为什么spark会创建空分区? 任何解释都将不胜感激。

  • 问题内容: 说我有自己的课程 它具有一些属性和方法。它不实现等于,不实现hashCode。 一旦我们调用equals和hashCode,默认的实现是什么?从对象类?那是什么 默认值等于如何工作?默认的hashCode将如何工作,返回什么?==只会检查它们是否引用同一个对象,因此很容易,但是equals()和hashCode()方法呢? 问题答案: 是的,默认实现是Object的实现(通常来说;如果

  • 我正在尝试使用更改整个Excel工作簿()的默认单元格样式。这应该应用于用户可能创建的新单元格(在保存工作簿之后)。我试图通过调用--我认为这是工作簿的默认样式--来实现这一点,然后将此样式修改为我想要的新默认样式。 当我读入一个现有的文件(一个“模板”文件)并修改默认样式时,这会起作用。但是,当我使用从头创建一个新的文件时,它就不起作用了。 当使用调试器单步执行时,我可以看到,当使用“模板”文件