当前位置: 首页 > 工具软件 > Siddhi > 使用案例 >

(二)Siddhi关键分析手段

桂玉石
2023-12-01

一、模式分析(Pattern)

这是一种状态机实现,可让您检测随着时间推移到达的事件中的模式。 这可以关联单个流内或多个流之间的事件。

目的

模式可以确定一段时间内事件的趋势。

语法

以下是模式查询的语法:

from (every)? <event reference>=<input stream>[<filter condition>] -> 
    (every)? <event reference>=<input stream [<filter condition>] -> 
    ... 
    (within <time gap>)?     
select <event reference>.<attribute name>, <event reference>.<attribute name>, ...
insert into <output stream>

-> 用于指示应该在另一个事件之后的事件。后续事件不一定必须在先前事件之后立即发生。前一个事件要满足的条件应该在->之前的[<filter condition>]添加,而后一个事件要满足的条件应该在->之后的[<filter condition>]添加。

<event reference> 这使您可以添加对match事件的引用(别名),以便以后可以对其进行进一步处理。

(within <time gap>)?within子句是可选的。它定义了所有匹配事件应发生的持续时间。

every是可选关键字。这定义了是否应针对具有匹配条件的每个事件到达指定流时触发事件匹配。
如果不使用此关键字,则仅执行一次匹配。

Siddhi还支持按逻辑顺序匹配事件(如(andor,andnot)以及计数匹配事件的模式匹配,将在后续讲解。

Example

如果房间的温度在10分钟内升高5度,则此查询将发送警报。

from every( e1=TempStream ) -> e2=TempStream[ e1.roomNo == roomNo and (e1.temp + 5) <= temp ] within 10 min
select e1.roomNo, e1.temp as initialTemp, e2.temp as finalTemp
insert into AlertStream;

在此例中,匹配过程针对TempStream流中的每个事件开始(因为every与一起使用e1=TempStream),如果另一个事件在10分钟内到达,且该temp属性的值大于或等于e1.temp + 5 事件e1的值,则产生事件输出至AlertStream

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BMFkMlXl-1608538963977)(.\img\pattern.png)]

另外:如果本次e1与e2事件已经构成输出事件,那么下一次e2事件到来,本次e1不会参与模式计算;

1、计数模式(Counting Pattern)

计数模式允许您匹配为同一匹配条件接收的多个事件。每个条件匹配的事件数可以通过条件后缀进行限制。

语法

每个匹配条件可以包含一个事件集合,其中包含要匹配的最小和最大事件数,如下面的语法所示。

from (every)? <event reference>=<input stream>[<filter condition>] (<<min count>:<max count>>)? ->  
    ... 
    (within <time gap>)?     
select <event reference>([event index])?.<attribute name>, ...
insert into <output stream>
后缀描述例如
<n1:n2>这与“n1”至“n2”事件匹配(包括“n1”和不超过“n2”)。1:4 匹配第1-4个事件
<n:>这与“n”或更多事件(包括“n”)匹配。<2:> 匹配第2至后续事件
<:n>最多匹配“n”个事件(不包括“n”)。<:5> 匹配前5个事件
<n>这与“n”个事件完全匹配。<5> 匹配第五个事件

可以通过使用事件索引(或引用)来检索集合中特定出现的事件。[]可以用来指示事件索引,1该索引可以用作第一个事件的索引,last可以用作最后一个事件的索引。如果提供的索引大于最后一个事件的索引,则系统将返回null。以下是一些有效的示例。

  • e1[3]指的是3 次事件。
  • e1[last] 指最后一个事件。
  • e1[last - 1] 指最后一个事件之前的事件。

Example

假设TempStream不断上报房间的温度,RegulatorStream上报房间温度调节器动作事件,

以下Siddhi应用程序计算两个温度调节器事件发生之间的一段时间内的温差

define stream TempStream (deviceID long, roomNo int, temp double);
define stream RegulatorStream (deviceID long, roomNo int, tempSet double, isOn bool);

from every( e1=RegulatorStream) -> e2=TempStream[e1.roomNo==roomNo]<1:> -> e3=RegulatorStream[e1.roomNo==roomNo]
select e1.roomNo, e2[0].temp - e2[last].temp as tempDiff
insert into TempDiffStream;

2、逻辑模式(Logical Patterns)

逻辑模式匹配按时间顺序到达的事件,并将它们与逻辑关系(如“and”、“or”、“not”)相关联。

语法

from (every)? (not)? <event reference>=<input stream>[<filter condition>] 
          ((and|or) <event reference>=<input stream>[<filter condition>])? (within <time gap>)? ->  
    ... 
select <event reference>([event index])?.<attribute name>, ...
insert into <output stream>

关键字如“and”、“or”或“not”可用于说明逻辑关系。

关键字描述
andThis allows both conditions of and to be matched by two events in any order.
orThe state succeeds if either condition of or is satisfied. Here the event reference of the other condition is null.
not <condition1> and <condition2>When not is included with and, it identifies the events that match arriving before any event that match .
not <condition> for <time period>When not is included with for, it allows you to identify a situation where no event that matches <condition1> arrives during the specified <time period>. e.g.,from not TemperatureStream[temp > 60] for 5 sec.

not模式后可以加上一个and子句,not也可以在给定有效期<time period>

此外,在Siddhi中,此时不能使用and、or或not子句将两个以上的流与逻辑条件匹配。

检测非发生事件:

Siddhi允许您通过上面指定的关键字的多种组合来检测未发生的事件,如下表所示。

在列出的模式中,P *可以是常规事件模式,也可以是不存在的事件模式或逻辑模式。

