当前位置: 首页 > 工具软件 > phaser-ce > 使用案例 >

CEP简介

顾淳
2023-12-01

1、什么是CEP

企业产生大量实时业务数据(交易数据、业务行为数据),需要从大量数据中挖掘出有价值的数据,如营销活动中是否有薅羊毛造成资产损失的,如果有风险需要进一步分析进行干预。
这样的场景,传统的关系型数据库无法对价值数据进行及时处理。

基于此,CEP(Complex Event Processing)复杂事件处理就出现了。CEP将发生的任何事情当作一个Event,比如用户进行一次登陆/注册/支付/交易等都认为是一个事件。这些事件共同特征是数量规模大、数据能源源不断的产生且是无边界的。

出现CEP的目的是通过定义规则,从这些大量的事件数据中找出有意的事件或者将事件组合成有业务含义的复合事件,从而得出结论,再进行下一步的处理。

1.1、使用场景

Some typical examples of applications are:

  • Business process management and automation (process monitoring, BAM, reporting exceptions, operational intelligence)
  • Finance (algorithmic trading, fraud detection, risk management)
  • Network and application monitoring (intrusion detection, SLA monitoring)
  • Sensor network applications (RFID reading, scheduling and control of fabrication lines, air traffic)

CEP适合处理大量的、多个数据源的实时数据,并对这些实时数据做处理得出实时的结果

参考:https://blog.csdn.net/yw1441776254/article/details/103238509

2、实现CEP的框架

2.1 Flink CEP

利用Flink CEP,即Flink 的复杂事件处理库,用户可以快速检测无尽数据流中的复杂模式。
https://www.jianshu.com/p/d4d7edc86497?utm_source=oschina-app

2.2 Esper

Esper是一个历史悠久的纯Java开源复杂事件和事件流引擎,可以监测事件流,并在特定事件发生时触发某些动作。
Esper性能良好,能够实时或者接近实时处理事件(或消息),具有高吞吐量、低响应时间和支持复杂计算等特点.
Esper引擎可以使用类似于SQL的EPL语句来构建处理模型,处理每秒几万到几十万的实时数据的查询统计。它的工作有点像倒过来的关系型数据库,它不需要像数据库那样存储数据,而是先构建查询语句,引擎依据这些处理模型实时的输出符合的结果。而关系型数据要查询语句提交后才会输出结果。

Esper缺点主要是Esper统计分析的中间数据全部是存储在内存中,不能跨服务器,只能单机部署,内存有限,存在单点故障,由于全内存操作,系统重启后中间数据就会丢失无法恢复。Esper的这些缺点风控系统都可以接受,对风控系统没有实质的影响。

EPL是Esper提供的类SQL的查询语言,事件流相当于数据库的table是用于查询的数据来源,事件相当于事件的行事件数据。

2.2.1 EPL 语法

  • 统计窗口
    win:length(size)//攒够size条数据后触发UpdateListener()函数。滑动窗口,攒满之后新来一个移除一个,并触发。
    win:length_batch(size) //攒够size条数据后触发,并清空队列。再攒满了再触发。
    win:time(time period)//第一次触发在period秒后,然后每一秒触发一次。
    win:time_batch(time period) //每period秒触发一次。
    win:keepall()//无参数,记录所有进入的数据,除非使用delete操作,才能从窗口移出数据。
    std:unique(criteria)//对不同的criteria[例如:id]保留其最近的一条事件

  • 连接
    当老师的id和学生的id相同时,查询学生的姓名和老师的姓名

select s.name, t.name from Student.win:time(10) as s inner join  Teacher.win:time(10) as t on s.id=t.id

使用具体的例子来讲解如何使用Esper:
https://blog.csdn.net/liangyihuai/article/details/79705357

select * from TemperatureSensorEvent
match_recognize (
  partition by device
  measures A.id as a_id, B.id as b_id, A.temp as a_temp, B.temp as b_temp
  pattern (A B)
  define 
    B as Math.abs(B.temp - A.temp) >= 10
)
https://esper.espertech.com/release-5.1.0/esper-reference/html/match-recognize.html

2.2.2 Esper事件处理

UpdaterListener是Esper提供的一个接口,用于监听某个EPL在引擎中的运行情况,即事件进入并产生结果后会通知UpdateListener。

 public interface UpdateListener  
    {  
        public void update(EventBean[] newEvents, EventBean[] oldEvents);  
    } 
 类似资料: