当前位置: 首页 > 工具软件 > Slipstream > 使用案例 >

transwarp Slipstream 简介之高级功能

段干瑞
2023-12-01

1. 监控告警

Slipstream整合监控告警工具Alert4J,用于在流应用出问题的时候报错,支持邮件推送,也可以与微信、其 他监控工具整合。
Alert4J当前版本没有专门的配置界面,将在下个版本时支持。当前版本可以通过增加alert4j.properties文 件支持。下面是该文件的一般配置内容,以邮件推送为例:

 alert4j.service=email
  email.server.host=smtp.exmail.qq.com
  email.server.port=465
  email.server.ssl=true
  email.validate=true
  email.sender.username=prod@transwarp.io
  email.sender.password=test
  email.from.address=prod@transwarp.io
  email.to.addresses=test@transwarp.io

2.复杂事件处理(CEP)

Slipstream 支持在流上做复杂事件处理(Complex Event Processing)。复杂事件的处理分为事件和操作两 个部分,事件就是流中的数据,通常会在事件上加上过滤条件。操作即事件的逻辑顺序及其生命周期的控 制,例如两次取款操作发生在一个特定的时间间隔内且满足第二次取款操作发生在第一次取款操作之后。我 们将这种通过操作组合在一起的一系列事件称为复杂事件模式。
使用CEP需要在任务级别打开NGMR_ENGINE_MODE为morphling,使引擎运行在事件驱动模式下。Slipstream 的CEP支持系统时间和事件时间两种方式,如果需要使用事件事件,还需要在任务级别打开事件时间的参数选 项。
CEP的基本语法如下:

SELECT EVENT1.[column], 1 EVNET2.[column],
......,
EVNETn.[column] FROM PATTERN 2
( EVENT1=[stream][condition] [FOLLOWEDBY | ,] 3
EVENT2= [stream][condition] [FOLLOWEDBY | ,] ....
EVENTn= [stream][condition] ) 4 WITHIN (time interval); 5
1 EVENT: 模式中定义的事件名,可以任意定义
2 PATTERN: 指定模式的关键字
3 表示事件之间的关系,FOLLOWEDBY表示只要事件B发生在事件A之后,那么事件B也应该参与计算;如果此 处是“,”表示事件B必须是事件A之后发生的第一个事件。
4 condition:即该事件的发生条件,即SQL中的条件表达式。
5 WITHIN: 指定该次复杂事件处理的时间区间。

案例1(银行)
例 14. 检测盗刷行为
银行需要在10分钟之内检测出当前某笔取款交易是否存在盗刷银行卡的行为。
1. 创建输入流

        CREATE APPLICATION cep_example;
        USE APPLICATION cep_example;
        SET streamsql.use.eventmode=true;
        CREATE STREAM transaction(
            location_id STRING, card_id STRING, behavior STRING
            )
        tblproperties(
            "topic"="transaction_t1",
            "kafka.zookeeper"="localhost:2188",
            "kafka.broker.list"="localhost:9098"
);
        CREATE TABLE exception_ret(
            location_id_1 STRING,
            location_id_2 STRING,
            behavior STRING,
            card_id STRING
);

2 创建规则启动流任务

       INSERT INTO exception_ret
        SELECT e1.location_id, e1.card_id, e1.behavior,
               e2.location_id, e2.card_id, e2.behavior
        FROM PATTERN (
--同一张卡10分钟内在两个不同地点发生了取款行为,意味着有盗刷可能 e1=transaction[e1.behavior='withdraw']
FOLLOWEDBY
e2=transaction[
                e2.card_id = e1.card_id AND
                e2.behavior='withdraw' AND
                e2.location_id != e1.location_id]
        ) WITHIN ('10' minute);

案例2:工业机床
例 15. 工业机床异常警报
需要对工业机床以下两种情况进行监测:
• Case1: 机械臂需要在1分钟内对某节点温度急剧变化的事件做出预警
• Case2: 机械臂需要在1分钟内对某次设备加工操作的异常位置信息做出预警,可能是该机械臂存在 设备故障。
1. 创建输入流

  CREATE APPLICATION cep_example;
            USE APPLICATION cep_example;
            SET streamsql.use.eventmode=true;
            CREATE STREAM robotarm(armid STRING, temperature FLOAT)
            tblproperties(
                "topic"="arm_t1",
                "kafka.zookeeper"="localhost:2188",
                "kafka.broker.list"="localhost:9098"
            );
            CREATE TABLE tem_ret(armid STRING, temperature FLOAT);
            CREATE TABLE coords_ret(armid STRING, coords FLOAT);

2.创建规则启动流任务

    INSERT INTO tem_ret
            SELECT e2.armid, e2.temperature
            FROM PATTERN(
-- 温度在1分钟内变化超过20度,给出预警 e1=robotarm[e1.temperature > 0 && < 80] FOLLOWEDBY e2=robotarm[e2.armid=e1.armid &&
                          e2.temperature - e1.temperature > 20]
            ) WITHIN ('1' minute);
            INSERT INTO coords_ret
            SELECT e2.armid, e2.x, e2.y
            FROM PATTERN(
-- 前后两次位置坐标变化大于阀值时,给出预警 e1=robotarm[x * x + y * y <= 1], e2=robotarm[
                    e2.armid=e1.armid &&
                    e2.x * e2.x + e2.y * e2.y - (e1.x * e1.x + e1.y * e1.y) >=0.3
                ]) WITHIN ('1' minute);

案例3:交通行业

例 16. 对交通车流量和套牌车进行预警监测
需要对车辆交通的以下两种情况进行监测:
• Case1: 每分钟统计一次卡口车流量,若10分钟内某卡口的过车流量超过阀值,需要及时预警并反馈 现场进行交通疏导
• Case2: 10分钟两个跨地市的行政区域出现同一个车牌,有理由怀疑是套牌车,需要及时预警反馈。 1. 创建输入流

      CREATE APPLICATION cep_example;
            USE APPLICATION cep_example;
            SET streamsql.use.eventmode=true;
            CREATE STREAM traffic(veh_id STRING, veh_type STRING, speed FLOAT, location_id STRING)
            tblproperties(
                "topic"="traffic_t1",
                "kafka.zookeeper"="localhost:2188",
                "kafka.broker.list"="localhost:9098"
            );
            CREATE TABLE traffic_flow_ret (location_id STRING, traffic_flow INT);
            CREATE TABLE traffic_susp(loc_id1 STRING, loc_id2 STRING, veh_id STRING);
  1. 创建规则启动流任务
      CREATE STREAM traffic_flow AS
            SELECT location_id, count(*) as veh_flow
            FROM traffic STREAMWINDOW w1 as (length '1' minute slide '1' minute)
            GROUP BY location_id;
            INSERT INTO traffic_flow_ret
            SELECT e2.location_id, e2.veh_flow
            FROM PATTERN(
-- 10分钟内某同一卡口车流量增幅超过60 e1=traffic_flow[e1.veh_flow > 0], e2=traffic_flow[
                    e2.veh_flow - e1.veh_flow > 60 AND
                    e2.location_id = e1.location_id]
            ) WITHIN ('10' minute);
            INSERT INTO traffic_susp
            SELECT e1.location_id, e2.location_id, e2.veh_id
            FROM PATTERN(
-- 车辆类型为A1类,10分钟内不同地区出现同一个车牌号 e1=traffic[e1.veh_type="A1"] FOLLOWEDBY
e2=traffic[e2.veh_id = e1.veh_id AND e2.location_id != e1.location_id]
            ) WITHIN ('10' minute);