PatternDetected Scenario
not A for <time period>系统启动后,在<时间段>内未发生事件A。
例如,如果出租车在30分钟内未到达目的地,则会生成警报,以表明乘客可能处于危险之中。
not A for <time period> and B事件A不会在<时间段>内发生,但是事件B在某个时间点发生。
例如,如果出租车在30分钟内未到达目的地,并且乘客标记他/她在某个时间点处于危险中,则产生告警。
not A for <time period 1> and not B for <time period 2>事件A不会在“<时间段1>”内发生,事件B也不会在“ <时间段2>”内发生。
例如,如果出租车司机在30分钟内未到达目的地,并且乘客未在同一时间段内将自己标记为处于危险之中,则生成警报。
not A for <time period> or B事件A不在“ <时间段>”内发生,或者事件B在某个时间点发生。
例如,如果出租车在30分钟内未到达目的地,或者乘客已标记出他/她在某个时间点处于危险中,则发出警报。
not A for <time period 1> or not B for <time period 2>事件A不在“ <时间段1>”内发生,或者事件B不在“ <时间段2>”内。
例如,如果出租车在20分钟内未到达目的地A或30分钟内未到达目的地B,则生成警报以指示驾驶员不在预期的路线上。
A → not B for <time period>事件B在事件A发生后的“ <时间段>”之内没有发生。
例如,如果出租车到达目的地,但是没有付款记录,则生成警报。
P* → not A for <time period> and B在发生P *之后,事件A在“ <时间段>”之内没有发生,事件B在某个时间点发生。
P* → not A for <time period 1> and not B for <time period 2>在发生P *之后,事件A不在“ <时间段1>”内发生,且事件B没有在“ <时间段2>”内发生。
P* → not A for <time period> or B在P *发生之后,事件A不在“ <时间段>”内发生,或者事件B在某个时间点发生。
P* → not A for <time period 1> or not B for <time period 2>在发生P *之后,事件A不在“ <时间段1>”内发生,或者事件B没有在“ <时间段2>”内发生。
not A for <time period> → B在系统启动后,事件A的“ <时间段>”内未发生,但是事件B在该“时间段”过去之后发生。
not A for <time period> and B → P*事件A不会在<时间段>内发生,事件B会在某个时间点发生。 然后P *在“ <时间段>”过去之后和B发生之后出现。
not A for <time period 1> and not B for <time period 2> → P*系统启动后,事件A不会在<时间段1>内发生,事件B不会在<时间段2>内发生。 但是,P *在A和B之后发生。
not A for <time period> or B → P*系统启动后,事件A不会在“ <时间段>”内发生,或者事件B会在某个时间点发生。 P *在“ <时间段>”过去之后或在B发生之后发生。
not A for <time period 1> or not B for <time period 2> → P*系统启动后,事件A不会在“ <时间段1>”内发生,或者事件B不会在“ <时间段2>”内发生。 然后P *在“ <时间段1>”和“ <时间段2>”都经过之后出现。
not A and B在事件B之前不会发生事件A。
A and not B在事件A之前不会发生事件B。

Example

(1)在Siddhi应用程序之后,当钥匙从酒店房间取出时,将“停止”控制动作发送给监管机构。

(C -> A or B, 事件发生后,事件A或事件B发生)

define stream RegulatorStateChangeStream(deviceID long, roomNo int, tempSet double, action string);
define stream RoomKeyStream(deviceID long, roomNo int, action string);

from every( e1=RegulatorStateChangeStream[ action == 'on' ] ) -> 
      e2=RoomKeyStream[ e1.roomNo == roomNo and action == 'removed' ] or e3=RegulatorStateChangeStream[ e1.roomNo == roomNo and action == 'off']
select e1.roomNo, ifThenElse( e2 is null, 'none', 'stop' ) as action
having action != 'none'
insert into RegulatorActionStream;

(2) 如果我们在温度达到12度之前关闭了房间温度调节器,这个Siddhi应用程序将生成一个警报。

(C -> not A and B ,事件C发生后,事件A未发生并且事件B发生)

define stream RegulatorStateChangeStream(deviceID long, roomNo int, tempSet double, action string);
define stream TempStream (deviceID long, roomNo int, temp double);

from e1=RegulatorStateChangeStream[action == 'start'] -> not TempStream[e1.roomNo == roomNo and temp < 12] and e2=RegulatorStateChangeStream[action == 'off']
select e1.roomNo as roomNo
insert into AlertStream;

(3) 如果在打开温度调节器的5分钟内温度没有降低到12度,此Siddhi应用程序将生成警报。

(C -> not A for

define stream RegulatorStateChangeStream(deviceID long, roomNo int, tempSet double, action string);
define stream TempStream (deviceID long, roomNo int, temp double);

from e1=RegulatorStateChangeStream[action == 'start'] -> not TempStream[e1.roomNo == roomNo and temp < 12] for 5 minute
select e1.roomNo as roomNo
insert into AlertStream;

(4) 如果出租车在30分钟内未到达目的地,或者乘客已标记出他/她在某个时间点处于危险中,则发出警报。

( C -> not A for

define stream taxiStream(carId string, driverName string, passenger string, destination string, isArrive bool);

define stream  passengerStream(passenger string, destination string,isDangerous bool );

@sink(type='log') 
define stream alarmStream(carId string, driverName string, passenger string, destination string, detail string);

from e1=taxiStream -> not taxiStream[e1.carId==carId and isArrive==true] for 30 minute or e3=passengerStream[e1.passenger==passenger and isDangerous ==true]
select e1.carId as carId, e1.driverName as driverName, e1.passenger as passenger, e1.destination as destination, "出租车在30分钟内未到达目的地,或者乘客标记他/她在某个时间点处于危险中,产生告警" as detail
insert into alarmStream;

(5) 如果出租车在30分钟内未到达目的地, 并且乘客在30分钟内未设置为危险状态,产生告警。

(C -> not A for


define stream taxiStream(carId string, driverName string, passenger string, destination string, isArrive bool);
define stream  passengerStream(passenger string, destination string,isDangerous bool );
@sink(type='log') 
define stream alarmStream(carId string, driverName string, passenger string, destination string, detail string);

from every(e1=taxiStream[isArrive==false]) -> not taxiStream[e1.carId==carId and isArrive==true] for 1 minute and not passengerStream[e1.passenger==passenger and isDangerous==true] for 2 minute
select e1.carId as carId, e1.driverName as driverName, e1.passenger as passenger, e1.destination as destination, "出租车在30分钟内未到达目的地, 并且乘客在30分钟内未设置为危险状态,产生告警" as detail
insert into alarmStream;

二、序列分析(Sequence)

Sequence是一种状态机实现,使您可以检测事件随时间变化的顺序。在这里,所有匹配事件都需要连续到达以匹配序列条件,并且在匹配的事件序列中不能有任何不匹配的事件到达。这可以关联单个流内或多个流之间的事件。

pattern 的多个 event 之间可以是不连续的,但 sequence 的 events 之间必须是连续的

目的

这使您可以在指定的时间段内检测到指定的事件序列。

语法

The syntax for a sequence query is as follows:

from (every)? <event reference>=<input stream>[<filter condition>], 
    <event reference>=<input stream [<filter condition>], 
    ... 
    (within <time gap>)?     
select <event reference>.<attribute name>, <event reference>.<attribute name>, ...
insert into <output stream>
ItemsDescription
,这表示立即发生的下一个事件,即,当与第一个条件匹配的事件到达时,紧随其后的事件应与第二个条件匹配。
<event reference>用户可以添加对match事件的引用,以便以后可以对其进行进一步处理。
(within <time gap>)?within子句是可选的。它定义了所有匹配事件应发生的持续时间。
everyevery是可选关键字。这定义了是否应针对具有匹配条件的到达指定流的每个事件触发匹配事件。
如果不使用此关键字,则仅执行一次匹配。

Example

如果两个连续的温度事件之间的温度升高超过一度,则此查询将生成警报。

from every e1=TempStream, e2=TempStream[e1.temp + 1 < temp]
select e1.temp as initialTemp, e2.temp as finalTemp
insert into AlertStream;

计数序列

计数序列使您可以为同一匹配条件匹配多个事件。每种条件匹配事件的数量可以通过条件后缀如被限制计数模式,或通过使用 *+?

还可以使用事件索引来检索匹配的事件,类似于在Counting Patterns中进行的操作

语法

序列中的每个匹配条件都可以包含事件的集合,如下所示。

from (every)? <event reference>=<input stream>[<filter condition>](+|*|?)?, 
    <event reference>=<input stream [<filter condition>](+|*|?)?, 
    ... 
    (within <time gap>)?     
select <event reference>.<attribute name>, <event reference>.<attribute name>, ...
insert into <output stream>
后缀符号必需/可选描述
+Optional这会将一个或多个事件与给定条件进行匹配。
*Optional这会将零个或多个事件匹配到给定条件。
?Optional这会将零个或一个事件匹配到给定条件。

Example

这个Siddhi应用程序识别温度峰值。

define stream TempStream(deviceID long, roomNo int, temp double);

from every e1=TempStream, e2=TempStream[e1.temp <= temp]+, e3=TempStream[e2[last].temp > temp]
select e1.temp as initialTemp, e2[last].temp as peakTemp
insert into PeekTempStream;

逻辑序列

逻辑序列在连续到达的事件上使用“and”、“or”和“not”来标识逻辑关系

Syntax

from (every)? (not)? <event reference>=<input stream>[<filter condition>] 
          ((and|or) <event reference>=<input stream>[<filter condition>])? (within <time gap>)?, 
    ... 
select <event reference>([event index])?.<attribute name>, ...
insert into <output stream>

关键字如“and”、“or”或“not”可用于说明逻辑关系,类似于在逻辑模式中的做法。

Example

这个Siddhi应用程序在调节器事件之后紧接着是温度和湿度事件时通知状态。

define stream TempStream(deviceID long, temp double);
define stream HumidStream(deviceID long, humid double);
define stream RegulatorStream(deviceID long, isOn bool);

from every e1=RegulatorStream, e2=TempStream and e3=HumidStream
select e2.temp, e3.humid
insert into StateNotificationStream;

三、增量聚合(Incremental Aggregation)

增量聚合允许您在指定的时间段内以增量方式获取聚合。

这不仅使您可以计算具有不同时间粒度的聚合,而且还允许您以交互方式访问它们以进行报告,仪表板以及进一步处理。 它的模式是通过聚合定义定义的。

1、语法定义:

增量聚合粒度数据持有者默认每15分钟自动清除一次。 执行数据清除时,将考虑为增量聚合查询中的每个粒度指定的保留期。 为粒度定义的保留期必须大于或等于下表中指定的最小保留期。 如果没有为粒度定义有效的保留期限,则将应用默认的保留期限(如下表所示)。

GranularityDefault retentionMinimum retention
second120 seconds120 seconds
minute24 hours120 minutes
hour30 days25 hours
day1 year32 days
monthAll13 month
yearAllnone

Purpose

增量聚合允许您检索不同持续时间的聚合值。 也就是说,它允许您获取诸如秒,分钟,小时等持续时间的流属性的总计,例如总和,计数,平均,最小,最大,计数和distinctCount。

这在许多Google Analytics(分析)方案中非常重要,因为通常需要几个时间段的汇总值。 此外,这可以确保聚合不会由于意外的系统故障而丢失,因为聚合可以存储在不同的持久性存储中。

Syntax

@store(type="<store type>", ...)
@purge(enable="<true or false>",interval=<purging interval>,@retentionPeriod(<granularity> = <retention period>, ...) )
define aggregation <aggregator name>
from <input stream>
select <attribute name>, <aggregate function>(<attribute name>) as <attribute name>, ...
    group by <attribute name>
    aggregate by <timestamp attribute> every <time periods> ;

The above syntax includes the following:

ItemDescription
@BufferSize从V4.2.0版本开始。 这标识了要保留在缓冲区中以便处理乱序事件处理的过期事件的数量。 这是一个可选参数,仅在聚合基于外部时间戳记时才适用(因为基于事件到达时间聚合的事件不能乱序)。 Siddhi根据最新事件的时间戳和计算聚合的最精细的持续时间来确定事件是否到期。 例如,如果汇总是针对“秒…年”计算的,则最精细的持续时间为秒。 因此,如果缓冲区大小为3并且事件在第51、52、53和54秒到达,则所有较旧的聚合(即51、52和53秒)都保留在缓冲区中,因为最新事件在 第54秒 默认值为“ 0”。
@IgnoreEventsOlderThanBuffer从V4.2.0版本开始。此注释指定是否聚合早于缓冲区的事件。 如果此参数设置为“ false”(默认设置),则早于缓冲区的任何事件都将与缓冲区中最早的事件聚合。 如果此参数设置为“ true”,则任何早于缓冲区的事件都将被丢弃。 这是一个可选的注释。
@store此批注用于引用存储计算出的汇总结果的数据存储。 该注释是可选的。 如果未提供注释,则数据将存储在“in-memory”存储中。
@purge此注释用于配置聚合粒度中的清除。 如果未提供此注释,则将应用上述默认清除。 如果要禁用自动数据清除,则可以按如下方式使用此注释:@purge(enable = false)/如果Siddhi应用程序中包含聚合查询,则应禁用数据清除,以实现只读目的。
@retentionPeriod此注释用于指定执行数据清除时需要保留数据的时间长度。 如果未提供此注释,则将应用默认保留期。
<aggregator name>这为聚合指定了唯一的名称,以便在访问汇总结果时可以访问它。
<input stream>提供聚合的流。 注意! 此流应该已经定义。
group by <attribute name>group by子句是可选的。 如果它包含在Siddhi应用程序中,则将按每个“ group by”属性来计算合计值。 如果未使用,则所有事件都将汇总在一起。
by <timestamp attribute>此子句是可选的。 这定义了应该用作时间戳的属性。 如果不使用此子句,则默认使用事件时间。 时间戳记可以是“字符串”或“长”值。 如果值为long,则表示以毫秒为单位的unix时间戳(例如1496289950000)。 如果它是一个字符串值,则支持的格式为--
::(如果时间在格林尼治标准时间)和 <yyyy>- <MM>-<dd> <HH>:<mm>:<ss> <Z>(如果时间不在格林尼治标准时间中),则必须为提供ISO 8601 UTC偏移量。 (例如,“ + 05:30”,“-11:00”)。
<time periods>可以将时间段指定为最小值和最大值由三个点分隔的范围,也可以指定为逗号分隔的值。 例如,可以将范围指定为秒…年,其中每秒,分钟,小时,天,月和年进行聚合。 逗号分隔的值可以指定为分钟,小时。 从v4.1.1开始,仅支持指定逗号分隔值时的跳过时间长度(例如,分钟,跳过小时长度的日期)

Note

从V4.2.0起,将在日历开始时间针对每个粒度与GMT时区进行汇总

Note

从V4.2.6开始,可以在多个Siddhi应用程序中定义相同的聚合以进行连接,但是,仅一个siddhi应用程序应执行处理(即聚合输入流应仅将事件馈送到一个聚合定义)。

注意乱序事件对聚合的影响!

Example1:

该Siddhi应用程序定义了一个名为“ TradeAggregation”的聚合,以计算到达“ TradeStream”流的事件的“ price”属性的平均值和总和。 这些汇总是在当前时间起至下一年的范围内的每个时间粒度计算的(seconds、minutes、hour、day、mounth、year)。

define stream TradeStream (symbol string, price double, volume long, timestamp long);

@purge(enable='true', interval='10 sec',@retentionPeriod(sec='120 sec',min='24 hours',hours='30 days',days='1 year',months='all',years='all'))
define aggregation TradeAggregation
  from TradeStream
  select symbol, avg(price) as avgPrice, sum(price) as total
    group by symbol
    aggregate by timestamp every sec ... year;

Example2:

该Siddhi应用程序定义了一个名为“ TradeAggregation”的聚合,每分钟聚合,计算到达“ TradeStream”流的事件的“ price”属性的平均值和总和,聚合禁止数据清除,聚合数据持久化到数据库中。

define stream TradeStream (symbol string, price double, volume long, timestamp long);

@store(type='rdbms' , jdbc.url='jdbc:mysql://192.168.1.165/siddhi?useSSL=false',username='root',password='199606',jdbc.driver.name='com.mysql.jdbc.Driver') 
@purge(enable='false')
define aggregation TradeAggregation
  from TradeStream
  select symbol, avg(price) as avgPrice, sum(price) as total
    group by symbol
    aggregate by timestamp every minute;

2、Join (Aggregation)

这允许流从聚合中检索计算的聚合值。

Syntax

带有聚合的联接与带有表的联接相似,但是带有更多的in和per子句。

from <input stream> join <aggrigation> 
  on <join condition> 
  within <time range> 
  per <time granularity>
select <attribute name>, <attribute name>, ...
insert into <output stream>;

除了表联接的构造之外,这还包括以下内容。 请注意,“ on”条件是可选的:

ItemDescription
within <time range>这使您可以指定需要检索聚合值的时间间隔。 可以通过提供以逗号分隔的开始和结束时间作为“字符串”或“长”值,或使用通配符“字符串”指定数据范围来指定。 有关详细信息,请参见示例。
per <time granularity>这指定必须对汇总值进行分组和返回的时间粒度。 例如,如果您指定“days”,则在选定的时间间隔内将每天的检索汇总值分组。
AGG_TIMESTAMP这指定了聚合的开始时间,可以在select子句中使用。

within子句和per子句还接受流中的属性值。

Note

聚合的时间戳可以通过AGG_TIMESTAMP属性来访问。

Example1

在秒、分、时、天、月、年时间跨度上分别对流TradeStream实施聚合操作,用户检索1592668800000~ 1592841600000(2020-06-21 00:00:00 ~ 2020-06-23 00:00:00)时间跨度内,时间粒度上为小时为“hour”的聚合,且T.symbol为“test”。

define trigger hourTrigger at "0 0 * * * ?";

define stream TradeStream (symbol string, price double, volume long, timestamp long);

define stream AggregateStockStream( symbol string, total double, avgPrice double);

define aggregation TradeAggregation
  from TradeStream
  select symbol, avg(price) as avgPrice, sum(price) as total
    group by symbol
    aggregate every sec ... year;

from hourTrigger as S join TradeAggregation as T 
  on T.symbol == "test"
  within 1592668800000l, 1592841600000l
  per "hours" 
select T.symbol, T.total as total, T.avgPrice 
insert into AggregateStockStream;

-- select中可使用函数,处sum()时计算within范围内的total的总和
from hourTrigger as S join TradeAggregation as T 
  on T.symbol == "test"
  within "2020-06-21 00:00:00", "2020-06-23 00:00:00"
  per "hours" 
select T.symbol, sum(T.total) as total, T.avgPrice 
insert into AggregateStockStream;

Example2

允许用户通过包含startTime、endTime和perDuration等属性的输入流,动态传入检索请求的,开始时间、结束时间以及聚合的时间粒度等参数。其中,AGG_TIMESTAMP为聚合开始的时间;

(允许用户通过包含startTimeendTimeperDuration属性的值来输入要在集合检索请求中检索集合的持续时间的开始时间、结束时间以及聚合的时间粒度。其中,AGG_TIMESTAMP为聚合开始的时间)

define stream TradeStream (symbol string, price double, volume long, timestamp long);

define stream AggregateStockStream(stamp long, symbol string, total double, avgPrice double);

define stream StockStream (symbol string, startTime long, endTime long, perDuration string);

define aggregation TradeAggregation
  from TradeStream
  select symbol, avg(price) as avgPrice, sum(price) as total
    group by symbol
    aggregate every sec ... year;

from StockStream as S join TradeAggregation as T
  on S.symbol == T.symbol 
  within S.startTime, S.endTime 
  per S.perDuration
select AGG_TIMESTAMP as stamp, S.symbol, T.total, T.avgPrice 
insert into AggregateStockStream;

Supported join types

聚合联接支持以下联接操作。

  • Inner join (join)

    这是联接操作的默认行为。 “ join”用作将流与聚合连接的关键字。 仅在流和聚合中存在匹配事件时才生成输出。

  • Left outer join

    “ left external join”操作允许您根据条件将左侧的流与右侧的聚合连接起来。 这里,即使右聚合中没有匹配事件,它也会通过为右聚合的属性使用空值来返回左流的所有事件。

  • Right outer join

    这类似于“Left outer join”。 使用“ right external join”作为关键字,根据条件将右侧的流与左侧的聚合连接起来。 即使左聚合中没有匹配的事件,它也会返回右流的所有事件。

四、机器学习(Machine Learning):

1、流机器学习:

提供对事件流的流的机器学习(聚类,分类和回归)

  • bayesianRegression

    该扩展使用贝叶斯线性回归模型进行预测。贝叶斯线性回归允许通过估计全预测分布来确定每个预测的不确定性

  • kMeansIncremental

    在流数据集上执行K-Means聚类。数据点可以是任何维度,并且维度是根据参数数量计算得出的。查询要处理的所有数据点都应具有相同的维数。欧几里得距离被用作距离度量。该算法类似于https://www.cs.princeton.edu/courses/archive/fall08/cos436/Duda/C/sk_means.htm上的顺序K均值聚类

  • kMeansMiniBatch

    在流数据集上执行K-Means聚类。数据点可以是任何维度,并且维度是根据参数数量计算得出的。单个查询中要处理的所有数据点都应具有相同的维数。欧几里得距离被用作距离度量。该算法类似于小批量K均值。(请参阅D.Sculley,Google,Inc。的Web-Scale K-Means聚类)。

  • perceptronClassifier

    此扩展使用线性二进制分类感知器模型进行预测。

  • updateBayesianRegression

    此扩展构建/更新线性贝叶斯回归模型。此扩展使用了随机变分推断的改进版本。

  • updatePerceptronClassifier

    此扩展构建/更新线性二进制分类Perceptron模型。

    1.1 kMeansIncremental

    在流数据集上执行K-Means聚类。 数据点可以是任何维度,并且维度是根据参数数量计算得出的。 查询要处理的所有数据点都应具有相同的维数。 欧几里得距离被用作距离度量。 该算法类似于此的顺序K均值聚类 - https://www.cs.princeton.edu/courses/archive/fall08/cos436/Duda/C/sk_means.htm

    Syntax

    streamingml:kMeansIncremental(<INT> no.of.clusters, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)
    streamingml:kMeansIncremental(<INT> no.of.clusters, <DOUBLE> decay.rate, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)
    

    QUERY PARAMETERS

    名称描述默认值可能的数据类型可选的动态
    no.of.clusters数据集中假定的自然簇数INTNoNo
    decay.rate旧数据与新数据相比的衰减率。 此值的取值范围为[0,1], 0表示仅使用旧数据,而1表示仅使用新数据0.01DOUBLEYesNo
    model.feature这是一个可变长度的参数。 根据数据点的维数,我们将沿每个轴接收坐标作为特征。DOUBLE FLOAT INT LONGNoYes

    生成事件额外返回的属性:

    名称描述可能的数据类型
    euclideanDistanceToClosestCentroid表示当前数据点和最接近的质心之间的欧几里得距离。DOUBLE
    closestCentroidCoordinate这是一个可变长度属性。 根据维数(D),我们将返回此模型对于当前事件最近的质心的d维坐标(质心坐标1,质心坐标2,…质心坐标D)。这是预测结果,代表当前事件所属的群集。DOUBLE

    Examples

    define stream InputStream (x double, y double);
    @info(name = 'query1')
    from InputStream#streamingml:kMeansIncremental(2, 0.2, x, y)
    select closestCentroidCoordinate1, closestCentroidCoordinate2, x, y
    insert into OutputStream;
    

    这是用户提供衰减率的示例。 由于将所需的聚类数指定为2,因此将使用前两个事件来启动模型。在第一个事件本身之后,将开始进行预测。

    在此例中返回事件最近的。

    EXAMPLE 2

    define stream InputStream (x double, y double);
    @info(name = 'query1')
    from InputStream#streamingml:kMeansIncremental(2, x, y)
    select closestCentroidCoordinate1, closestCentroidCoordinate2, x, y
    insert into OutputStream;
    

    在这个示例中用户未给出衰减率,因此将使用默认值

    1.2 kMeansMiniBatch

    在流数据集上执行K-Means聚类。 数据点可以是任何维度,并且维度是根据参数数量计算得出的。 单个查询中要处理的所有数据点都应具有相同的维数。 欧几里得距离被用作距离度量。 该算法类似于小批量K均值。 (请参阅D.Sculley,Google,Inc.的Web-Scale K-Means聚类)。

    Syntax

    streamingml:kMeansMiniBatch(<INT> no.of.clusters, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)
    streamingml:kMeansMiniBatch(<INT> no.of.clusters, <DOUBLE> decay.rate, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)
    streamingml:kMeansMiniBatch(<INT> no.of.clusters, <INT> maximum.iterations, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)
    streamingml:kMeansMiniBatch(<INT> no.of.clusters, <INT> no.of.events.to.retrain, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)
    streamingml:kMeansMiniBatch(<INT> no.of.clusters, <DOUBLE> decay.rate, <INT> maximum.iterations, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)
    streamingml:kMeansMiniBatch(<INT> no.of.clusters, <DOUBLE> decay.rate, <INT> no.of.events.to.retrain, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)
    streamingml:kMeansMiniBatch(<INT> no.of.clusters, <INT> maximum.iterations, <INT> no.of.events.to.retrain, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)
    streamingml:kMeansMiniBatch(<INT> no.of.clusters, <DOUBLE> decay.rate, <INT> maximum.iterations, <INT> no.of.events.to.retrain, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)
    

    QUERY PARAMETERS

    名称描述默认值可能的参数类型可选的d动态
    no.of.clusters数据集中假定的自然簇数。INTNoNo
    decay.rate旧数据与新数据相比的衰减率。 此值的取值范围为[0,1], 0表示仅使用旧数据,而1表示仅使用新数据0.01DOUBLEYesNo
    maximum.iterations迭代次数,过程进行迭代,直到达到最大迭代次数或质心不变为止50INTYesNo
    no.of.events.to.retrain计算群集中心的事件数。20INTYesNo
    model.feature这是一个可变长度的参数。 根据数据点的维数,我们将沿每个轴接收坐标作为特征。DOUBLE FLOAT INT LONGNoYes

    额外返回的属性

    名称描述可能的类型
    euclideanDistanceToClosestCentroid表示当前数据点和最接近的质心之间的欧几里得距离。DOUBLE
    closestCentroidCoordinate这是一个可变长度属性。 从模型到当前事件的最接近质心的d维坐标。 这是预测结果,代表当前事件所属的群集。DOUBLE

    Examples

    define stream InputStream (x double, y double);
    @info(name = 'query1')
    from InputStream#streamingml:kMeansMiniBatch(2, 0.2, 10, 20, x, y)
    select closestCentroidCoordinate1, closestCentroidCoordinate2, x, y
    insert into OutputStream;
    

    这是用户提供所有三个参数(数据衰减率、迭代次数、用于计算质心的事件数)的示例。 前20个事件将用于构建模型,而从第21个事件开始预测。

    EXAMPLE 2

    define stream InputStream (x double, y double);
    @info(name = 'query1')
    from InputStream#streamingml:kMeansMiniBatch(2, x, y)
    select closestCentroidCoordinate1, closestCentroidCoordinate2, x, y
    insert into OutputStream;
    

    用户未指定参数,因此将使用默认值。

    1.3 感知器分类器(perceptronClassifier)

    此扩展使用线性二进制分类感知器模型进行预测。

    Syntax

    streamingml:perceptronClassifier(<STRING> model.name, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)
    streamingml:perceptronClassifier(<STRING> model.name, <DOUBLE> model.bias, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)
    streamingml:perceptronClassifier(<STRING> model.name, <DOUBLE> model.threshold, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)
    streamingml:perceptronClassifier(<STRING> model.name, <DOUBLE> model.bias, <DOUBLE> model.threshold, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)
    

    查询参数:

    名称描述默认值可能的数据类型可选的动态
    model.name使用的模型的名称。STRINGNoNo
    model.biasPerceptron算法的偏差0.0DOUBLEYesNo
    model.threshold区分两个类别的阈值。 指定的值必须介于零和一之间。0.5DOUBLEYesNo
    model.feature模型的特征值(流中的属性值)DOUBLE FLOAT INT LONGNoYes

    返回的属性

    名称描述可能的类型
    prediction预测值 (true/false)BOOL
    confidenceLevel预测的可能性DOUBLE

    Examples

    define stream StreamA (attribute_0 double, attribute_1 double, attribute_2 double, attribute_3 double);
    
    from StreamA#streamingml:perceptronClassifier('model1',0.0,0.5, attribute_0, attribute_1, attribute_2, attribute_3) 
    insert all events into OutputStream;
    

    该查询使用一个名为model1的Perceptron模型,该模型具有0.0的偏差和0.5的阈值学习率,以预测由attribute_0,attribute_1,attribute_2和attribute_3表示的特征向量的标签。 预测的标签(“真/假”)与预测置信度(概率)和特征向量一起被发送到“ OutputStream”流。 作为输出结果,OutputStream流定义如下:((attribute_0 double, attribute_1 double, attribute_2 double, attribute_3 double, prediction bool, confidenceLevel double)。

    1.4 更新感知器分类器(updatePerceptronClassifier )

    这个扩展建立/更新了一个线性二元分类感知器模型。

    语法

    streamingml:updatePerceptronClassifier(<STRING> model.name, <BOOL|STRING> model.label, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)
    streamingml:updatePerceptronClassifier(<STRING> model.name, <BOOL|STRING> model.label, <DOUBLE> learning.rate, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)
    

    QUERY PARAMETERS

    名称描述默认值可能的数据类型可选动态
    model.name要生成/更新的模型的名称。STRINGNoNo
    model.label数据集的标签或类的属性。BOOL STRINGNoYes
    learning.rate感知器算法的学习率。0.1DOUBLEYesNo
    model.feature感知器的学习率算法。特征需要作为流属性的模型的。DOUBLE FLOAT INT LONGNoYes

    Extra Return Attributes

    名称描述可能的数据类型
    featureWeight<特征.名称>模型权重DOUBLE

    Examples

    EXAMPLE 1

    define stream StreamA (attribute_0 double, attribute_1 double, attribute_2 double, attribute_3 double, attribute_4 string );
    
    from StreamA#streamingml:updatePerceptronClassifier('model1', attribute_4, 0.01, attribute_0, attribute_1, attribute_2, attribute_3) 
    insert all events into outputStream;
    

    此查询建立/更新名为“model1”的感知机模型,学习率为“0.01”,使用“attribute_0”、“attribute_1”、“attribute_2”和“attribute_3”作为特征,并使用“attribute_4”作为标签。模型的更新权重被发送到OutputStream流。

    EXAMPLE 2

    define stream ProductionTrainStream (density double, temperature double, qualityCheck_pass bool );
    
    define stream ProductionInputStream (density double, temperature double);
    
    @sink(type='log')
    define stream PredictedQCStream (density double, temperature double, prediction bool, confidenceLevel double);
    
    @info(name = 'query-train')
    from ProductionTrainStream#streamingml:updatePerceptronClassifier('QCmodel', qualityCheck_pass, 0.1, density, temperature)
    select *
    insert into trainOutputStream;
    
    @info(name = 'query-predict')
    from ProductionInputStream#streamingml:perceptronClassifier('QCmodel', 0.0, 0.5, density, temperature)
    select *
    insert into PredictedQCStream;
    

    分类器模型训练预测

2、PMML模型:

此扩展根据定义的PMML标准模型处理输入流属性,并将处理后的结果与输入流属性一起输出。

PMML(预测模型标记语言)是一种标准化的序列化,用于导出预测解决方案(机器学习模型)。PMML的工作方式是在一个系统中定义模型,然后通过XML文件将模型传输到另一个系统。这允许使用新系统中的事件进行预测。除了一个或多个预测模型之外,此XML文件还可以包含各种数据转换和预处理步骤。

句法:

pmml:predict(<STRING> path.to.pmml.file, <STRING> input)

QUERY PARAMETERS

名称描述默认值可能的数据类型可选的动态
path.to.pmml.filePMML模型文件的路径。STRINGNoNo
input输入流的属性,该属性作为一个值发送到PMML标准模型,基于该值进行预测。预测函数不接受任何常量值作为输入参数。根据输入流定义,您可以具有多个输入参数。Empty ArraySTRINGYesNo

Extra Return Attributes

NameDescriptionPossible Types
output查询中定义的所有已处理输出。 输出的数量可以根据查询定义而变化。STRING INT DOUBLE FLOAT BOOL

Examples

define stream SweetProductionStream (name string, currentHourAmount  double, previousHourAmount double );

@sink(type='log')
define stream PredictionStream (name string, currentHourAmount double, previousHourAmount double, Predicted_nextHourAmount string);

from SweetProductionStream#pmml:predict('/home/software/wso2sp-4.4.0/pmml/Toto.pmml')
select *
insert into PredictionStream;

3、R语言:

eval (Stream Processor)

R脚本流处理器对每个事件运行Siddhi应用程序中定义的R脚本,并根据提供的输入变量参数和预期的输出属性生成汇总输出。

Syntax

r:eval(<STRING> script, <INT|LONG|FLOAT|DOUBLE|STRING|STRING> output.attributes, <INT|LONG|FLOAT|DOUBLE|STRING|STRING> input.attributes)

QUERY PARAMETERS

名称描述默认值可能的数据类型是否可选是否动态
scriptR脚本为字符串,根据提供的输入变量参数和预期的输出属性生成聚合的输出STRINGNoNo
output.attributes预期的输出属性集。 这些可以以逗号分隔的列表形式提供。 每个属性都表示为“ <名称> <空间> <类型>”。 例如,“输出1字符串,输出2长”。INT LONG FLOAT DOUBLE STRING STRINGNoNo
input.attributes生成预期输出时要考虑的一组输入属性。 可以在输出属性之后以逗号分隔的列表形式提供该列表。 例如“ att1,att2”。INT LONG FLOAT DOUBLE STRING STRINGNoNo

Extra Return Attributes

名称描述可能的数据类型
outputParameters为每个事件运行R脚本后,将返回输出参数。INT LONG FLOAT DOUBLE STRING STRING

Examples

@info(name = 'query1')
from weather#window.lengthBatch(2)#r:eval("c <- sum(time); m <- sum(temp); ", "c long, m double", time, temp) 
select * 
insert into dataOut;

该查询运行R脚本’c <-sum(time); m <-sum(temp);’ 对于每两个事件以翻滚的方式。 通过将名为“ time”和“ temp”的两个其他参数的值作为输入,可以得出两个名为“ c”和“ m”的输出参数的值。

evalSource (Stream Processor)

R源流处理器为每个事件运行从文件加载的R脚本,并根据提供的输入变量参数和预期的输出属性生成聚合输出。

Syntax

r:evalSource(<STRING> file.path, <INT|LONG|FLOAT|DOUBLE|STRING|STRING> output.attributes, <INT|LONG|FLOAT|DOUBLE|STRING|STRING> input.attributes)

QUERY PARAMETERS

名称描述默认值可能的数据类型是否可选是否动态
file.path该脚本所在的R脚本的文件路径使用输入变量参数并产生预期的输出属性。STRINGNoNo
output.attributes预期的输出属性。 可以提供它,并提供一个用逗号分隔的属性名称字符串。 每个属性都表示为<名称> <空间> <类型>。 例如,“输出1字符串,输出2长”。INT LONG FLOAT DOUBLE STRING STRINGNoNo
input.attributes生成预期输出时要考虑的一组输入属性。 可以在输出属性之后以逗号分隔的列表形式提供该列表。 例如“ att1,att2”。INT LONG FLOAT DOUBLE STRING STRINGNoNo

Extra Return Attributes

名称描述可能的数据类型
outputParameters为每个事件运行R脚本后,将返回输出参数。INT LONG FLOAT DOUBLE STRING STRING

Examples

@info(name = 'query1')
from weather#window.lengthBatch(2)#r:evalSource("src/test/resources/sample2.R", "m int, c float", time, temp)
select *
insert into dataOut;

此“ r”源函数采用“ r”脚本文件位置,并根据该文件中的定义计算输出。

4、TensorFlow模型:

predict (Stream Processor)

根据已经构建的TensorFlow机器学习模型执行推断(预测), 只要满足以下条件,模型的类型是无限的(包括图像分类器,深度学习模型)。

1.以SavedModel格式保存(SavedModel:TensorFlow模型的通用序列化格式);
2.对模型进行初步训练并准备进行推理;
3.编写推理逻辑并将其保存在模型中;

4.signature_def(定义输出和输入接口协议)已正确包含在metaGraphDef(具有有关图的信息的协议缓冲区文件)中,并且signature def的键名为“ serving-default”;

另外,推理的前提条件如下:

1.用户知道输入和输出节点的名称;
2.具有Java基本类型或其多维数组的预处理数据集;

可以从保存的模型signature def中检索与输入和输出节点有关的信息:

1、可以使用https://www.tensorflow.org/programmers_guide/saved_model上的save_model_cli命令读取signature_def 。

2、在Python中可以按以下方式读取signature_def:

with tf.Session() as sess:
  md = tf.saved_model.loader.load(sess, ['serve'], export_dir)
  sig = md.signature_def[tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY]
  print(sig)

3、在Java中可以读取signature_def:

final String DEFAULT_SERVING_SIGNATURE_DEF_KEY = "serving_default";
final SignatureDef sig =
      MetaGraphDef.parseFrom(model.metaGraphDef())
          .getSignatureDefOrThrow(DEFAULT_SERVING_SIGNATURE_DEF_KEY);

您将必须使用Java导入以下内容。
import org.tensorflow.framework.MetaGraphDef;
import org.tensorflow.framework.SignatureDef;

语法

tensorFlow:predict(<STRING> absolute.path.to.model, <STRING> input.node.names, <STRING> output.node.names, <INT|STRING|DOUBLE|LONG|FLOAT|BOOL|OBJECT> attribute, <INT|STRING|DOUBLE|LONG|FLOAT|BOOL|OBJECT> ...)
tensorFlow:predict(<STRING> absolute.path.to.model, <INT|STRING|DOUBLE|LONG|FLOAT|BOOL|OBJECT> attribute, <INT|STRING|DOUBLE|LONG|FLOAT|BOOL|OBJECT> ...)

参数

名称描述默认值可能的数据类型是否可选是否动态
absolute.path.to.model这是本地计算机中model文件夹的绝对路径。STRINGNoYes
input.node.names输入节点的名称,以逗号分隔的字符串表示。可变长度参数。-STRINGYesYes
output.node.names输出节点的名称,以逗号分隔的字符串表示。可变长度参数。-STRINGYesYes
attribute事件的属性,作为输入节点输入。 请注意,数组应强制转换为对象并发送。这是一个可变长度参数。INT STRING DOUBLE LONG FLOAT BOOL OBJECTNoYes

额外返回的属性:

名称描述可能的数据类型
outputs这是一个可变长度的返回属性。 来自推论的输出张量将被展平并以其原始值发送。 如果用户希望重建输出张量的形状,则希望他/她知道。 可以从TensorFlow保存的模型signature_def中检索形状和数据类型信息。 有关如何读取signature_def的说明,请参见此扩展的说明。INT STRING DOUBLE LONG FLOAT BOOL

Examples

define stream InputStream (x string);

@sink(type='log') 
define stream OutputStream (outputPoint0 double, outputPoint1 double);

@info(name = 'query1')
from InputStream#tensorFlow:predict('D:\Software\wso2sp-4.4.0\wso2sp-4.4.0/samples/artifacts/TensorflowSample/Regression', 'inputPoint', 'outputPoint', x)
select outputPoint0, outputPoint1
insert into OutputStream;

这是一个简单的线性回归模型使用样例,输入一个二维x坐标,返回从保存的模型预测的二维y坐标。

请注意,数组应强制转换为对象并发送。这是一个可变长度参数。 | | INT STRING DOUBLE LONG FLOAT BOOL OBJECT | No | Yes |

额外返回的属性:

名称描述可能的数据类型
outputs这是一个可变长度的返回属性。 来自推论的输出张量将被展平并以其原始值发送。 如果用户希望重建输出张量的形状,则希望他/她知道。 可以从TensorFlow保存的模型signature_def中检索形状和数据类型信息。 有关如何读取signature_def的说明,请参见此扩展的说明。INT STRING DOUBLE LONG FLOAT BOOL

Examples

define stream InputStream (x string);

@sink(type='log') 
define stream OutputStream (outputPoint0 double, outputPoint1 double);

@info(name = 'query1')
from InputStream#tensorFlow:predict('D:\Software\wso2sp-4.4.0\wso2sp-4.4.0/samples/artifacts/TensorflowSample/Regression', 'inputPoint', 'outputPoint', x)
select outputPoint0, outputPoint1
insert into OutputStream;

这是一个简单的线性回归模型使用样例,输入一个二维x坐标,返回从保存的模型预测的二维y坐标。

 类似资料: