当前位置: 首页 > 知识库问答 >
问题:

“输入不匹配:预期元组类型”当尝试在PatternStream上选择时

通飞尘
2023-03-14

我在测试新的Flink 1.0.0功能时遇到了一些问题。我一直在修补CEP,但还没有运行简单的演示代码:

val pattern : Pattern[TrafficEvent, _] = Pattern.begin[TrafficEvent]("start")
val patternStream = CEP.pattern(stream.javaStream, pattern);

class MyPatternSelectFunction extends PatternSelectFunction[TrafficEvent, TrafficEvent] {
    override def select(pattern : java.util.Map[String, TrafficEvent]) : TrafficEvent ={
        pattern.get("start")
    }
}
val alerts = patternStream.select(new MyPatternSelectFunction())

代码编译良好,maven不显示任何警告。TraFFEvent是一个包含几个简单字段的类,stream是该类的Scala DataStream。当代码在Flink上运行时,错误会出现。它运行一秒钟,然后代码退出并显示此错误消息:

该程序以以下例外结束:

  Input mismatch: Tuple type expected.
            org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:878)
            org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:302)
            org.apache.flink.cep.PatternStream.select(PatternStream.java:64)
            com.demo.DemoTraffic$.main(DemoTraffic.scala:311)

我试图通过构建这样一个静态类将功能移到Java上(也许从Scala调用API时会出现一些奇怪的问题):

public static DataStream<DemoTraffic.trafficEvent>  getStreamByPattern(DataStream<DemoTraffic.trafficEvent> stream) {
  Pattern<DemoTraffic.trafficEvent, ?> pattern = Pattern.<DemoTraffic.trafficEvent>begin("start");
  PatternStream<DemoTraffic.trafficEvent> patternStream = CEP.pattern(stream, pattern);
  DataStream<DemoTraffic.trafficEvent> rvalue = patternStream.select(new PatternSelectFunction<DemoTraffic.trafficEvent, DemoTraffic.trafficEvent>() {
    @Override
    public DemoTraffic.trafficEvent select(Map<String, DemoTraffic.trafficEvent> pattern) throws Exception {
      return pattern.get("start");
    }
  });
  return rvalue;
}

但结果完全相同,并且在PatternStream中抛出相同的错误。选择线条。关于我可以尝试什么或我做错了什么,有什么提示吗?正如你所看到的,这个模式非常愚蠢,它只用于测试紫色。它只接受所有事件,并返回该事件作为响应。Flink是1.0.0,使用Scala 2.10版本。

谢谢

共有1个答案

何涵育
2023-03-14

我假设TraFFEvent是一个Scala case类。CEP库是为Flink的JavaAPI编写的,因此还不支持Scala case类。

作为一种变通方法,您可以将case类转换为普通的Scala类。

还有一张JIRA记录单,跟踪CEP Scala API的开发。

 类似资料: