当前位置: 首页 > 工具软件 > Siddhi > 使用案例 >

Flink系列(四)-- Esper,Siddhi,Drools Fusion简单介绍以及应用

诸葛品
2023-12-01

 

原文地址(包含源码和图片):http://note.youdao.com/noteshare?id=c91f71fd16bedf7dfaac3b6fa663a243&sub=B79A8354FB1D4CB5BE44A1513C4F7A6C

 

一、简介

今天给大家分享的内容是FlinkCEP,中文意思就是复杂事件处理。

那么何为CEP呢? 听起来好像很复杂,实际上就是基于事件流进行数据处理,把要分析的数据抽象成事件,然后将数据发送到CEP引擎,引擎就会根据事件的输入和最初注册的处理模型,得到事件处理结果。

直白一点就是:对连续的传入事件进行模式匹配

 

二、应用场景

CEP应用场景具有几个共同而明显的特点:

  • 通常需要处理巨量的事件,但是只有少部分事件是真正关心的。
  • 通常在相关的事件之间有强烈的时间关系。
  • 个别事件通常是不重要的。系统关心相关事件的模式(patterns)和它们的关系

 

因此,CEP的处理引起了人们的极大兴趣,并在各种用例中得到了广泛的应用。 诸如股票市场趋势和信用卡欺诈检测等金融应用,检测仓库中的物品未被正确检出的盗窃,通过指定可疑用户行为的模式,CEP还可用于检测网络入侵等。以及各种故障的预测与告警等

 

三、几种CEP引擎简介

(1)Esper

1.何为Esper?

Esper是一个Java开发的事件流处理(ESP:Event Stream Processing)和复杂事件处理(CEP:Complex Event Processing)引擎。该引擎可应用于网络入侵探测,SLA监测,RFID读取,航空运输调控,金融方面(风险管理,欺诈探测)等领域。

Esper是纯Java开源复杂事件和事件流引擎,可以监测事件流,并在特定事件发生时触发某些动作。

2.Esper能做什么?优点有哪些?

  • 使用SQL的形式来处理实时数据,支持查询、聚合、连接、过滤等,非常容易上手
  • 多种形式(时间/长度)窗口统计事件数据,事件之间可以关联分析。
  • 高吞吐(10万事件/秒)低延时(毫秒级别)。
  • 核心只有一个jar包,易于扩展和发布。经常和实时分布式框架Storm一起使用。

 

3.简单示例

 

