我在测试新的Flink 1.0.0功能时遇到了一些问题。我一直在修补CEP,但还没有运行简单的演示代码:
val pattern : Pattern[TrafficEvent, _] = Pattern.begin[TrafficEvent]("start")
val patternStream = CEP.pattern(stream.javaStream, pattern);
class MyPatternSelectFunction extends PatternSelectFunction[TrafficEvent, TrafficEvent] {
override def select(pattern : java.util.Map[String, TrafficEvent]) : TrafficEvent ={
pattern.get("start")
}
}
val alerts = patternStream.select(new MyPatternSelectFunction())
代码编译良好,maven不显示任何警告。TraFFEvent是一个包含几个简单字段的类,stream是该类的Scala DataStream。当代码在Flink上运行时,错误会出现。它运行一秒钟,然后代码退出并显示此错误消息:
该程序以以下例外结束:
Input mismatch: Tuple type expected.
org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:878)
org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:302)
org.apache.flink.cep.PatternStream.select(PatternStream.java:64)
com.demo.DemoTraffic$.main(DemoTraffic.scala:311)
我试图通过构建这样一个静态类将功能移到Java上(也许从Scala调用API时会出现一些奇怪的问题):
public static DataStream<DemoTraffic.trafficEvent> getStreamByPattern(DataStream<DemoTraffic.trafficEvent> stream) {
Pattern<DemoTraffic.trafficEvent, ?> pattern = Pattern.<DemoTraffic.trafficEvent>begin("start");
PatternStream<DemoTraffic.trafficEvent> patternStream = CEP.pattern(stream, pattern);
DataStream<DemoTraffic.trafficEvent> rvalue = patternStream.select(new PatternSelectFunction<DemoTraffic.trafficEvent, DemoTraffic.trafficEvent>() {
@Override
public DemoTraffic.trafficEvent select(Map<String, DemoTraffic.trafficEvent> pattern) throws Exception {
return pattern.get("start");
}
});
return rvalue;
}
但结果完全相同,并且在PatternStream中抛出相同的错误。选择线条。关于我可以尝试什么或我做错了什么,有什么提示吗?正如你所看到的,这个模式非常愚蠢,它只用于测试紫色。它只接受所有事件,并返回该事件作为响应。Flink是1.0.0,使用Scala 2.10版本。
谢谢
我假设TraFFEvent
是一个Scala case类。CEP库是为Flink的JavaAPI编写的,因此还不支持Scala case类。
作为一种变通方法,您可以将case类转换为普通的Scala类。
还有一张JIRA记录单,跟踪CEP Scala API的开发。
我生活在一场噩梦中,这是一场依赖性的噩梦;) 我正在测试一个使用Hibernate3的EJB应用程序。 我可以部署它: > 作为部署到嵌入式JBOSS 7()的Arquillian测试(EJB作为封装在EAR中的JAR)。它在那里不起作用。我尝试过各种依赖项集,但这是我确定需要的最小依赖项集:
我对闪身是个新手。我正在尝试使用Flink1.3.2从我们的Kinesis流中读取并将输出写入一个Cassandra表。该程序能够从Kinesis流式传输数据。 提前道谢!
它打印出值的等效,这是因为这一行: 通过调用表示。 那么,如何使Hibernate相信是的实例? 我的枚举是由加载的。而由URLClassLoader加载,由另一个类加载器加载。
我正在尝试使用Spring数据执行一个IN查询。我的模型如下所示: 我的仓库界面是这样的: } 问题是当我尝试执行此代码
我已经回答了这个问题,但没有得到任何反馈。因此,我将尝试提出不同的问题。我想对我的搜索请求使用规范。但似乎无法访问规范或其他内容,因为它告诉我: 这个:带有@...似乎不对。 我只是试着遵循这个Spring官方教程:https://spring.io/blog/2011/04/26/advanced-spring-data-jpa-specifications-and-querydsl/ 或者来自