我有一个Spring启动应用程序,它定义了:
(我一直在更改后缀以避免任何可能的混淆,并手动创建主题,因为否则我会收到警告,STREAM_TOPIC_IN_xxx=LEADER_NOT_AVAILABLE,流将在一分钟左右内无法运行。)
第一个侦听器和流似乎正在工作,但是当STREAM_OUT_TOPIC上的侦听器尝试反序列化消息时,我得到以下异常。我正在为流中的serde提供Produce.with。我需要做什么才能让侦听器知道要反序列化的类型?
日志
11 Mar 2019 14:34:00,194 DEBUG [KafkaMessageController [] http-nio-8080-exec-1] Sending a Kafka Message
11 Mar 2019 14:34:00,236 INFO [KafkaConfig [] kafka9000-v0.1-b0a60795-0258-48d9-8c87-30fa9a97d7b8-StreamThread-1] -------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: Hello, World!
11 Mar 2019 14:34:00,241 INFO [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] STREAM_IN_TOPIC Listener: ConsumerRecord: {}ConsumerRecord(topic = STREAM_TOPIC_IN_QQQ, partition = 0, offset = 0, CreateTime = 1552332840188, serialized key size = 1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1, value = com.teramedica.kafakaex001web.model.Greeting@7b6c8fcc)
11 Mar 2019 14:34:00,243 INFO [Metadata [] kafka-producer-network-thread | kafka9000-v0.1-b0a60795-0258-48d9-8c87-30fa9a97d7b8-StreamThread-1-producer] Cluster ID: y48IEZaGQWKcWDVGf4mD6g
11 Mar 2019 14:34:00,367 ERROR [LoggingErrorHandler [] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] Error while processing: ConsumerRecord(topic = STREAM_TOPIC_OUT_QQQ, partition = 0, offset = 0, CreateTime = 1552332840188, serialized key size = 1, serialized value size = 48, headers = RecordHeaders(headers = [RecordHeader(key = springDeserializerExceptionValue, value = [ REDACTED ])], isReadOnly = false), key = 1, value = null)
org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is java.lang.IllegalStateException: No type information in headers and no default type provided
at org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2.deserializationException(ErrorHandlingDeserializer2.java:204) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
以下是配置:
REST (Spring MVC):
@RequestMapping("/greeting")
public Greeting greeting(@RequestParam(value = "name", defaultValue = "World") String name) {
Greeting gr = new Greeting(counter.incrementAndGet(), String.format(msgTemplate, name));
this.kafkaTemplate.send(K9000Consts.STREAM_TOPIC_IN, "1", gr);
logger.debug("Sending a Kafka Message");
return gr;
}
Kafka配置(Spring-kafka):
@Bean
public KStream<String, Greeting> kStream(StreamsBuilder kStreamBuilder) {
KStream<String, Greeting> stream = kStreamBuilder.stream(K9000Consts.STREAM_TOPIC_IN);
stream.peek((k, greeting) -> {
logger.info("-------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: {}", greeting.getContent());
})
.map((k, v) -> new KeyValue<>(k, new GreetingResponse(v)))
.to(K9000Consts.STREAM_TOPIC_OUT, Produced.with(stringSerde, new JsonSerde<>(GreetingResponse.class)));
return stream;
}
@KafkaListener(topics = K9000Consts.STREAM_TOPIC_OUT, groupId="oofda", errorHandler = "myTopicErrorHandler")
public void listenForGreetingResponse(ConsumerRecord<String, GreetingResponse> cr) throws Exception {
logger.info("STREAM_OUT_TOPIC Listener : {}" + cr.toString());
}
@KafkaListener(topics = K9000Consts.STREAM_TOPIC_IN, groupId = "bar")
public void listenForGreetingResponses(ConsumerRecord<String, Greeting> cr) throws Exception {
logger.info("STREAM_IN_TOPIC Listener: ConsumerRecord: {}" + cr.toString());
}
应用程序.yml
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: foo
auto-offset-reset: latest
key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
properties:
spring.json.trusted.packages: com.teramedica.kafakaex001web.model
spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
streams:
application-id: kafka9000-v0.1
properties: # properties not explicitly handled by KafkaProperties.streams
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
spring.json.trusted.packages: com.teramedica.kafakaex001web.model
所以我也面临着同样的问题。
我就这样修好了
您必须将以下属性设置为您试图取消对其进行分类的类
spring.json.value.default.type=com.something.model.TransactionEventPayload
我将KafkaListener的属性设置为:
@KafkaListener(topics = "topic", groupId = "myGroupId", properties = {"spring.json.value.default.type=com.something.model.TransactionEventPayload"})
public void consumeTransactionEvent(@Payload TransactionEventPayload payload,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp) {
“回答”我自己的问题很大程度上是为了巩固@GaryRussell评论中的信息,但基本上,他提供了最好的答案。简而言之,我做到了以下几点:
另一件事:默认情况下,在使用spring-boot自动配置时,简单地添加messageConverter也会将其添加到自动配置的kafkaTemplate中。当调用kafkaTemplate.send(K9000Consts.STREAM_TOPIC_IN,“1”,问候语)
时,这似乎不是问题,但我认为如果使用send(消息)可能会出现问题。
下面是一个工作配置,在其中,我以最小的配置获得了预期的消息
application.yml:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: foo
auto-offset-reset: latest
key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
properties:
spring.json.trusted.packages: com.teramedica.kafakaex001web.model
spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
streams:
application-id: kafka9000-v0.1
properties: # properties not explicitly handled by KafkaProperties.streams
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
spring.json.trusted.packages: com.teramedica.kafakaex001web.model
KafkaConfig:
@Bean RecordMessageConverter messageConverter() { return new StringJsonMessageConverter(); }
...
@Bean
public KStream<String, Greeting> kStream(StreamsBuilder kStreamBuilder) {
KStream<String, Greeting> stream = kStreamBuilder.stream(K9000Consts.STREAM_TOPIC_IN);
stream.peek((k, greeting) -> {
logger.info("-------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: {}", greeting.getContent());
})
.map((k, v) -> new KeyValue<>(k, new GreetingResponse(v)))
.to(K9000Consts.STREAM_TOPIC_OUT, Produced.with(stringSerde, new JsonSerde<>(GreetingResponse.class)));
return stream;
}
@KafkaListener(topics = K9000Consts.STREAM_TOPIC_OUT, groupId="oofda", errorHandler = "myTopicErrorHandler")
public void listenForGreetingResponse(GreetingResponse gr) throws Exception {
// logger.info("STREAM_OUT_TOPIC Listener : {}" + cr.toString());
logger.info("STREAM_OUT_TOPIC Listener : GreetingResponse is {}" + gr);
}
@KafkaListener(topics = K9000Consts.STREAM_TOPIC_IN, groupId = "bar")
public void listenForGreetingResponses(@Payload Greeting gr,
ConsumerRecord<String, String> record, // <--- NOTE: String, NOT Greeting
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) throws Exception {
//logger.info("STREAM_IN_TOPIC Listener: ConsumerRecord: {}" + cr.toString());
logger.info("STREAM_IN_TOPIC Listener: Greeting: {}", gr.getContent());
logger.info("STREAM_IN_TOPIC Listener: From Headers: topic: {}, partition: {}, key: {}", topic, partition,
key);
logger.info("STREAM_IN_TOPIC Listener:: From Record: topic: {}, parition: {}, key: {}",
record.topic(), record.partition(), record.key());
logger.info("STREAM_IN_TOPIC Listener:: record value: {}, class: {}", record.value(), record.value().getClass() );
}
@Bean
public KafkaListenerErrorHandler myTopicErrorHandler() {
return (m, e) -> {
logger.error("Got an error {}", e.getMessage());
return "some info about the failure";
};
}
消息的输出是:
13 Mar 2019 09:56:57,884 DEBUG [KafkaMessageController [] http-nio-8080-exec-1] Sending a Kafka Message
13 Mar 2019 09:56:57,913 INFO [KafkaConfig [] kafka9000-v0.1-b0589cc5-2fab-4b72-81f7-b0d5488c7478-StreamThread-1] -------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: Hello, World!
13 Mar 2019 09:56:57,919 INFO [Metadata [] kafka-producer-network-thread | kafka9000-v0.1-b0589cc5-2fab-4b72-81f7-b0d5488c7478-StreamThread-1-producer] Cluster ID: 8nREAmTCS0SZT-NzWsCacQ
13 Mar 2019 09:56:57,919 INFO [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] STREAM_IN_TOPIC Listener: Greeting: Hello, World!
13 Mar 2019 09:56:57,920 INFO [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] STREAM_IN_TOPIC Listener: Record: ConsumerRecord(topic = STREAM_TOPIC_IN_SSS, partition = 0, offset = 23, CreateTime = 1552489017878, serialized key size = 1, serialized value size = 34, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 116, 101, 114, 97, 109, 101, 100, 105, 99, 97, 46, 107, 97, 102, 97, 107, 97, 101, 120, 48, 48, 49, 119, 101, 98, 46, 109, 111, 100, 101, 108, 46, 71, 114, 101, 101, 116, 105, 110, 103])], isReadOnly = false), key = 1, value = {"id":1,"content":"Hello, World!"})
13 Mar 2019 09:56:57,920 INFO [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] STREAM_IN_TOPIC Listener: From Headers: topic: STREAM_TOPIC_IN_SSS, partition: 0, key: 1
13 Mar 2019 09:56:57,920 INFO [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] STREAM_IN_TOPIC Listener:: From Record: topic: STREAM_TOPIC_IN_SSS, parition: 0, key: 1
13 Mar 2019 09:56:57,921 INFO [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] STREAM_IN_TOPIC Listener:: record value: {"id":1,"content":"Hello, World!"}, class: class java.lang.String
13 Mar 2019 09:56:58,030 INFO [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] STREAM_OUT_TOPIC Listener : GreetingResponse id: 1000, response: Hello, World!, yourself
请参阅文档。
明确地...
<code>JsonDeserializer。VALUE_DEFAULT_TYPE:如果不存在标头信息,则值反序列化的回退类型。
It's spring.json.value.default.type
您还可以设置< code > spring . JSON . use . type . headers (默认为true)来阻止查找头。
反序列化器自动信任默认类型的包,因此无需在那里添加它。
编辑
但是,另请参阅Spring消息消息转换。
使用<code>BytesDeserializer</code>和<code>BytesJsonMessageConverter</code>,框架将传递方法参数类型作为转换的目标。
我使用的是spring boot 2.2.4版本,spring-kafka 2.4.2版本 我的场景是以下一个: 所以我写了folloqing代码 生产者微服务 spring kafka配置: 在制作人方面所有的工作都很好。我能创造话题和发送信息。 消费者微服务 动态侦听器类 当我在生产者端发送消息时,我可以看到以下日志: 在消费者方面,我没有看到任何信息。我只看到下面的指纹: 谁能告诉我我错在哪
我正在编写一个web api,它有一个post方法来接受从UI上传的文件。 然后我通过邮递员向上面的行动发出请求。我将content-type头设置为multipart/form-data,但在操作执行期间发生错误。以下是错误消息正文: “提供的'HttpContent'实例无效。它没有带有'boundary'参数的'multipart'内容类型标头。\r\n参数名称:Content” 有人能帮我
我有一个枚举类角色类型 在我的实体类中,我有枚举集合的以下映射。这是代码: 我将这个用户实体类转换为Kotlin,下面是代码: 转换后,hibernate抛出以下异常: 它以前在Java工作得很好。 我还尝试在中添加,如下所示: 但这也是另一个例外。 注意:如果我将角色的修饰符从var改为val,它可以工作,但我需要它是一个可变类型。我不理解字段的易变性如何在hibernate中产生问题。 注意:
问题内容: 我正在一个Swift操场上玩,正在上一堂新课。由于某种原因,我不断收到这样的错误:类“没有成员类型”,其名称的常量前面定义了三行。这是代码: Xcode Beta6一直在倒数第二行给我一个错误,说“ DataModel.Type没有名为’myCalendar’的成员 尽管我认为这不会有所作为,但我尝试将myCalendar定义为var。 问题答案: 您无法初始化引用同一类的另一个实例属
为我在原始问题中提出的错误代码示例道歉。我试图把我的问题简单化,但结果却相当令人困惑。 我稍微修改了一下代码: 我得到了第一个错误: 所以我把最后一行改为: 然后在Orelse上弹出一个新的错误: Optional>类型中的方法orElse(Capture#4-of?extends Map.Entry)不适用于参数(AbstractMap.SimpleEntry) 我最初的问题是,我不知道如何为这
问题内容: 我查看了以前的文章,但似乎找不到解决此问题的方法。所有三行代码都给出“类型’Any’没有下标成员”错误。对此还很陌生,因此不胜感激。 问题答案: 的类型为。下标是一种特殊的函数,它使用将值括在大括号中的语法。此下标功能由实现。 因此,这里发生的是,作为开发人员的YOU知道这是一个,但是编译器却没有。它不会让您调用该函数,因为您试图在type的值上调用它并且未实现。为此,您必须告诉编译器