当前位置: 首页 > 知识库问答 >
问题:

Kafka Streamer:用户定义的“服务器”问题

冯哲彦
2023-03-14

我使用Confluent-3.2.1作为Kafka拖缆。我正在尝试聚合我的KGroupedStream

Map<String, Object> serdeProps = new HashMap<>();

final Serializer<MsgAggr> pageViewSerializer = new JsonPOJOSerializer<>();
serdeProps.put("JsonPOJOClass", MsgAggr.class);
pageViewSerializer.configure(serdeProps, false);

final Deserializer<MsgAggr> pageViewDeserializer = new JsonPOJODeserializer<>();
serdeProps.put("JsonPOJOClass", MsgAggr.class);
pageViewDeserializer.configure(serdeProps, false);

final Serde<MsgAggr> pageViewSerde = Serdes.serdeFrom(pageViewSerializer, pageViewDeserializer);`

流媒体代码为

KGroupedStream<String, MyClass1> msg_grp = message
            .groupByKey();  
KTable<Windowed<String>,MsgAggr> msg_win = msg_grp
            //.reduce(new Reduced(), arg1, arg2);
            .aggregate(new Init(), 
                    new Aggr(), 
                    TimeWindows.of(TimeUnit.SECONDS.toMillis(5)), 
                    pageViewSerde, 
                    "MySample_out");

当我运行代码时,我发现了错误:

[2017-05-23 18:16:45,648] ERROR stream-thread [StreamThread-1] Streams application error during processing:  (org.apache.kafka.streams.processor.internals.StreamThread:249)
java.lang.ClassCastException: my.kafka.strm.MyClass1 cannot be cast to java.lang.String
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:64)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
Exception in thread "StreamThread-1" java.lang.ClassCastException: my.kafka.strm.MyClass1 cannot be cast to java.lang.String
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:64)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)

共有1个答案

慕朝明
2023-03-14

问题出在message.groupByKey();上。它为您的自定义类MyClass1使用String Serde。请为MyClass1实现自定义序列化器和反序列化器,并在重载版本的groupKey ByKey-https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/KStream.html#groupByKey(org.apache.kafka.common.serialization.Serde,org.apache.kafka.common.serialization.Serde)中使用它们

 类似资料:
  • Appium 的 iOS 版本的后端用的是Facebook's WebDriverAgent。该后端是基于苹果公司的 XCTest 框架,所以也有所有XCTest 框架已知的问题。其中有些问题我们正在设法解决,有一些在现阶段可能无法解决。本文中描述的方法已经能够使您完全掌握在设备上如何构建、管理和运行WDA。通过这种方式,您可以在CI环境中对您的自动化测试进行微调,并使其在长期运行的情况下更加稳定

  • 我写了一个小的测试程序来创建 自定义自签名CA证书#1 创建由该CA根证书#1颁发的服务器证书#2 创建具有证书#2的服务器 创建一个RootCA指向证书1的客户端 客户端尝试连接到服务器,但出现错误: 得到“https://localhost:2000:x509:由未知颁发机构签署的证书(可能是因为尝试验证候选颁发机构证书“测试ca”时“x509:Ed25519验证失败”) 我知道这方面的例子有

  • 预先定义的虚拟服务器 FreeRADIUS包括站点可用子目录下的虚拟服务器。有些可以按原样使用,而有些则是用于特殊要求的模板。以下是一些虚拟服务器: buffered-sql:此虚拟服务器用于克服大型SQL数据库(type = detail)的速度限制。 copy-acct-to-home-server:此虚拟服务器可用作模板,用于在两个位置记录一个计费请求(type = detail)。 coa

  • 注:内容翻译自官网文档 Language Guide (proto3) 中的 Defining Services 一节 如果想在RPC (Remote Procedure Call) 系统中使用消息类型, 可以在.proto文件中定义RPC服务接口, 然后protocol buffer编译器会生成所选语言的服务接口代码和桩(stubs). 因此, 例如, 如果想定义一个RPC服务,带一个方法处理S

  • 我最近对尝试将我的石头-纸-剪刀游戏改编成一个多人友好的程序感兴趣,所以今天我决定查阅一个关于服务器的教程。似乎我正在精确地跟踪它(除了使用不同的IDE之外)。然而,有些地方出了问题,我不确定到底是什么,它对教程制造商来说很好。我查过EOFException,但并没有帮到我。 Youtube上的教程 [关于EOFException的文档](我在这里有一个链接,但我需要至少10个声誉才能发布两个以上