当前位置: 首页 > 知识库问答 >
问题:

Spring kafka在一个消费者中使用多种消息类型

嵇弘新
2023-03-14

我有多个制作人,可以向一个Kafka主题发送多种类型的事件。

我有一个消费者,它必须消费所有类型的消息。每种类型的消息都有不同的逻辑。

    @KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory")
public void handleEvent(Message<EventOne> event) {
    logger.info("event={}", event);
}

但在这种情况下,所有消息都指向此方法,不仅是EventOne

如果我实现了两种方法(对于每种类型的消息),那么所有消息都只能使用一种方法。

如果我像这样实现监听器:

    @KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory")
public void handleEvent(Message<?> event) {
    logger.info("event={}", event);
}

然后我得到一个例外:org。springframework。Kafka。KafkaListenerEndpointContainer#0-0-kafka-listener-1]错误组织。springframework。Kafka。听众。LoggingErrorHandler-处理时出错:ConsumerRecord java。lang.IllegalArgumentException:无法识别的类型:[null]

请告诉我如何实现多类型消费者?

共有1个答案

汪阳辉
2023-03-14

我找到了soultion,它很简单。所以,从问题中不清楚,但我这样使用jsonMessageConverter:

    @Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setMessageConverter(new StringJsonMessageConverter());
    return factory;
}

默认情况下,Jackson库不会为JSON添加类信息,因此它无法猜测我要反序列化的类型。解决方案是注释

@JsonTypeInfo(use= JsonTypeInfo.Id.CLASS, include= JsonTypeInfo.As.PROPERTY, property="class")

这会将classinformation添加到json字符串中。在消费者方面,我只编写事件的基类

    @KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory")
public void handleEvent(BasicEvent event) {

    if (event instanceof EventOne) {
        logger.info("type={EventOne}, event={}", event);
    } else {
        logger.info("type={EventTwo}, event={}", event);
    }
}

希望此信息对某人有所帮助。

 类似资料:
  • 我正在使用Spring Kafka consumer,它从主题中获取消息并将其保存到数据库中。如果满足故障条件,例如db不可用,kafka消费者库是否提供重试机制?如果是,是否有方法设置不同的重试间隔,如第1次重试应在5分钟后进行,第2次重试应在30分钟后进行,第3次重试应在1小时后进行等。

  • 问题内容: 我有一个JMS客户端,它正在生成消息并通过JMS队列发送到其唯一的使用者。 我想要的是不止一个消费者收到这些消息。我想到的第一件事是将队列转换为主题,以便现有用户和新用户都可以订阅并将相同的消息传递给他们。 显然,这将涉及在生产者和消费者方面修改当前的客户代码。 我还要查看其他选项,例如创建第二个队列,这样就不必修改现有的使用者。我相信这种方法有很多优点,例如(如果我错了,请纠正我)在

  • 我有两个组id相同的消费者服务器订阅了相同的主题。kafka服务器仅使用一个分区运行。据我所知,消息应该在这两个消费者服务器中随机使用。但现在似乎总是同一个消费者服务器A消费消息,另一个不消费消息。如果我停止消费者服务器A,另一个将正常工作。我所期望的是,他们可以随机消费信息。

  • 在使用Spring Kafka Consumer时,我有时会收到以下错误消息。如代码片段所示,我至少实现了一次语义 1)我的疑问是,我是否错过了来自消费者的任何信息? 2) 我需要处理这个错误吗。由于 org.apache.kafka.clients.consumer.提交失败异常:无法完成偏移提交,因为消费者不是自动分区分配的活动组的一部分;消费者很可能被踢出组。 我的SpringKafka消费

  • 由于消息需求的排序,我们有一个主题和一个分区。我们有两个消费者运行在不同的服务器上,具有相同的配置集,即groupId、consumerId和consumerGroup。即 1主题- 当我们部署消费者时,相同的代码会部署在两台服务器上。当消息到来时,我们会注意到两个消费者都在消费消息,而不是只有一个处理。让消费者在两台独立的服务器上运行的原因是,如果一台服务器崩溃,至少其他服务器可以继续处理消息。

  • 我需要使用consume process Product模式来处理Kafka消息,并已使用Kafka事务管理器配置了Spring Kafka侦听器容器,还设置了事务id前缀以启用Kafka事务。我正在使用批处理的ack模式,并试图了解在这种模式下,在事务中何时提交偏移量。文档似乎表明,一旦使用了轮询中的所有记录,ack模式批提交偏移量——在事务上下文中也是这样吗,即每个轮询1个事务? 或者,在使用