当前位置: 首页 > 知识库问答 >
问题:

如果在Apache flink中5秒内5次发现相同数据,则生成CEP事件

督辉
2023-03-14

如果我们发现字符串消息在5秒内连续以字符“a”开头5次,我需要生成CEP事件。

为此,我编写了一个类CEPCharEventPublisher。java,将字符串消息(如下发布的消息)发布到kafka主题“charEvent”

已发布消息:

b; date- 2019-06-27 09:05:09.605
a; date- 2019-06-27 09:05:10.160
c; date- 2019-06-27 09:05:10.661
b; date- 2019-06-27 09:05:11.162
c; date- 2019-06-27 09:05:11.669
b; date- 2019-06-27 09:05:12.175
b; date- 2019-06-27 09:05:12.675
b; date- 2019-06-27 09:05:13.176
a; date- 2019-06-27 09:05:13.676
c; date- 2019-06-27 09:05:14.176
b; date- 2019-06-27 09:05:14.677
b; date- 2019-06-27 09:05:15.177
b; date- 2019-06-27 09:05:15.678
c; date- 2019-06-27 09:05:16.178
a; date- 2019-06-27 09:05:16.679
c; date- 2019-06-27 09:05:17.179
c; date- 2019-06-27 09:05:17.680
c; date- 2019-06-27 09:05:18.180
c; date- 2019-06-27 09:05:18.681
c; date- 2019-06-27 09:05:19.181
c; date- 2019-06-27 09:05:19.681
a; date- 2019-06-27 09:05:20.182
c; date- 2019-06-27 09:05:20.682
b; date- 2019-06-27 09:05:21.182
c; date- 2019-06-27 09:05:21.682
b; date- 2019-06-27 09:05:22.183
a; date- 2019-06-27 09:05:22.683
b; date- 2019-06-27 09:05:23.184
a; date- 2019-06-27 09:05:23.684
c; date- 2019-06-27 09:05:24.184
b; date- 2019-06-27 09:05:24.685
b; date- 2019-06-27 09:05:25.186
c; date- 2019-06-27 09:05:25.687
b; date- 2019-06-27 09:05:26.187
a; date- 2019-06-27 09:05:26.687
a; date- 2019-06-27 09:05:27.188
a; date- 2019-06-27 09:05:27.688
b; date- 2019-06-27 09:05:28.188
b; date- 2019-06-27 09:05:28.688

现在我有一个消费者CEPCharEventConsumer.java它将读取来自Kafka主题charEvent的消息并过滤以字符“a”开头的消息。

然后,我编写了以下模式来生成CEP事件/警报,同时我们发现连续5条消息,该消息在5秒内以字符“a”开头。

Pattern<String, String> pattern = Pattern.<String> begin("start")
                .times(5).greedy().where(new SimpleCondition<String>() {
                    private static final long serialVersionUID = -6301755149429716724L;

                    @Override
                    public boolean filter(String value) throws Exception {
                        return value.split(";")[0].equals("a");
                    }
                }).within(Time.seconds(5));

打印CEPCharEventConsumer接收到的以字符“a”开头的消息。java在下面。

2> a; date- 2019-06-27 09:05:10.160
1> a; date- 2019-06-27 09:05:13.676
3> a; date- 2019-06-27 09:05:16.679
2> a; date- 2019-06-27 09:05:20.182
3> a; date- 2019-06-27 09:05:22.683
1> a; date- 2019-06-27 09:05:23.684
3> a; date- 2019-06-27 09:05:26.687
1> a; date- 2019-06-27 09:05:27.188
1> a; date- 2019-06-27 09:05:27.688
1> a; date- 2019-06-27 09:05:29.198
2> a; date- 2019-06-27 09:05:30.199

1> a; date- 2019-06-27 09:05:33.703
1> a; date- 2019-06-27 09:05:35.203
3> a; date- 2019-06-27 09:05:36.705
2> a; date- 2019-06-27 09:05:38.207
1> a; date- 2019-06-27 09:05:39.709
2> a; date- 2019-06-27 09:05:40.209
3> a; date- 2019-06-27 09:05:40.728

打印的警报消息:

4> Found: a; date- 2019-06-27 09:05:26.687

在上述消息中,“发现:a;日期-2019-06-27 09:05:26.687”是警报消息。

我无法理解Flink是如何在5秒内计算出连续的5条消息的。我觉得有点不对劲。

我正在附加源代码的GIT URL(flink cep char事件)。谁能按照我的要求把它改正一下吗。

共有1个答案

松钊
2023-03-14

基于CEP的应用程序似乎正确报告了以下5条消息

3> a; date- 2019-06-27 09:05:26.687
1> a; date- 2019-06-27 09:05:27.188
1> a; date- 2019-06-27 09:05:27.688
1> a; date- 2019-06-27 09:05:29.198
2> a; date- 2019-06-27 09:05:30.199

发生在5秒间隔内。

在您的PatternProcessFunction中的processMatch方法被传递一个映射

所以生成一个报告,给出最后一个匹配事件的时间,而不是第一个匹配事件的时间,改变

String start = match.get("start").get(0);
out.collect("Found: " + start);

String last = match.get("start").get(4);
out.collect("Found: " + last);
 类似资料:
  • 当且仅当相同的项目在最后x毫秒内发射时,我想防止发射发生。我已经查看了油门和防抖操作符,但我不确定它们是否可以在这里帮助我。我可以使用其他操作符吗,或者我可以以某种方式组合它们吗?

  • 我们有不稳定的测试环境,一些测试偶尔会因为环境问题而失败。 我希望每个测试最多执行5次,如果5次执行中至少有3次成功,就被认为是成功的。 如何使用TestNG实现这一点?

  • 问题内容: 下面的代码旨在生成间隔为[1,100]的五个伪随机数的列表。我为with设置了种子,它以unix时间返回系统时间。当我使用Microsoft Visual Studio 2013在Windows 7上编译并运行该程序时,它会按预期运行(请参阅下文)。但是,当我在Arch Linux中使用g ++编译器执行此操作时,它的行为就很奇怪。 在Linux中,每次将生成5个数字。每次执行后4个数

  • 如果我想为一些oauth的东西做一个函数,我可以在视图或控制器中使用它(想想rails,你在应用程序控制器中做)。 我阅读的所有内容都说明要创建一个helpers文件夹并在那里添加函数,然后您可以这样做这是正确的方法吗? 创建可用于刀片模板和控制器的辅助函数的“laravel方式”是什么?

  • 我想知道什么是电池效率最高的方式发送准确的位置更新到服务器/防火墙每5秒,即使应用程序关闭或手机重新启动。我尝试使用alarmmanager.setrepeating和android.intent.action.boot_completed接收器- 使用post一个延迟消息或runnable到一个处理程序不是一个可靠的解决方案,因为一旦应用程序被删除,位置更新就会停止。 是否有可靠的方法每5秒向服

  • 问题内容: 我正在查看“日期” 文档,并试图弄清楚如何立即表达+ 5秒。这是一些伪代码: 问题答案: 日期几乎完全不推荐使用,由于向后兼容的原因,该日期仍然存在。如果您需要设置特定日期或进行日期算术,请使用Calendar: