因此,我使用Spring-Boot2.1.6并集成kafka consumer来使用在该主题上发布的任何类型的消息。作为参考,我在https://docs.spring.io/spring-boot/docs/2.1.6.release/reference/htmlsingle/
所以我在POM中有依赖性:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
我正在application.yml中进行配置
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: foo
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring:
json:
value:
default:
type: java.lang.Object
@KafkaListener(topics = "videoEnrichedEvents")
public void consume(@Payload VideoEnrichedEventsvideoEnrichedEvents){
LOGGER.debug("Consumed message :"+videoEnrichedEvents);
System.out.println("Consumed Message :"videoEnrichedEvents);
}
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.util.LinkedHashMap] to [com.calamp.connect.vs.model.VideoEnrichedEvents] for GenericMessage [payload={anyotherjson={groups=null, id=0, driverName=from Kusum's console, deviceIdType=null, assetId=null, operatorId=null, avlEventTime=null, videoLink=null, tripId=null, avlEventUuid=null, deviceId=null, appMessageUuid=null, parentAccountList=null, appmsgEventTime=null, enrichedMessage=null, accountId=null}}, headers={kafka_offset=9, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@18213932, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=videoEnrichedEvents, kafka_receivedTimestamp=1590218109430}], failedMessage=GenericMessage [payload={anyotherjson={groups=null, id=0, driverName=from Kusum's console, deviceIdType=null, assetId=null, operatorId=null, avlEventTime=null, videoLink=null, tripId=null, avlEventUuid=null, deviceId=null, appMessageUuid=null, parentAccountList=null, appmsgEventTime=null, enrichedMessage=null, accountId=null}}, headers={kafka_offset=9, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@18213932, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=videoEnrichedEvents, kafka_receivedTimestamp=1590218109430}]
@KafkaListener(topics = "videoEnrichedEvents")
public void consume(@Payload ConsumerRecord consumerRecord){
LOGGER.debug("Consumed message!!!Full :"+consumerRecord);
System.out.println("Consumed Message!!! Actual object :"+((LinkedHashMap)consumerRecord.value()));
}
处理此问题的最简单方法是使用ByteArrayDeserializer
和ByteArrayJsonMessageConverter
bean(只需将其添加到应用程序上下文中,引导将其连接进来)。
这样,从JSON的转换被推迟到调用方法之前,以便我们知道目标类型是什么。
参见https://docs.spring.io/spring-kafka/docs/2.5.0.release/reference/html/#messaging-message-conversion
问题内容: 我有一个接口,该方法需要一个数组: 我正在使用Mockito嘲笑此接口,我想断言该接口已被调用,但是我不想验证传递了什么参数-“无关”。 如何使用通用方法代替编写以下代码? 问题答案: 从Java 8开始,您可以使用无参数方法,并且类型参数将由编译器推断: 说明 Java 8中的新事物是表达式的 目标类型 将用于推断其子表达式的类型参数。在Java 8之前,仅在大多数情况下用于类型参数
问题内容: 我在路由方面遇到了一些麻烦。 我正在使用CMS,需要两条主要路线。和。该控制器用于路由和控制应该用于任何东西比其他。然后,从控制器中,我将解析URL并显示正确的内容。 这就是我所拥有的: 第一条路线有效,但第二条路线无效。我稍微玩了一下,看来如果我不带问号使用,只有在之后放一些东西,它才能起作用。如果我 确实 把问号放在那儿,那根本不起作用。 我希望以下所有路线都转到view @ in
本文向大家介绍python中map、any、all函数用法分析,包括了python中map、any、all函数用法分析的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了python中map、any、all函数用法。分享给大家供大家参考。具体分析如下: 最近想学python,就一直比较关注python,昨天在python吧看到有个帖子提问怎么在python中怎么判断密码是否符合规范,回帖中有很
如果我有一个主题,它有5个分区,然后我有一个服务消耗这5个分区。然后在consumer,我轮询并返回一个ConsumerRecords数组。 每个单独的ConsumerRecord是否可以来自这5个分区中的任何一个?
这是我的简单代码 这条线呢 重定向到taskOKAction,但它只允许我通过URL发送参数(?task=123)。 我需要将object$task发送到taskOKAction,以便在屏幕上打印用户在表单中键入的内容。 我该怎么做?在询问好的解决方案是存储表单中的数据(例如,在数据库或文件中)并只在URL中传递对象ID的参数之前,我已经对stackoverflow发了火。我认为这是一个很好的解决