CEP的使用注意点:
1. 事件之间的分隔符有逗号(,)以及FOLLOWEDBY,两者最主要的区别在于,逗号表示前后两个事件一定是相 邻的两个事件,而FOLLOWEDBY则没有这种要求。例如,(e1,e2)表示事件e2是事件e1之后的第一个事件; (e1 FOLLOWEDBY e2) 表示e2可以发生在e1之后的任意时刻(WITHIN的时间间隔内)。使用逗号分隔符的 时候,第一个事件在发生后会被丢弃,即PATTERN(e1,e2),当e3到达时,此时PATTERN变为(e2,e3)。 而FOLLOWEDBY不会丢弃事件。
2. 在实际场景中,往往需要针对每个用户的事件流进行分析而不是全局的事件,可以通过distribute by的 方式,按照指定的字段(例如用户ID)划分到不同的数据流,并在这些不同的流上进行CEP的模式匹配分 析。例如,源数据流中包含字段id,如果希望根据id来划分流,并进行模式分析,那么就需要对源数据 流做如下处理:

CREATE STREAM derived_stream
AS SELECT * FROM raw DISTRIBUTE BY id;
-- 对由id划分的每个流进行模式分析
INSERT INTO ret SELECT e1.* FROM PATTERN(e1=derived_stream[e1.cnt > 0]) WITHIN ('1' minute);

高可用性

  1. 微批模式的流处理高可用性

• 开启CheckPoint
1. 通过Transwarp Manager管理界面修改InceptorServer配置,添加”stream.driver.checkpoint.dir” 配置项,并将其设置为HDFS上的某个目录。配置完毕后,重启Inceptor。
2. 创建Application时需要指定Application的CheckPoint目录,设置为HDFS上的某个目录。

        CREATE APPLICATION app1 WITH APPPROPERTIES("application.checkpoint.dir"="/tmp/app1/")

• 启动Standby InceptorServer
通过Transwarp Manager管理界面配置额外的InceptorServer,设置它的role为Standby。设置完毕后将其启动。


事件驱动模式下的高可用性

