我试图使用kafkastreams进行聚合,但得到的错误如下所示
这是我正在做的事情:
KGroupedStream<String, Long> countrywiseAmount = ......;
KTable<String, CountSum> countrywiseAverageSum = countrywiseAmount
.aggregate(new Initializer<CountSum>() {
@Override
public CountSum apply() {
return new CountSum();
}
}, new Aggregator<String,Long,CountSum>() {
@Override
public CountSum apply(String country, Long amount, CountSum sumByCountry) {
sumByCountry.setCountry(country.toString());
sumByCountry.setCount(sumByCountry.getCount()+1);
sumByCountry.setSum(sumByCountry.getSum()+amount);
return sumByCountry;
}
}, Materialized.with(stringSerde, countSumSerde));
我收到的错误如下。
由以下原因引起:A 序列化程序(密钥:类型:a 序列化程序(密钥:类型:网站名称:通用序列化程序 / 值:在流配置中更改默认的 Serdes 或通过方法参数提供正确的 Serdes。在 org.apache.kafka.流.处理器.内部.sinkNode.进程 (SinkNode.java:94) 在 org.apache.kafka.流.处理器.internals.处理器上下文前进(处理器上下文.java:201) 在 org.apache.kafka.streams.processor.internals.processorContextul.forward(处理器上下文.java:180) 在 org.apache.kafka.流媒体.处理器.内部.处理器上下文简单处理器(简单.java:133) 在在 org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43) at org.apache.kafka.流媒体.processor.internals.processorNode.process(ProcessorNode.java:117) at org.apache.kafka.streams.processor.processor.internals.internals..java 处理器.java:180) at org.apache.kafka.流.处理器.内部.处理器上下文.java:180) at在组织.apache.kafka.streams.processor.internals.processorContextins.forward(处理器上下文过程.java:133) at org.apache.kafka.streams.kstream.internals.kStreamMapor.process(KStreamMap.java:42) at org.apache.kafka.streams.processor.internals.processorNode.process(ProcessorNode.java:117) at org.apache.kafka.stream.processor.internals.ProcessorContextimpl.forward(ProcessorContextimpl.java:201)处理器网明.java:133) 在 org.java.apache.kaf.java ka.streams.processor.internals.process(流任务.java:363) ...5 更多 原因: java.lang.ClassCastException: org.apache.avro.util.Utf8 不能被转换为 java.lang.String 在 org.apache.kafache.common.serialization.字符串序列化(字符串序列化程序.java:28) 在 org.apache.kafka.common.serialization.serializer.serialize(序列化程序.java:62) 在 org.apache.kafache.流.处理器.internals.内部体.记录收集器发送(记录收集器.java:162) 在 org.apache.kafka.流.内部.记录收集器发送(记录收集器.java:103) at org.apache.kafka.stream.internals.sinkNode.process(SinkNode.java:89)
有线索吗?
KTable聚合逻辑没有问题,但是KGroupeStream的问题
FAQs in section [36]: [36.1]“序列化”是什么东东? [36.2] 如何选择最好的序列化技术? [36.3] 如何决定是要序列化为可读的(“文本”)还是不可读的(“二进制”)格式? or non-human-readable ("binary") format?") [36.4] 如何序列化/反序列化数字,字符,字符串等简单类型? [36.5] 如何读/写简单类型为可读的
上一小节我们学习了 Java 的输入输出流,有了这些前置知识点,我们就可以学习 Java 的序列化了。本小节将介绍什么是序列化、什么是反序列化、序列化有什么作用,Serializable 接口以及 Externalizable 接口,常用序列化工具介绍等内容。 1. 序列化与反序列化 序列化在计算机科学的数据处理中,是指将数据结构或对象状态转换成可取用格式,以留待后续在相同或另一台计算机环境中,能
问题内容: 我有一个公共类,该类实现了Serializable,并由其他多个类进行了扩展。只有那些子类曾经被序列化过-从来没有超类。 超类已定义了serialVersionUID。 我不确定是否重要,但是它没有标记为私有,而是仅具有默认保护-您可能会说它是受软件包保护的 但是,超类或任何子类均未实现readObject或writeObject,并且这些子类均未明确定义serialVersionUI
我的Spring Boot API使用camelCase,但我需要通过我的API将一些请求代理到使用snake_case的第三方API。是否可以将Jackson配置为从snake_case反序列化第三方响应,然后将其序列化回camelCase到我的前端? 所需功能的逐步示例: 示例对象: 我调用我的API API调用第三方 第三方返回 我的API将其反序列化为 我的API序列化对象并返回 现在我使
下面的代码再现了这个问题: 上面的代码不做其他注册“自定义”序列化程序的事情(只是委托回原始序列化程序),但它生成的JSON没有null属性: {“第一个”:“鲍勃”,“最后一个”:“巴克”} 我读过许多看似相关的SO文章,但没有一篇能让我找到解决方案。我尝试在序列化时显式地将映射器设置为,但没有成功。 我唯一的线索是JavaDoc for JsonSerializer中的一条注释: 注意:永远不
本文向大家介绍什么是序列化与反序列化?相关面试题,主要包含被问及什么是序列化与反序列化?时的应答技巧和注意事项,需要的朋友参考一下 序列化:将对象状态转换为可保持或传输的格式的过程。将对象实例的字段及类的名称转换成字节流,然后把字节流写入数据流 反序列化:将流转换为对象。 这两个过程结合起来,可以轻松地存储和传输数据。