当前位置: 首页 > 知识库问答 >
问题:

使用Spring-boot:Consumer的Kafka通过ConsumerRecord使用java.lang.object(any/all对象)

巫健柏
2023-03-14

因此,我使用Spring-Boot2.1.6并集成kafka consumer来使用在该主题上发布的任何类型的消息。作为参考,我在https://docs.spring.io/spring-boot/docs/2.1.6.release/reference/htmlsingle/

所以我在POM中有依赖性:

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

我正在application.yml中进行配置

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: foo
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            value:
              default:
                type: java.lang.Object
 @KafkaListener(topics = "videoEnrichedEvents")
    public void consume(@Payload VideoEnrichedEventsvideoEnrichedEvents){
        LOGGER.debug("Consumed message :"+videoEnrichedEvents);
        System.out.println("Consumed Message :"videoEnrichedEvents);
    }
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.util.LinkedHashMap] to [com.calamp.connect.vs.model.VideoEnrichedEvents] for GenericMessage [payload={anyotherjson={groups=null, id=0, driverName=from Kusum's console, deviceIdType=null, assetId=null, operatorId=null, avlEventTime=null, videoLink=null, tripId=null, avlEventUuid=null, deviceId=null, appMessageUuid=null, parentAccountList=null, appmsgEventTime=null, enrichedMessage=null, accountId=null}}, headers={kafka_offset=9, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@18213932, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=videoEnrichedEvents, kafka_receivedTimestamp=1590218109430}], failedMessage=GenericMessage [payload={anyotherjson={groups=null, id=0, driverName=from Kusum's console, deviceIdType=null, assetId=null, operatorId=null, avlEventTime=null, videoLink=null, tripId=null, avlEventUuid=null, deviceId=null, appMessageUuid=null, parentAccountList=null, appmsgEventTime=null, enrichedMessage=null, accountId=null}}, headers={kafka_offset=9, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@18213932, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=videoEnrichedEvents, kafka_receivedTimestamp=1590218109430}]
 @KafkaListener(topics = "videoEnrichedEvents")
    public void consume(@Payload ConsumerRecord consumerRecord){
        LOGGER.debug("Consumed message!!!Full :"+consumerRecord);
        System.out.println("Consumed Message!!! Actual object :"+((LinkedHashMap)consumerRecord.value()));
    }

共有1个答案

霍建柏
2023-03-14

处理此问题的最简单方法是使用ByteArrayDeserializerByteArrayJsonMessageConverterbean(只需将其添加到应用程序上下文中,引导将其连接进来)。

这样,从JSON的转换被推迟到调用方法之前,以便我们知道目标类型是什么。

参见https://docs.spring.io/spring-kafka/docs/2.5.0.release/reference/html/#messaging-message-conversion

 类似资料:
  • 问题内容: 我有一个接口,该方法需要一个数组: 我正在使用Mockito嘲笑此接口,我想断言该接口已被调用,但是我不想验证传递了什么参数-“无关”。 如何使用通用方法代替编写以下代码? 问题答案: 从Java 8开始,您可以使用无参数方法,并且类型参数将由编译器推断: 说明 Java 8中的新事物是表达式的 目标类型 将用于推断其子表达式的类型参数。在Java 8之前,仅在大多数情况下用于类型参数

  • 问题内容: 我在路由方面遇到了一些麻烦。 我正在使用CMS,需要两条主要路线。和。该控制器用于路由和控制应该用于任何东西比其他。然后,从控制器中,我将解析URL并显示正确的内容。 这就是我所拥有的: 第一条路线有效,但第二条路线无效。我稍微玩了一下,看来如果我不带问号使用,只有在之后放一些东西,它才能起作用。如果我 确实 把问号放在那儿,那根本不起作用。 我希望以下所有路线都转到view @ in

  • 本文向大家介绍python中map、any、all函数用法分析,包括了python中map、any、all函数用法分析的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了python中map、any、all函数用法。分享给大家供大家参考。具体分析如下: 最近想学python,就一直比较关注python,昨天在python吧看到有个帖子提问怎么在python中怎么判断密码是否符合规范,回帖中有很

  • 如果我有一个主题,它有5个分区,然后我有一个服务消耗这5个分区。然后在consumer,我轮询并返回一个ConsumerRecords数组。 每个单独的ConsumerRecord是否可以来自这5个分区中的任何一个?

  • 这是我的简单代码 这条线呢 重定向到taskOKAction,但它只允许我通过URL发送参数(?task=123)。 我需要将object$task发送到taskOKAction,以便在屏幕上打印用户在表单中键入的内容。 我该怎么做?在询问好的解决方案是存储表单中的数据(例如,在数据库或文件中)并只在URL中传递对象ID的参数之前,我已经对stackoverflow发了火。我认为这是一个很好的解决