我使用Confluent-3.2.1作为Kafka拖缆。我正在尝试聚合我的KGroupedStream
Map<String, Object> serdeProps = new HashMap<>();
final Serializer<MsgAggr> pageViewSerializer = new JsonPOJOSerializer<>();
serdeProps.put("JsonPOJOClass", MsgAggr.class);
pageViewSerializer.configure(serdeProps, false);
final Deserializer<MsgAggr> pageViewDeserializer = new JsonPOJODeserializer<>();
serdeProps.put("JsonPOJOClass", MsgAggr.class);
pageViewDeserializer.configure(serdeProps, false);
final Serde<MsgAggr> pageViewSerde = Serdes.serdeFrom(pageViewSerializer, pageViewDeserializer);`
流媒体代码为
KGroupedStream<String, MyClass1> msg_grp = message
.groupByKey();
KTable<Windowed<String>,MsgAggr> msg_win = msg_grp
//.reduce(new Reduced(), arg1, arg2);
.aggregate(new Init(),
new Aggr(),
TimeWindows.of(TimeUnit.SECONDS.toMillis(5)),
pageViewSerde,
"MySample_out");
当我运行代码时,我发现了错误:
[2017-05-23 18:16:45,648] ERROR stream-thread [StreamThread-1] Streams application error during processing: (org.apache.kafka.streams.processor.internals.StreamThread:249)
java.lang.ClassCastException: my.kafka.strm.MyClass1 cannot be cast to java.lang.String
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:64)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
Exception in thread "StreamThread-1" java.lang.ClassCastException: my.kafka.strm.MyClass1 cannot be cast to java.lang.String
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:64)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
问题出在message.groupByKey();
上。它为您的自定义类MyClass1
使用String Serde。请为MyClass1
实现自定义序列化器和反序列化器,并在重载版本的groupKey ByKey
-https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/KStream.html#groupByKey(org.apache.kafka.common.serialization.Serde,org.apache.kafka.common.serialization.Serde)中使用它们
Appium 的 iOS 版本的后端用的是Facebook's WebDriverAgent。该后端是基于苹果公司的 XCTest 框架,所以也有所有XCTest 框架已知的问题。其中有些问题我们正在设法解决,有一些在现阶段可能无法解决。本文中描述的方法已经能够使您完全掌握在设备上如何构建、管理和运行WDA。通过这种方式,您可以在CI环境中对您的自动化测试进行微调,并使其在长期运行的情况下更加稳定
我写了一个小的测试程序来创建 自定义自签名CA证书#1 创建由该CA根证书#1颁发的服务器证书#2 创建具有证书#2的服务器 创建一个RootCA指向证书1的客户端 客户端尝试连接到服务器,但出现错误: 得到“https://localhost:2000:x509:由未知颁发机构签署的证书(可能是因为尝试验证候选颁发机构证书“测试ca”时“x509:Ed25519验证失败”) 我知道这方面的例子有
预先定义的虚拟服务器 FreeRADIUS包括站点可用子目录下的虚拟服务器。有些可以按原样使用,而有些则是用于特殊要求的模板。以下是一些虚拟服务器: buffered-sql:此虚拟服务器用于克服大型SQL数据库(type = detail)的速度限制。 copy-acct-to-home-server:此虚拟服务器可用作模板,用于在两个位置记录一个计费请求(type = detail)。 coa
注:内容翻译自官网文档 Language Guide (proto3) 中的 Defining Services 一节 如果想在RPC (Remote Procedure Call) 系统中使用消息类型, 可以在.proto文件中定义RPC服务接口, 然后protocol buffer编译器会生成所选语言的服务接口代码和桩(stubs). 因此, 例如, 如果想定义一个RPC服务,带一个方法处理S
身份验证和授权工作正常。但是在成功登录后,它不会将我重定向到客户端,而是进一步打开一些.js文件的源代码。而上一个(没有自定义登录表单loginPage(“/login”))它成功地将我重定向到最后一个单击的页面(客户端),该页面需要进行身份验证。 我的服务器端代码如下: 授权服务器 资源服务器 我的安全配置