当前位置: 首页 > 知识库问答 >
问题:

Apache Flink模式检测未找到任何匹配项

羊舌庆
2023-03-14

我正在尝试使用Apache Flink CEP(复杂事件处理)库捕获模式。我从以下结构开始,我希望看到id[1,2]和[3,4]的2个匹配项。但是我没有看到任何结果。

public class StreamingJob {

    private static Logger logger = LoggerFactory.getLogger(StreamingJob.class);

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment
                .getExecutionEnvironment();

        ArrayList <Event> strings = new ArrayList <>();
        strings.add(new Event(0L, "room", 9));
        strings.add(new Event(1L, "room", 10));
        strings.add(new Event(2L, "garden", 11));
        strings.add(new Event(3L, "room", 12));
        strings.add(new Event(4L, "garden", 13));
        strings.add(new Event(5L, "room", 14));
        strings.add(new Event(6L, "room", 15));


        KeyedStream <Event, String> source = env.fromCollection(strings).keyBy(Event::getName);

        source.print("###-source");

        Pattern <Event, ?> pattern = Pattern. <Event>begin("room")
                .where(new SimpleCondition <Event>() {
                    @Override
                    public boolean filter(Event value) {
                        logger.info("### value: {}", value);
                        return value.getName().equals("room");
                    }
                })
                .next("garden")
                .where(new SimpleCondition <Event>() {
                    @Override
                    public boolean filter(Event value) {
                        logger.info("### value: {}", value);
                        return value.getName().equals("garden");
                    }
                });

        PatternStream <Event> patternStream = CEP.pattern(source, pattern);


        // process 
        DataStream <Alarm> result = patternStream.process(
                new PatternProcessFunction <Event, Alarm>() {
                    @Override
                    public void processMatch(
                            Map <String, List <Event>> pattern,
                            Context ctx,
                            Collector <Alarm> out) throws Exception {
                        logger.info("### pattern: {}", pattern);
                        logger.info("### ctx: {}", ctx);
                        out.collect(new Alarm(pattern.toString()));
                    }
                });

        result.print("###");

        // or select function
        patternStream.select(new PatternSelectFunction <Event, Alarm>() {
            @Override
            public Alarm select(Map <String, List <Event>> pattern) throws Exception {
                logger.info("###");
                return new Alarm(pattern.toString());
            }
        }).print("###");

        // execute program
        env.execute("Flink Streaming Java API Skeleton");
    }
}

source.print()方法正在打印源流,对于接收器,我尝试了进程选择方法,但都无法打印结果。此外,我的过滤器或匹配方法的日志根本没有打印出来。我的印象是过滤器功能甚至没有使用。Event警报对象是简单的pojos,如下所示:


public class Event implements Serializable {
    Long id;
    String name;
    Integer temperature;
    //...
}

public class Alarm implements Serializable {
    String text;
    // ...
}


我还尝试将其更改为过滤其他字段,例如,使用温度字段,我想捕获奇数-偶数数字序列,但仍然没有打印任何结果。

共有1个答案

林涵映
2023-03-14

CEP依赖于能够按时间戳对事件流进行排序。这要求您要么提供Watermark Strategy(如果您想使用事件时间戳),要么使用流转时长语义学指定您希望模式匹配按摄取顺序完成。

您可以通过进行以下小改动来实现后者:

PatternStream <Event> patternStream = 
  CEP.pattern(source, pattern).inProcessingTime();
 类似资料:
  • 我用java在netbeans中编程,一切都很好,直到突然测试不再运行。我怀疑这是一个编程错误,因为我正在使用netbeans生成所有测试,而且这个问题似乎在各个项目中持续存在。我甚至尝试过重新安装Netbeans。这是我右键单击并按test file时的输出。 我假设这是一个渐变问题,这里是默认渐变。在新的Netbeans项目上构建文件。 它快把我逼疯了。有变通办法吗?我可以对gradle.bu

  • 下面的代码(代码A)产生正确的输出(匹配) 但是,下面的代码(代码B)产生错误的输出(不匹配)

  • 问题内容: 我正在尝试运行python urllib2脚本并收到此错误: InsecurePlatformWarning:真正的SSLContext对象不可用。这会阻止urllib3正确配置SSL,并可能导致某些SSL连接失败。有关更多信息,请参见https://urllib3.readthedocs.org/en/latest/security.html#insecureplatformwarn

  • 问题内容: 从那时起,我一直在TutorialsPoint上查看代码,此后一直困扰着我……看一下这段代码: 此代码成功打印: 但是根据正则表达式,为什么它不返回其他可能的结果,例如: 要么 如果此代码不适合这样做,那么我该如何编写一个可以找到所有可能匹配项的代码? 问题答案: 这是因为的贪婪,随之而来的是回溯。 字串: 正则表达式: 我们都知道那是贪婪的,并且尽可能匹配所有字符。因此,第一个匹配所

  • 一、模式匹配 Scala 支持模式匹配机制,可以代替 swith 语句、执行类型检查、以及支持析构表达式等。 1.1 更好的swith Scala 不支持 swith,可以使用模式匹配 match...case 语法代替。但是 match 语句与 Java 中的 switch 有以下三点不同: Scala 中的 case 语句支持任何类型;而 Java 中 case 语句仅支持整型、枚举和字符串常

  • 主机权限和 内容脚本匹配 是基于匹配模式定义的一组 URL。匹配模式本质上是一个以允许的 schema(http,https,file 或ftp 开头)的URL,并且可以包含 “*” 字符。特殊模式 < all_urls > 匹配以允许的 schema 开头的任何 URL。 每个模式包含 3 个部分: schema - 例如,http 或file 或 * 注意:对文件 URL 的访问不是自动的。用