我正在创建一个简单的Kafka Streaming应用程序。我的Producer正在为一个主题生成protobuf序列化消息,我在Kafka Streaming应用程序中使用该主题来处理消费者消息。我正在尝试使用valueSerde:io.confluent.kafka.streams.serdes.protobuf反序列化消息。KafkaProtobufSerde
在我的应用程序中。yml文件。我发现以下错误。
错误:
org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[0, 0, 0, 0, 2, 0, 10, 13, 84, 105, 109, 101, 114, 32, 109, 101, 115, 115, 97, 103, 101, 16, 1, 34, 12, 8, -126, -107, -127, -120, 6, 16, -12, -88, -117, -114, 2]] from topic [MYINPUT-TOPIC]
Caused by: java.io.CharConversionException: Invalid UTF-32 character 0x1ff0a0d (above 0x0010ffff) at char #1, byte #7)
at com.fasterxml.jackson.core.io.UTF32Reader.reportInvalid(UTF32Reader.java:195) ~[jackson-core-2.11.3.jar:2.11.3]
at com.fasterxml.jackson.core.io.UTF32Reader.read(UTF32Reader.java:158) ~[jackson-core-2.11.3.jar:2.11.3]
我的应用程序。yml
配置文件:
spring:
cloud:
function:
definition: process
stream:
bindings:
process-in-0:
consumer:
max-attempts: 3
valueSerde: io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde
back-off-initial-interval: 100
retryable-exceptions:
javax:
validation:
ValidationException: false
destination: MYINPUT-TOPIC
group: consumer-group
concurrency: 2
kafka:
streams:
binder:
brokers: localhost:9092
schema-registry-u-r-l: http://localhost:8081
auto-offset-reset: "earliest"
configuration:
commit-interval-ms: 100
在日志中打印实际反序列化消息的处理方法:
@Component
@Slf4j
public class ProcessStream {
//Here below Timer object is Protobuf's auto-generated class, I am using it to deserialize messages.
//I'm getting byte Aarry on this method when I'm debugging it.
@Bean
public Consumer<KStream<String, Timer>> process() {
return (InputStream) -> {
InputStream.foreach((k,v) -> log.info(String.format("key: %s, value: %s",k, v)));
};
}
}
请帮我解决这个问题。如何在Kafka Stream中使用protobuf反序列化消息?
你需要使用一个 protobuf Serde 来包装一个合适的 protobuf 反序列化器。Confluent schema registry 提供了一个 protobuf Serde 实现。有关详细信息,请参阅此处。我还没有测试过这个特定的实现,但看起来应该可以工作。如果您正在使用它(或自定义的 protobuf serde 实现),那么您只需提供该类型的 bean 即可在 Spring Cloud Stream 应用程序中注册它。见下文。
@Bean
public KafkaProtobufSerde<Timer> kafkaProtobufSerde() {
}
Spring Cloud Stream中的Kafka Streams绑定器将检测此bean并与您的消费者类型匹配。
根据以下评论中共享的示例应用程序进行更新
在您的配置中进行以下更改后,我能够运行您的示例应用程序:
spring:
cloud:
stream:
function:
definition: process
bindings:
process-in-0:
group: consumer-group
concurrency: 1
headerMode: none
destination: MESSAGING-TIMER-EXAMPLE
kafka:
streams:
binder:
configuration:
schema.registry.url: http://localhost:8081
您需要< code > Kafka . streams . binder 前缀。在做出更改后,我能够启动应用程序而不会出现任何错误。我在启动日志中看到了这些。
2021-07-29 15:10:42.995 INFO 65137 --- [ main] .c.s.b.k.s.KafkaStreamsFunctionProcessor : Key Serde used for process-in-0: org.apache.kafka.common.serialization.Serdes$StringSerde
2021-07-29 15:10:42.996 INFO 65137 --- [ main] .c.s.b.k.s.KafkaStreamsFunctionProcessor : Value Serde used for process-in-0: io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde
...
...
2021-07-29 15:10:43.629 INFO 65137 --- [ main] c.j.ListenerMessagingStreamsApplication : Started ListenerMessagingStreamsApplication in 1.836 seconds (JVM running for 2.305)
我注意到您将yaml配置文件命名为< code > application-local . yml 。如果您正在使用这个属性,那么请确保您在运行时设置了这个属性。否则,将该文件重命名为< code>application.yml。
问题内容: 关于如何解决此错误的任何想法?我在Hibernate中使用Spring JPA。下面的必要详细信息。 实体类别1: 用户的实体类: 和stacktrace: stacktrace非常大。我将下面的整个stacktrace粘贴以供参考,并在此处粘贴快速TL; DR: 完整的堆栈跟踪如下: 更新: 我将Ways实体类更新为以下内容,但仍然出现错误: 问题答案: 当无法反序列化某些内容时,就
我试图使用Confluent Kafka REST Proxy从我的一个主题中检索Avro格式的数据,但不幸的是,我得到了一个反序列化错误。我使用以下命令查询Kafka REST代理 我得到的回应是 Kafka Rest Proxy服务器上的日志如下: 数据是使用KafkaAvroSerializer生成的,模式在模式注册表中。还请注意,在CLI上使用avro console consumer可以
我试图构建一个流,它获得一个Avro主题,做一个简单的转换,然后以Avro格式再次将其发送回另一个主题,我有点卡在最后的序列化部分。 我创建了一个AVRO模式,我正在导入它并使用它创建特定的AVRO Serde。但是我不知道如何使用这个serde将电影对象序列化回AVRO。 这是流类: 谢谢
我是火花的新手。我使用结构化流从Kafka读取数据。 我可以在Scala中使用此代码读取数据: 我在值列中的数据是Thrift记录。Streaming api以二进制格式提供数据。我看到了将数据转换为string或json的示例,但我找不到任何关于如何将数据反序列化为Thrift的示例。 我如何才能实现这一点?
你能帮我理解这个摘录,从Kafka流留档: 在可能正在处理多个主题分区的流任务中,如果用户将应用程序配置为不等待所有分区都包含一些缓冲数据,并从具有最小时间戳的分区中选取以处理下一条记录,则稍后在为其他主题分区提取某些记录时,它们的时间戳可能比从另一个主题分区获取的已处理记录小。 我不明白其中的逻辑:如果您选择具有最小时间戳的分区,为什么以后会获得较小的时间戳?