(Slipstream事件驱动模式下的高可用是在Slipstream流任务级别,即当一个流任务异常退出,或者 是Slipstream Server异常终止恢复后,流任务的计算结果还能够保证与正常情况下的计算结果一致。该过程 对于用户是透明的,用户一般只关心流任务的计算结果。

两种模式(standalone/zookeeper)
Slipstream当前支持两种模式的高可用性的配置,其中Standalone模式能够保证计算结果At-Least-Once,而 要想保证Exactly-once,则需要使用Zookeeper模式。Slipstream流任务高可用性必须在事件驱动模式下,即 任务中要设置streamsql.use.eventmode=true。使用Slipstream HA时必须通过使用创建任务(CREATE STREAMJOB)的方式来定义流任务,所对应的任务级别的参数则是指定在jobproperties属性中。

Standalone 模式
Standalone是开启Slipstream HA的默认模式。在该模式下,任务的元信息保存在Driver端的内存中,因此该 模式只能支持在Driver不重启的前提下流任务的HA,当Driver重启时,对应的信息会丢失。在Standalone模 式下只能保证数据At-Least-Once的语义而无法保证Exactly-Once语义。
Standalone模式下需要在Transwarp Manager上配置以下参数:

1. spark.morphling.recovery.mode
HA的模式
2. spark.morphling.taskstate.backend
Task状态信息的存储系统
3. spark.morphling.taskstate.checkpoint.directory
HDFS上保存的任务状态信息

默认情况下Slipstream为用户生成这些参数的默认值,可以通过Transwarp Manager界面的设置界面查看,因 此默认情况下无需做其他的配置即可启动集群,如果希望修改HDFS的目录,则需要在修改这些配置以后重 启Slipstream。

例 17. Standalone模式下HA的使用示例
//1. 定义输入流
        create stream plainetl_checkpoint(classid string, content string)
        tblproperties("topic"="plainetlha","kafka.zookeeper"="NodeA:2188","kafka.broker.list"="Node
        A:9098");
        create table plainTable(classid string, name string);
//2. 创建流任务
        create streamjob etl1 as ("insert into plainTable select * from plainetl_checkpoint")
        jobproperties(
"streamsql.use.eventmode"="true", "morphling.job.checkpoint.interval"="5000", morphling.job.enable.checkpoint=true, --定义该任务是否启用HA morphling.task.max.failures=3); --任务失败重试次数
//3. 启动流任务
        start streamjob etl1;
//现在已经成功的启动了一个名为etl1的流任务,若在任务运行的过程中出现异常例如任务被异常kill, 该任务会重新调度上线,重试次数为3。在Standalone模式下,一旦任务出现重新调度的情况,用户可 能观察到计算结果的值出现重复,这是由于Standalone模式下的HA只能保证数据至少被输出一次,而无 法保证数据不重复输出。如果用户希望得到更精确的计算结果,Slipstream推荐用户使用Zookeeper模 式。

Zookeeper 模式
Zookeeper模式是Slipstream所推荐用户使用的模式,该模式下的任务元信息保存在Zookeeper上,此外该模 式下还会保存某个任务每次已经完成的checkpoint信息到HDFS上。这样就保证即使整个Slipstream集群发生 异常退出重启恢复的时候,流任务还能保证计算的准确性。Zookeeper模式下的流任务可以保证数据Exactly- once的语义。
WAL-sink是一个可以保证数据结果只被写一次的功能, 通过打开WAL-sink功能使得流任务即使发生异常退 出,自动重启提交后,数据结果也不会出现重复,即Exactly-Once。要使用WAL-sink需要外接数据库的配 置,当前支持的数据库有VoltDB以及MySQL。Slipstream默认配置的数据库为MySQL。
Zookeeper模式需要用户在Transwarp Manager界面上配置spark.morphling.recovery.mode为zookeeper,并指定在HDFS上存储完整checkpoint信息的目录。Slipstream默认为用户生成一个目录。可以通过Transwarp Manager界面的配置项查看该目录的值。
注意:Zookeeper上的目录以及HDFS的目录为Slipstream启动时生成的,若不慎删除可能导致开启checkpoint 的流任务无法正常启动,此时需要手动创建该目录并付给响应的写权限。

例 18. Zookeeper模式下HA的使用示例
该例将数据写入到Hyperbase表中:
// 1. 创建输入流
        create stream raw(classid string, content string)
        tblproperties("topic"="raw_t1","kafka.zookeeper"="NodeA:2188","kafka.broker.list"="NodeA:90
        98");
//2. 创建Hyperbase外部表
create external table hbase_ret(k string, v string)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties("hbase.columns.mapping"=":key,content:value") tblproperties("hbase.table.name"="haret"); -- haret 需在{hyperbase}中事先创建
//3. 创建流任务
create streamjob zkmode as ("insert into hbase_ret select * from joinA") jobproperties(streamsql.use.eventmode=true,
morphling.task.max.failures=5,
 --任务失败重试次数 "morphling.job.checkpoint.interval"="5000", "morphling.job.enable.checkpoint"="true", "morphling.job.enable.wal.sink"="true", 
--开启WAL Sink功能 "morphling.wal.sink.committer.type"="mysql",
 --开启WAL Sink的meta信息存储方式 "morphling.wal.sink.voltdb.jdbc.url"="jdbc:mysql://localhost/hainfo"); --WAL
//Sink所配置的数据库的JDBC URL 4. 启动流任务
        start streamjob zkmode;
//至此已经完成启动一个Zookeeper模式下支持HA的流任务,在Slipstream集群状态正常的情况下,如果 任务遇到异常退出恢复后,可以保证数据计算的完整准确,就好像任务并没有发生异常一样。 当Slipstream集群出现异常需要重启,在重启之后需要用户重新提交这些流任务,一旦用户提交流任务 后,任务会从上一次失败的地方重新开始计算出正确的结果。再次说明的是,Slipstream的HA的功能指 的的任务级别的高可用性,即保证一个流任务的高可用,而非指的是一个Slipstream集群级别的高可 用。

相关参数配置

Slipstream HA的配置参数分为server级别和任务级别,其中server级别的参数应该在启动Slipstream Server时配置好,任务级别的参数是在创建流任务的时候指定给流任务属性的参数。server级别的参数 中,Zookeeper模式对应的参数只需要在使用Zookeeper模式的时候配置即可,其他参数都是必须配置的参 数。

At-Least-Once支持

在开启CheckPoint之后,用户只需要在Application中设置Kafka的WAL,即可实现At-Least-Once。

  ALTER APPLICATION app1 SET APPPROPERTIES("application.enable.wal"="true")

自定义参数设置

  • Batch duration

Slipstream处理每个批次数据的间隔时间,通过以下方式设置:
系统时间模式

 CREATE APPLICATION app1 WITH APPPROPERTIES("stream.batch.duration.ms"="2000")
//事件时间模式下可用 TBLPROPERTIES 中的 "batchduration" 对每个stream设置。
  • Kafka receiver个数
Kafka receiver个数的多少会影响接收数据的并发度,通过以下方式设置在StreamJob level:
  ALTER STREAMJOB s1 SET JOBPROPERTIES("stream.number.receivers"="4")
  • Holodesk窗口设置

Holodesk是星环特有的基于内存的列式存储表,专用于高速数据分析业务。Slipstream支持往Holodesk的数 据插入,由于数据持续不断的插入可能会将内存耗尽,可在创建Holodesk表时设置以下参数调节Holodesk的 最大窗口大小:

 CREATE TABLE holo(id INT, name STRING) STORED AS HOLODESK TBLPROPERTIES("holodesk.window.length"
  ="60000","holodesk.window.timeout"="10000");
//"holodesk.window.length"用来限制最大窗口长度。"holodesk.window.timeout"是最大超时。两个的单位都 是毫秒。例如,假设将"holodesk.window.length"设置为60000, holodesk.window.timeout设置为10000。 那么holedesk创建后,向其插入数据时,会保存60秒(窗口长度)内的数据,从第61秒开始,以后的数据被 标记为删除,在第71秒时被真正删除(超时长度为10秒)。

开启流上的PLSQL

  SET stream.enabled= true;
  SET stream.tables=default.abc,default.efg;
用户需要显式地打开Stream,才能运行流上的Slipstream。此外,在当前版本中,用户需要显式地指定哪些 表是作为Stream存在。
开启PLSQL的编译优化
SET plsql.optimize.dml.precompile=true; PLSQL解析执行,因此需要通过这个参数将编译缓存,可大大提升执行效率。

嵌套SELECT需要加 Ad-hoc的hint

这是因为在Slipstream中禁用了普通的SELECT,在PLSQL中这些SELECT可能嵌套于游标中,在当前版本中如果 不加Ad-hoc可能出现误判。
简单函数示例


  set stream.enabled=true;
  set stream.tables=testa;
  create stream testa(id string, a string) TBLPROPERTIES("kafka.zookeeper"="tw-node127:2181");
  create table testb(id string, a string);
  create or replace procedure simple_test()
  is
  begin
    insert into testb select * from testa
  end;
begin
    simple_test()
  end;

游标示例

set stream.enabled=true;
  set stream.tables=t1;
  create stream t1(id int, value int) TBLPROPERTIES("kafka.zookeeper"="tw-node127:2181");
  create table t2(id int,value int);
  declare
   v_id int
   v_value int
   cursor cur(cur_arg int)
is
select * from (select/*+ adhoc*/ * from t1 where id < cur_arg) order by id
  begin
   open cur(5)
   loop
   fetch cur into v_id, v_value
   exit when cur%notfound
   end loop
end;
 类似资料: