当前位置: 首页 > 知识库问答 >
问题:

Flink CEP-如果序列没有在一段时间内到达,而不考虑任何其他消息的到达,则发出警报

傅经业
2023-03-14

我编写了一个Flink CEP片段,该片段使用松弛的邻接度(followedby)检查状态模式(由ID键控)。其思想是,如果特定状态在指定时间内未在第一个状态之后到达,则发出警报。

这是有效的,但是如果有nomore消息到该流,则不会触发警报。但只有当带有某种随机状态的消息到达时,这部分才会被触发。

那么,我如何使它触发警报,即使没有消息到达这个流,当带有下一个序列的消息没有带时间到达时?

Pattern<Transaction, Transaction> pattern = Pattern.<Transaction>begin("start")
            .where(new SimpleCondition<Transaction>() {

                private static final long serialVersionUID = 1L;

                @Override
                public boolean filter(Transaction value) throws Exception {
                    return value.getStatus().equals(Status.STATUS_1);
                }
            })
           .followedBy("end")
           .where(new SimpleCondition<Transaction>() {
                @Override
                public boolean filter(Transaction value) throws Exception {
                    return value.getStatus().equals(Status.STATUS_2);
                    return amlveri;
                }
            }).within(Time.seconds(15));

PatternStream<Transaction> patternStream = CEP.pattern(dataStreamSource, pattern);
OutputTag<Alert> timedOutPartialMatchesTag = new OutputTag<Alert>("alert",
            TypeInformation.of(Alert.class)) {};
    SingleOutputStreamOperator<Alert> select = patternStream.flatSelect(timedOutPartialMatchesTag,
            new PatternFlatTimeoutFunction<Transaction, Alert>() {

                @Override
                public void timeout(Map<String, List<Transaction>> values, long arg1, Collector<Alert> arg2)
                        throws Exception {
                    Transaction failedTrans = values.get("start").get(0);
                    arg2.collect(new Alert("status_2 didnt arrive in time, ", failedTrans));
                }
            }, new PatternFlatSelectFunction<Transaction, Alert>() {

                @Override
                public void flatSelect(Map<String, List<Transaction>> arg0, Collector<Alert> arg1)
                        throws Exception {
                       // do not do anything
                }
            });
    select.getSideOutput(timedOutPartialMatchesTag).print();

共有1个答案

秦鹏飞
2023-03-14

如果您正在使用事件时间,则invin()方法正在等待水印来触发事件时间计时器。但是如果没有事件到达,水印就不会前进(假设您正在使用BoundedOutoFordernesTimeStampExtractor这样的东西来生成水印)。

如果您需要在没有事件到达的情况下检测时间的流逝,那么就需要使用处理时间。您可以将timecharacteristic设置为处理时间,或者实现一个水印生成器,它使用处理时间计时器来人为地提前水印,尽管缺少事件。

 类似资料:
  • 我在java中有以下正则表达式- 但这仍然匹配“”。 整个代码-

  • 我们有一个Storm拓扑,其中我们配置了一个喷口和两个螺栓。Spout连续地从DB查询数据,并将其发送到first bolt进行某些处理。第一个bolt进行一些处理并将元组发送给第二个bolt,第二个bolt调用第三方web服务并发送数据。所以,经过一段时间后,最后一个bolt没有得到任何元组,如果我们重新启动拓扑,它工作得很好。这里只有最后一个螺栓有问题。其他喷口和第一螺栓运行良好,我不使用顶进

  • 我想列出所有ec2资源的标签(客户网关|dhcp选项|图像|实例|互联网网关|网络acl|网络接口|保留实例|路由表|安全组|快照|斑点实例-请求|子网|卷|vpc|vpn连接和vpn网关)。 下面的代码列出了我的ec2客户端的所有带有标记的资源: 问题 这里的问题是,只有在“实例”的情况下,不包括没有标记的实例这样的资源。如果ec2中有5个实例,3个有标签,2个没有标签,上面的代码将只列出这3个

  • 我刚开始学Kafka,Kafka-蟒蛇。在下面的代码中,我试图在消息到达时读取它们。但出于某种原因,消费者似乎要等到一定数量的消息积累后才能获取它们。 我最初以为是因为正在批量出版的制片人。当我运行“kafka-console-consumer--bootstrap-servers--topic”时,我可以看到发布后收到的每一条消息(就像在consumer控制台上看到的那样) 有人能指出用KafK

  • 问题内容: 因此,对于Java中的正则表达式,我想编写一个正则表达式,当且仅当模式前面没有某些字符时才匹配。例如: 如果bar不以foo开头,我想匹配。因此输出为: 我知道这可能是一个非常简单的问题。我正在尝试学习正则表达式,但与此同时我现在需要一些工作。 问题答案: 你想这样使用: 此处的()意思是“仅在此点之前没有“ x”。 有关更多信息,请参见正则表达式-环顾四周。 编辑:添加来捕获之前的字

  • 但是,如果我的Lambda不期望任何输入,它将自己转到SQS并拉出消息,有输入有意义吗?我是否可以让它无效,或者甚至完全使用其他方法签名(当然,在本例中不实现那个接口)?