package test.ghf.cep.java.test02; import com.espertech.esper.client.*; import java.util.Random; import java.util.Date; public class exampleMain { /** * 股票代码,价格和时间戳 */ public static class Tick { String symbol; Double price; Date timeStamp; public Tick(String s, double p, long t) { symbol = s; price = p; timeStamp = new Date(t); } public double getPrice() {return price;} public String getSymbol() {return symbol;} public Date getTimeStamp() {return timeStamp;} @Override public String toString() { return "Price: " + price.toString() + " time: " + timeStamp.toString(); } } private static Random generator = new Random(); /** * 产生数据,并发送出去 * @param cepRT 用来发送事件 */ public static void GenerateRandomTick(EPRuntime cepRT) { double price = (double) generator.nextInt(10); long timeStamp = System.currentTimeMillis(); String symbol = "AAPL"; Tick tick = new Tick(symbol, price, timeStamp); System.out.println("Sending tick:" + tick); cepRT.sendEvent(tick); } public static class CEPListener implements UpdateListener { public void update(EventBean[] newData, EventBean[] oldData) { System.out.println("触发告警: " + newData[0].getUnderlying()); } } public static void main(String[] args) { // 通知引擎它需要处理哪些对象 Configuration cepConfig = new Configuration(); // 注册Tick 到引擎(name -> class) cepConfig.addEventType("StockTick", Tick.class.getName()); // 初始化cep引擎 EPServiceProvider cep = EPServiceProviderManager.getProvider("myCEPEngine", cepConfig); // 得到运行时环境 EPRuntime cepRT = cep.getEPRuntime(); // 注册一个EPL语句: 每当最近的2次数据的平均值大于6.0时,触发事件 EPAdministrator cepAdm = cep.getEPAdministrator(); EPStatement cepStatement = cepAdm.createEPL("select * from " + "StockTick(symbol='AAPL').win:length(2) " + "having avg(price) > 6.0"); // 创建一个监听者并把它和我们的选择规则产生的事件关联起来 cepStatement.addListener(new CEPListener()); // 开始产生数据,并发送事件 for (int i = 0; i < 5; i++) { GenerateRandomTick(cepRT); } } }

 

(2)Siddhi

1.何为Siddhi?

Siddhi是一个轻量级的,简单的开源的复杂事件流程引擎。它使用类SQL的语言描述事件流任务,可以很好的支撑开发一个可扩展的,可配置的流式任务执行引擎。

传统设计之中,为了支持不同的告警规则类型,我们需要编写不同的业务逻辑代码,但是使用了Siddhi之后,我们只需要配置不同的流任务Siddhiql,即可以支持不同的告警业务。

 

2.为什么使用Siddhi?

 

快,优步每天处理200亿个事件(每秒30万个事件)。

它是轻量级的(<2MB),可以嵌入到Android和RaspberryPi中。

它有40多个Siddhi扩展

它被60多家公司使用,其中包括许多正在生产的财富500强公司。以下是一些例子:

WSO2使用Siddhi的目的如下:

在其产品(如WSO2流处理器)中提供流处理功能。

作为WSO2物联网服务器的edge分析库。

作为WSO2 API管理器节流的核心。

作为WSO2产品分析的核心。

优步使用Siddhi进行欺诈分析。

Apache Eagle使用Siddhi作为策略引擎。

基于Siddhi的解决方案分别在2014年、2015年、2016年和2017年的ACM DEBS大挑战流处理竞赛中进入决赛。

Siddhi是许多学术研究项目的基础,被引用超过60次。

 

3.简单示例

package com.test.ghhf.siddhi; import org.wso2.siddhi.core.SiddhiAppRuntime; import org.wso2.siddhi.core.SiddhiManager; import org.wso2.siddhi.core.event.Event; import org.wso2.siddhi.core.stream.input.InputHandler; import org.wso2.siddhi.core.stream.output.StreamCallback; /** * 按symbol 对事件进行分组,并计算滑动5秒时间窗口的价格和成交量总和 */ public class TimeWindowSample { public static void main(String[] args) throws InterruptedException { //创建一个Siddhi Manager SiddhiManager siddhiManager = new SiddhiManager(); //Siddhi Application String siddhiApp = "" + "define stream StockEventStream (symbol string, price float, volume long); " + //先定义流的格式 " " + "@info(name = 'query1') " + "from StockEventStream#window.time(5 sec) " + // 定义查询 "select symbol, sum(price) as price, sum(volume) as volume " + // 定义查询 "group by symbol " + // 定义查询 "insert into AggregateStockStream ;"; // 定义查询 //生成运行环境 SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp); //给AggregateStockStream 添加回调 -- 从流中检索匹配的事件,并输出 siddhiAppRuntime.addCallback("AggregateStockStream", new StreamCallback() { @Override public void receive(Event[] events) { for(int i = 0; i< events.length; i++){ System.out.println(events[i].toString()); } //EventPrinter.print(events); } }); //检索输入处理程序以将事件推入Siddhi InputHandler inputHandler = siddhiAppRuntime.getInputHandler("StockEventStream"); //启动事件处理 siddhiAppRuntime.start(); //发送事件到Siddhi inputHandler.send(new Object[]{"IBM", 100f, 100L}); Thread.sleep(1000); inputHandler.send(new Object[]{"IBM", 200f, 300L}); inputHandler.send(new Object[]{"WSO2", 60f, 200L}); Thread.sleep(1000); inputHandler.send(new Object[]{"WSO2", 70f, 400L}); inputHandler.send(new Object[]{"GOOG", 50f, 30L}); Thread.sleep(1000); inputHandler.send(new Object[]{"IBM", 200f, 400L}); Thread.sleep(2000); inputHandler.send(new Object[]{"WSO2", 70f, 50L}); Thread.sleep(2000); inputHandler.send(new Object[]{"WSO2", 80f, 400L}); inputHandler.send(new Object[]{"GOOG", 60f, 30L}); Thread.sleep(1000); inputHandler.send(new Object[]{"WSO2", 80f, 400L}); inputHandler.send(new Object[]{"GOOG", 60f, 30L}); Thread.sleep(1000); //Shutdown the runtime siddhiAppRuntime.shutdown(); //Shutdown Siddhi Manager siddhiManager.shutdown(); } }

 

 

(3)Drools Fusion CEP

1.何为Drools Fusion CEP?

Drools 是用 Java 语言编写的开放源码规则引擎,使用 Rete 算法对所编写的规则求值。Drools 允许使用声明方式表达业务逻辑。可以使用非 XML 的本地语言编写规则,从而便于学习和理解。并且,还可以将 Java 代码直接嵌入到规则文件中,这令 Drools 的学习更加吸引人

Drools Fusion 是Drools 负责启用事件处理行为的一个模块

 

2.Drools优点是什么?

  • 非常活跃的社区支持
  • 易用
  • 快速的执行速度
  • 在 Java 开发人员中流行
  • 与 Java Rule Engine API(JSR 94)兼容

 

 

3.简单示例

package com.us.fusion7 import test.ghf.drools.Person /** 1.@role 事件声明:要把插入drools的数据【Person】声明为事件,声明事件使用@role标签 declare Person @role(event) end 2.@timestamp 每一个事件都要有一个关联的时间戳指派给它。默认时,一个给定事件的时间戳是在事件被插入到工作内存时, 从 Session Clock 读取,并且分配给该事件。有些时候,事件用时间戳作为它自己的一个属性。 在这情况下,用户可以用@timestamp 标记用户属性为时间戳 例如:用Person的 createTime 属性为时间戳 declare Person @role(event) @timestamp( createTime ) end 3.@expires 这个标签只有引擎运行在流(STREAM)模式之下才会被考虑 该标签显示定义 一个事件在什么时候应该到期,事件到期,事件可能不再匹配和激活任何规则时 在person 例子中假设过期时间为20S declare Person @role(event) @timestamp( createTime ) @expires(20s) end 4.滑动时间窗口 滑动时间窗口允许用户编写规则,其将仅匹配在最近的 X 时间单元内发生的事件 例如:只匹配最近3秒内,年龄小于25的人 rule "boy" when $p : Person(age < 25) over window:time(3s) then $p.setDesc("少年"); retract($p); end 5.滑动长度窗口 和滑动时间窗口很类似,其将仅匹配最近几次发生的事件,用法如图,只匹配最近2次发生的事件 rule "old" when $p : Person(age > 49) over window:length(2) then $p.setDesc("老年"); retract($p); end */ function void printName(String streamName,String name,int age,String desc) { System.out.println("streamName:"+streamName+" name:"+name+" age:"+age+" desc:"+ desc); } declare Person @role(event) @timestamp( createTime ) @expires(20s) end rule "old" when $p : Person(age > 49) over window:length(2) then $p.setDesc("老年"); retract($p); printName("boy",$p.getName(),$p.getAge(),$p.getDesc()); end

 

package test.ghf.drools; import org.drools.KnowledgeBase; import org.drools.KnowledgeBaseConfiguration; import org.drools.KnowledgeBaseFactory; import org.drools.builder.KnowledgeBuilder; import org.drools.builder.KnowledgeBuilderFactory; import org.drools.builder.ResourceType; import org.drools.conf.EventProcessingOption; import org.drools.io.ResourceFactory; import org.drools.runtime.KnowledgeSessionConfiguration; import org.drools.runtime.StatefulKnowledgeSession; import java.net.URL; import java.util.Date; /** * @author ghf * @since 2019/11/7 */ public class App { public static void main(String[] args) { //1.使用默认配置,创建一个KnowledgeBuilder KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder(); //2.注册 规则文件 URL urlperson = App.class.getClassLoader().getResource("person.drl"); kbuilder.add(ResourceFactory.newFileResource(urlperson.getPath()), ResourceType.DRL); //3.EVENT需要设置config = STREAM KnowledgeBaseConfiguration config = KnowledgeBaseFactory.newKnowledgeBaseConfiguration(); config.setOption(EventProcessingOption.STREAM); //4.开启一个session KnowledgeBase kbase = KnowledgeBaseFactory.newKnowledgeBase(config); kbase.addKnowledgePackages(kbuilder.getKnowledgePackages()); KnowledgeSessionConfiguration configSession = KnowledgeBaseFactory.newKnowledgeSessionConfiguration(); StatefulKnowledgeSession ksession = kbase.newStatefulKnowledgeSession(configSession, null); Person p1 = new Person("小一", 2, new Date()); Person p2 = new Person("小二", 7, new Date()); Person p3 = new Person("小三", 99, new Date()); //5.向session发送消息 ksession.insert(p1); ksession.insert(p2); ksession.insert(p3); //6.执行 int count = ksession.fireAllRules(); //7.查看结果 System.out.println("总执行了" + count + "条规则------------------------------"); } }

 

 

 

 

 

 

 类似资料: