当前位置: 首页 > 知识库问答 >
问题:

如何在kafka springboot中从一个主题中读取多种类型的json

陶博耘
2023-03-14

我有一个主题,从中我可以接收不同类型的JSON。然而,当使用者试图读取消息时,我似乎得到了一个异常。我尝试添加其他bean名称,但没有成功。它似乎试图从主题中阅读,并试图转换到从主题中阅读的所有类型。是否有一种方法可以指定只对特定输入类型启用特定工厂。还有其他方法可以解决这个问题吗。

@EnableKafka
@Configuration
public class KafkaConfig {
    static Map<String, Object> config = new HashMap();

    static {
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    }


    @Bean
    public ConsumerFactory<String, AssessmentAttemptRequest> assessmentAttemptDetailsEntityConsumerFactory() {
        JsonDeserializer<AssessmentAttemptRequest> deserializer = new JsonDeserializer<>();
        deserializer.addTrustedPackages("com.lte.assessment.assessments");
        return new DefaultKafkaConsumerFactory(config, new StringDeserializer(), deserializer);
    }

    @Bean(name="aaKafkaListenerFactory")
    public ConcurrentKafkaListenerContainerFactory aaKafkaListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, AssessmentAttemptRequest> factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(assessmentAttemptDetailsEntityConsumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, AssessmentQuestionAnalyticsEntity> assessmentQuestionAnalyticssEntityConsumerFactory() {
        JsonDeserializer<AssessmentQuestionAnalyticsEntity> deserializer = new JsonDeserializer<>();
        deserializer.addTrustedPackages("com.lte.assessment.assessments");
        return new DefaultKafkaConsumerFactory(config, new StringDeserializer(), deserializer);
    }

    @Bean(name="aqKafkaListenerFactory")
    public ConcurrentKafkaListenerContainerFactory aqKafkaListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, AssessmentQuestionAnalyticsEntity> factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(assessmentQuestionAnalyticssEntityConsumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, SiteLevelAnalyticsEntity> siteLevelAnalyticsEntityConsumerFactory() {
        JsonDeserializer<SiteLevelAnalyticsEntity> deserializer = new JsonDeserializer<>();
        deserializer.addTrustedPackages("com.lte.assessment.assessments");
        return new DefaultKafkaConsumerFactory(config, new StringDeserializer(), deserializer);
    }

    @Bean("slaKafkaListenerFactory")
    public ConcurrentKafkaListenerContainerFactory slaKafkaListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, SiteLevelAnalyticsEntity> factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(siteLevelAnalyticsEntityConsumerFactory());
        return factory;
    }
}

服务

@Service
public class TopicObserver implements
        ConsumerSeekAware.ConsumerSeekCallback,ConsumerSeekAware{

    @Autowired
    private AssessmentAttemptService assessmentAttemptService;

    @Autowired
    private AssessmentQuestionService assessmentQuestionService;

    @Autowired
    private SiteLevelAnalyticsService siteLevelAnalyticsService;

    private final ThreadLocal<ConsumerSeekCallback> seekCallBack = new ThreadLocal<>();

    @KafkaListener(topics = "ltetopic", groupId = "group_id", containerFactory = "aaKafkaListenerFactory")
    public void consumeAttemptDetails(AssessmentAttemptRequest request) {
        assessmentAttemptService.storeAttempDetails(request);
    }

    @KafkaListener(topics = "ltetopic", groupId = "group_id", containerFactory = "aqKafkaListenerFactory")
    public void setAssessmentQeustionAnalytics(AssessmentQuestionRequest request) {
        assessmentQuestionService.storeQuestionDetails(request);
    }

    @KafkaListener(topics = "ltetopic", groupId = "group_id", containerFactory = "slaKafkaListenerFactory")
    public void siteLevelAnalytics(SiteLevelAnalyticsRequest request) {
        siteLevelAnalyticsService.storeSiteLevelDetailsDetails(request);
    }
}

共有1个答案

米景辉
2023-03-14

@死侍是对的。如果您需要一个更简单的解决方案,可以将消息作为字符串JSON有效负载使用,并手动将它们反序列化为对象。

        @Bean
        public ConsumerFactory<Integer, String> createConsumerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
              kafkaEmbedded().getBrokersAsString());
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return new DefaultKafkaConsumerFactory<>(props);
        }

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(createConsumerFactory());
            return factory;
        }

在侦听器中,作为字符串使用。

@KafkaListener(id = "foo", topics = YOUR_TOPIC)
    public void listen(String json){
    //Convert to Object here.
}
 类似资料:
  • 我有两种不同类型的Avro数据,它们有一些公共字段。我想在映射器中读取这些公共字段。我想通过在集群中生成单个作业来阅读本文。 下面是示例avro模式 模式1: {“type”:“record”,“name”:“Test”,“namespace”:“com.abc.schema.SchemaOne”,“doc”:“Avro使用MR.存储模式”,“fields”:[{“name”:“EE”,“type

  • 所以我必须阅读5个文件行,显示如下(Java): 我必须输入以分号分隔的数据作为单个变量,即<代码>国际价格;等 从文件中读取时,如何将、、等分为不同的数据类型?目标是构建一个对象列表,其中每个数据都作为构造函数参数。 谢谢你的任何帮助

  • 我遵循这篇文档来实现上述场景。 那么,有没有人可以建议我如何一次使用多个订阅者从主题中读取消息。

  • 我是企业架构师的新手。我想知道有没有一种方法可以从已经存在的其他类型的图中自动生成某种类型的图,如果有的话--可以实现哪种类型的图。 例如,序列图是否可以从给定的类图中生成?或者这是不可能的,因为序列图是行为的,而类图是结构的? 提前道谢!

  • 假设我有一个名为的Kafka主题,它有几个消息类型(每个消息类型都有不同的Avro模式),如、等等。我想了解一下用Spring Cloud Stream发布/接收相同主题的不同类型是否可行(而且有意义)。特别是,拥有几个将非常有用,每个专用于特定类型。根据这篇博文,当需要订购消息时,这是非常有用的,因为它们与同一个实体相关。这种情况下的配置示例是什么?

  • 我有一个建立在Kafka之上的事件源应用程序。目前,我有一个主题中有多个消息类型。所有序列化/反序列化的JSON。 那么这种方法如何与Kafka流媒体应用程序一起工作呢?在该应用程序中,您需要指定一个键和值serde? 我是不是应该忘了Avro而改用protobuff呢?