当前位置: 首页 > 知识库问答 >
问题:

如何阻止Apache Flink CEP模式?

佴普松
2023-03-14

请帮帮我,我有两个问题:
我从Apache Kafka json-messages读到(然后我有步骤:反序列化到POJO、筛选器、keyBy...)

  1. 使用哪个更好:KeyedProcessFunction(带有状态、计时器、if-else逻辑块)还是Flink CEP模式库?

我可以检查KeyedProcessFunction中的输入序列(检查state,if-else blocks,out.collect(...),state.clear()...你会理解我的),我还可以使用带有条件和量化器的Flink CEP库。

例如:
我有输入序列:A1,(无事件1分钟)A2,(无事件5分钟)AK3,(无事件1分钟)AK4,(无事件5分钟以上)A5。(在A1和A5之间可能有很多事件)
我想发送输出:A1,A3,A5。
第一个事件,如果下一个事件发生在前一个事件之后少于5分钟,它将不发送到输出;如果下一个事件发生在前一个事件之后超过5分钟,它将发送到输出。
我应该向我的模式添加什么???

Pattern<Event, ?> pattern = Pattern.
<Event>begin("start")
.where(new SimpleCondition<Event>(){
 public boolean filter(Event event){
return event.getName().contains("A");
}
}).within(Time.minutes(5));

共有1个答案

傅志诚
2023-03-14

虽然乍一看,这个特定的示例作为KeyedProcessFunction实现起来似乎相当微不足道,但如果消息可以无序到达,肯定会产生一些复杂性。那么你可能会被愚弄,以为会有很大的差距,而实际上根本没有。

但是,如果您想要一个简单的、开箱即用的现成解决方案,这个特定的示例非常适合会话窗口。

对于CEP,我认为一个可行的解决方案会有这样的味道:您正在寻找一个a(称为A1)的序列,然后紧接着另一个a(称为A2),其中(A2.timestamp-a1.timestamp)>=5分钟。当找到匹配时,发射A1并使匹配引擎前进,使A2成为新的A1。(方便的是,CEP对输入流进行预排序,因此您不必担心会出现混乱。)

 类似资料:
  • 问题内容: Ajax使用回调,因为它是同步的。 我希望对远程URL块的调用直到出现一些答案为止 ,就像在Ajax中一样,但是没有异步部分,或者我要说要进行JAX调用。 是否有任何技术可以使以下事情发生(使用JQuery)(…使用JQuery或其他解决方案): 我只是想知道-想学习。 实际上,有时会阻塞直到回复合适为止。我并不是说要浏览器阻止,而只是脚本运行时。 问题答案: 您可以在使用jQuery

  • Stroustrup C++第4版第796页指出 “如果的条件计算为,则完全忽略它所在的整个函数声明。”和“...我们不申报任何东西。”。 我也读过这个建议的线程,在这个线程中,SFINAE只有在模板参数的参数推导中的替换使构造格式不正确时才起作用。

  • 问题内容: 是否有可能做出无法逃脱和?我目前得到: 但我正在寻找这样的事情: 问题答案: 从Go 1.7开始,您仍然 无法使用json.Marshal()做到​​这一点 。json.Marshal的源代码显示: json.Marshal总是这样做的原因是: 字符串值编码为强制转换为有效UTF-8的JSON字符串,用Unicode替换符文替换无效字节。尖括号“ <”和“>”转义为“ \ u003c”

  • 问题内容: 是否可以使用angularjs拦截器阻止请求? 问题答案: 在1.1.5及更高版本中,您可以使用配置对象的’timeout’属性。 从文档中: 超时– {number | Promise} –超时(以毫秒为单位),或承诺应在解决后中止请求。 简单的例子:

  • 我正在做一个正在改造/现代化的项目,其中有一个小的RMI部分,不幸的是,我以前从未与RMI合作过。 我不明白的一件事是,为什么在我最初创建RMI客户端之后,它一直调用其自定义SocketFactory构造函数。似乎每5分钟我就会看到一次调用构造函数的输出,即使客户端和服务器之间没有通信。 连接完成后,我应该做什么来清理和停止任何线程以进行持久化? 我在检查UnicastRemoteObject,也

  • 问题内容: 强制保持Node.js进程运行的最佳方法是什么,即使其事件循环不为空,从而防止进程终止?我能想到的最好的解决方案是: 如果您将间隔时间保持足够长,它将使间隔运行,而不会引起过多干扰。 有更好的方法吗? 问题的长版 我有一个使用Edge.js来注册回调函数的Node.js脚本,以便可以从.NET中的DLL内部对其进行调用。每秒将调用此功能1次,发送一个应打印到控制台的简单序列号。 Edg