Slipstream的优势:
微批模式和事件驱动模式的(创建Stream的方式和简表语句基本相同,随心所欲)
* 一体化极高的易用性(低门槛,只要会SQL就可以)
* 性能提升(无需编码)
* 产品化程度高(封装程度高)
* 迁移成本低(基本不需迁移,Stream里面的数据可以直接通过查询insert到另一张表中)
创建Stream及触发StreamJob的形式:
1.首先登入集群中的任意一个节点:
1. beeline -u "jdbc:hive2://localhost:<port>/default" -n hive -p 123
2.像建表一样的创建Stream:
1. CREATE STREAM demo_stream(id INT, letter STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','TBLPROPERTIES("topic"="demo","kafka.zookeeper"="172.16.1.128:2181","kafka.broker.list"="172.16.1.128:9092”);
show Stream;
4.创建并触发一个StreamJob;
a. 建一张新表demo_table,它需要和demo_stream有相同的schema:
CREATE TABLE demo_table(id INT, letter STRING);
b. 向demo_table插入demo_stream中的数据,insert这个操作会触发StreamJob的执行:
INSERT INTO demo_table SELECT * FROM demo_stream;
5.查看运行的StreamJOb
LIST STREAMJPOBS;
Exp:输出中包含streamid,触发StreamJob的sql和status。
6.StreamJOb的默认端口是4040
7.注意stream触发是在执行insert命令之后的,此时才开始接收Kafka 的数据流。
8.停止流:
STOP STREAMJOB ${streamid};
Slipstream框架机制:
Slipstream有三个核心的概念:Stream、StreamJob 和 Application。
Exp:Stream是数据流,StreamJob是对一个或多个Stream进行计算并将结果写进一张表的任务,Application是一个或多个StreamJob的集合。
1:转化过的Stream一般用View来表示,称之为DerivedStream。View这个概念可以跟DStream的lazy evaluation很好地映射上,也就是说定义好的DML不是马上执行,而是在MetaStore里记录了DML的转化,直到遇到action才会触发执行。
2:View的概念来描述Stream转化的过程,用户在真正使用的过程中我们还是建议采用CSAS(CREATE STREAM AS SELECT),而避免使用CVAS(CREATE VIEW AS SELECT),以避免跟普通表的混淆。
3:在触发机制之前的所有的Stream都是可以重用的。(就是说一个数据流的数据流可以供给两个APP里面的工作需求)。
4:Application只是一个逻辑概念,但是每个App有自己的权限和资源配置(资源共享和资源隔离机制),确保每个App之间的运行互不干扰。
5:Slipstream中的sql都是后台运行的,停止Stream的运行需要手动Stop的。
Stream的类型分为两种:Input Stream 和 Derived Stream。
Input Stream:直接接收的数据源
Derived Stream:现有的stream转化的Stream
StreamJob的工作机制:
Datasource->Receiver->Stream-(trasform)->DerivedStream->Action
Application的工作机制:
• CREATE STREAM:创建一个Input Stream
• CREATE STREAM AS SELECT:创建一个Derived Stream• SHOW STREAMS:列出所有的Stream
• DESCRIBE STREAM:描述Stream
• ALTER STREAM:修改Stream
• DROP STREAM:删除Stream
Create Stream:
CREATE STREAM <stream_name>(<column> <data_type>, <column> <data_type>, ...)
[ROW FORMAT DELIMITED FIELDS TERMINATED BY '<delimiter>' COLLECTION ITEMS TERMINATED BY'<delimiter2>' MAP KEYS TERMINATED BY '<delimiter3>'] 1
TBLPROPERTIES(
["topic"="<topic_name>",] 2
["source"="kafka",] 3
["kafka.zookeeper"="<ip:port>",] 4["kafka.broker.list"="<ip:port>",] 5
["partitions"="<int>,<int>, ...",] 6["transwarp.consumer.security.protocol"="<protocol>",] 7["transwarp.consumer.sasl.kerberos.service.name"="service_name",] 7["transwarp.consumer.sasl.jaas.config"="jass_string",] 7["batchduration"="<int>",] 8
["<key>"="<value>",] 9["transwarp.consumer.<property_key>"="kafka_consumer_property_value",] 10["transwarp.producer.<property_key>"="kafka_producer_property_value",]
…);
1 指定数据字段间的分隔符,对于简单类型默认是“,”。
2 指定input stream指向的Kafka topic,默认值是database_name.table_name。
3 指定数据源来源,默认为kafka。
4 指定TDH集群中的一个Zookeeper节点的ip和端口号,除非另外设置,Zookeeper的端口号为2181,默认为当前集群的ZK的地址和端口号。建议在创建Stream时设置此项。
5 指定集群中的任意一个Kafka broker的ip和端口号,默认为9092。
6可选项。从Topic中指定的partition上获取流数据。注意,指定 partitions 属性时,需要同时指定kafka.broker.list 属性。
7 这三个选项在安全模式下以Kafka为源时需要使用,细节参考安全模式。8整数,设置batch的长度,单位为毫秒。该属性只能在 事件时间模式 下使用,且只对 微批模式的流处理
适用。
9 TBLPROPERTIES 中还可以包含任意自定义的键值对用于存放用户自定义的流属性。
10 指定StreamJob启动Kafka consumer使用的属性值。property_key为Kafka consumer所支持的某个配置属性名,例如,想配置consumer能接收的消息的大小,可指定”transwarp.consumer.fetch.max.bytes”=”1000000”。
当stream作为输出流时,可通过此配置项来设定StreamJob启动Kafka producer时使用的属性值。property_key为Kafka producer所支持的某个配置属性名,例如,想配置producer发送消息的ack模式,可指定”transwarp.producer.acks”=”0”。
接收基于kafka的数据源有两种方式:(非安全模式和安全模式两种情况)
非安全模式:
例 1. 建Kafka为源的Stream
CREATE STREAM employee_stream (id INT, name STRING, sex BOOLEAN) ROW FORMAT DELIMITED FIELDS
TERMINATED BY "," TBLPROPERTIES("kafka.zookeeper"="tw-node127:2181","kafka.broker.list"
="node127:9092");
如果当前所在的数据库名字为“database”,则默认以Kafka名为“database.employee_stream”的topic为源。数据在Kafka中存储是以“,”隔开的三个字段,分别是INT, STRING和BOOLEAN类型。
例 2. 以指定的Kafka Topic为源建Stream
CREATE STREAM demo(id INT, letter STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
TBLPROPERTIES("topic"="demo", "kafka.zookeeper"="172.16.1.128:2181","kafka.broker.list"
="172.16.1.128:9092");
例 3. 从指定的Partition中接收数据
CREATE STREAM demo(n STRING, v INT) TBLPROPERTIES("topic"="test","partitions"="0,3"
,"kafka.broker.list"="localhost:9092","kafka.zookeeper"="localhost:2181");
从名为test的topic的第0和第3个partition接收数据
安全模式(权限控制):
安全模式下,一个stream要成功读取/写入一个topic中的消息必须满足下面两个条件:
• 每个运行Executor的服务器上都必须有建stream用户的keytab文件;
• 建stream的用户有该topic的读/写权限。
所以,操作用户必须进行如下设置:
1. 接收数据的流
• 手动在每个运行Executor的服务器上的 相同目录 下放置一份自己的keytab文件,然后,设置属性transwarp.consumer.security.protocol 值为 SASL_PLAINTEXT ,设置属性transwarp.consumer.sasl.kerberos.service.name 为Kafka broker上配置了的相应的值,设置属性transwarp.consumer.sasl.jaas.config 为相应的jaas字符串。
• 确保自己对作为源的Kafka topic有读权限。
例 4. 安全模式下以Kafka为源建Stream(须人工保证每个节点上都有keytab文件)
CREATE STREAM demo(id INT, letter STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
TBLPROPERTIES("topic"="demo",
"kafka.zookeeper"="172.16.1.128:2181",
"kafka.broker.list"="172.16.1.128:9092"
"transwarp.consumer.security.protocol"="SASL_PLAINTEXT",
"transwarp.consumer.sasl.kerberos.service.name"="kafka"
"transwarp.consumer.sasl.jaas.config"="com.sun.security.auth.module.Krb5LoginModule
required useKeyTab=true storeKey=true keyTab=\"/tmp/keytab/kafka.keytab\"
principal=\"kafka/stream371@TDH\"
);
CREATE STREAM output(id INT, letter STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
TBLPROPERTIES("topic"="output",
"kafka.zookeeper"="172.16.1.128:2181",
"kafka.broker.list"="172.16.1.128:9092"
"transwarp.producer.security.protocol"="SASL_PLAINTEXT",
"transwarp.producer.sasl.kerberos.service.name"="kafka"
"transwarp.producer.sasl.jaas.config"="com.sun.security.auth.module.Krb5LoginModule
required
useKeyTab=true storeKey=true keyTab=\"/tmp/keytab/kafka.keytab\"
principal=\"kafka/stream371@TDH\"
);
3.接收Socket为源的数据
CREATE STREAM socket_stream (id INT, name STRING, sex BOOLEAN) TBLPROPERTIES("source"="socket"
,"host"="172.16.1.182","port"="8888");
创建Socket为数据源的表,数据源是172.16.1.82,端口号为8888。
SQLStream的复杂数据类型简介:
复杂类型STRUCT
CREATE STREAM struct_stream (id INT, s1 STRUCT<a:INT, b:STRING>) ROW FORMAT DELIMITED FIELDS
TERMINATED BY ',' COLLECTION ITEMS TERMINATED BY '|' TBLPROPERTIES("kafka.zookeeper"="tw-
node127:2181");
字段s1为STRUCT类型。Kafka中数据格式如下
1,5|five
2,9|nine
复杂类型ARRAY
CREATE STREAM array_stream (id INT, a1 ARRAY<STRING>) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
COLLECTION ITEMS TERMINATED BY '|' TBLPROPERTIES("kafka.zookeeper"="tw-node127:2181");
字段a1为ARRAY类型。Kafka中数据格式如下
1,1|2|3|4
2,2|4|6|8
复杂类型MAP
CREATE STREAM map_stream (id INT, m1 MAP<STRING,INT>) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
COLLECTION ITEMS TERMINATED BY '|' MAP KEYS TERMINATED BY ':' TBLPROPERTIES("kafka.zookeeper"="tw-
node127:2181");
字段m1为MAP类型。Kafka中数据格式如下
1,job:800|team:60
2,job:60|team:80
复杂类型TIMESTAMP
CREATE STREAM student_stream (id INT, t1 TIMESTAMP) TBLPROPERTIES("kafka.zookeeper"="tw-
node127:2181");
字段t1为TIMESTAMP类型。默认格式为yyyy-MM-dd HH:mm:ssKafka中数据格式如下
1,2015-01-15 00:16:10
2,2077-11-15 23:16:10
自定义用户数据解析
ADD JAR /tmp/decoder.jar
CREATE STREAM employee_stream(id INT, name STRING, sex BOOLEAN) TBLPROPERTIES("kafka.decoder"
="io.transwarp.kafka.ExampleDecoder", "kafka.zookeeper"="tw-node127:2181");
上面示例表示导入用java实现对应接口的jar包,然后指定数据解析方式。当前支持的自定义数据解析方式包括解压、多行数据放入同一行,可以继承 io.transwarp.streaming.sql.api.decoder.ArrayDecoder 实现arrayFromBytes来实现自定义逻辑,包括数据清理。
CREATE STREAM AS SELECT
FILTER转化
CREATE STREAM female_name_stream AS SELECT name FROM employee_stream WHERE sex==true;
WINDOW转化
CREATE STREAM employee_window_stream AS SELECT * FROM employee_stream STREAMWINDOW w1 AS(length '12'
second slide '4' second);
SHOW STREAM
显示所有的Streams,包括Input Stream和Derived Stream
语法
show Streams;
DESCRIBE STREAM
显示Stream的信息
语法
DESCRIBE|DESC [EXTENDED|FORMATTED] <stream_name>|<view_name> 1
1 DESCRIBE可以简写为DESC;[EXTENDED|FORMATTED]为可选项,选择它们可以输出比单独DESCRIBE更多的信
息。
ALTER STREAM
ALTER STREAM用于修改Input Stream。
重命名Input Stream
ALTER STREAM <old_name> RENAME TO <new_name>;
修改Input Stream属性
ALTER STREAM SET TBLPROPERTIES (“key”=”value”);
使用该指令可以修改任意流属性,包括指向的topic、使用的Zookeeper节点、broker list以及其它自定义的流属性。
增加Input Stream列
ALTER STREAM <stream_name> ADD COLUMNS(name type[, name1 type1 ...]);
通过该指令增加一到多列。
替换Input Stream列
ALTER STREAM <stream_name> REPLACE COLUMNS(name type[, name1 type1 ...]);
通过该指令替换一到多列。
DROP STREAM
删除一个Input Stream(也就是以CREATE STREAM创建的Stream)语法
DROP STREAM <stream_name>;
Application管理
Application在静态时只是一个逻辑概念,主要用于运行时的隔离和权限验证,所以一般只用于一些参数配置。一般来说,在测试时用户用自己的用户名登录Shell(比如Beeline),在自己建的Application下进行业务逻辑编写。而当业务需要上线时,可以用一个prod id,具有严格的权限控制,并对应一个Application。将自己的SQL脚本在对应的Application下启动就可以。
1. CREATE APPLICATION:创建一个application
CREATE APPLICATION <app_name> WITH APPPROPERTIES(["key"="value"][,"key"="value"...]);
DESCRIBE [|DESC] APPLICATION <app_name>;
SHOW CURRENT APPLICATION;
ALTER APPLICATION <app_name> SET APPPROPERTIES("key"="value");
SHOW APPLICATIONS;
DROP APPLICATION <app-name>;
StreamJob管理
StreamJob是触发Slipstream开始执行的 Action,一般具有插入结果表语义,或者Ad-hoc查询语
义。StreamJob主要存储StreamJob level的参数配置,以及对应SQL。
1. CREATE STREAMJOB:创建一个StreamJob
CREATE STREAMJOB <streamjob_name> as ("<query>") JOBPROPERTIES(["key"="value"][,"key"="value"...]);
DESCRIBE [|DESC] STREAMJOB <streamjob_name>;
ALTER STREAMJOB <streamjob_name> SET APPPROPERTIES("key"="value");
SHOW STREAMJOBS;
DROP STREAMJOB <streamjob-name>;
这里介绍几个Slipstream特有的概念:窗口、输出方式、查询中的流关联和流/表关联。
1.基于流的窗口函数分为两种:
2.基于窗口切分的方式又分为两种:
滑动窗口介绍:(滑动窗口以 LENGTH 指定窗口长度,SLIDE 指定滑动间隔。)
1.系统时间切分滑动窗口
//以系统时间切分,窗口大小2s (LENGTH '2' SECOND),滑动间隔为1s(SLIDE '1' SECOND)。
CREATE STREAM s1(id INT, name STRING, ts TIMESTAMP) TBLPROPERTIES("kafka.zookeeper"="tw-
node127:2181");
INSERT INTO t1 SELECT * FROM s1
STREAMWINDOW w1 AS (LENGTH '2' SECOND SLIDE '1' SECOND);
2.事件时间切分滑动窗口
//-- 用ts字段切分时间,使用 SEPARATED BY <field> 来指定切分时间的字段。
CREATE STREAM s1(id INT, name STRING, ts TIMESTAMP) TBLPROPERTIES("kafka.zookeeper"="tw- node127:2181");
INSERT INTO t1 SELECT * FROM s1 STREAMWINDOW w1 AS (SEPARATED BY ts LENGTH '2' SECOND SLIDE '1' SECOND);
3.事件时间切分无限滑动窗口
//用ts字段切分时间,窗口长度为无穷大(LENGTH INFINITE)。
CREATE STREAM s1(id INT, name STRING, ts TIMESTAMP) TBLPROPERTIES("kafka.zookeeper"="tw-
node127:2181");
INSERT INTO t1 SELECT * FROM s1 STREAMWINDOW w1 AS (SEPARATED BY ts LENGTH INFINITE SLIDE '1'
SECOND);
跳动窗口介绍
(跳动窗口以 INTERVAL 指定窗口长度以及窗口间隔。)
1.系统时间切分跳动窗口
CREATE STREAM s1(id INT, name STRING, ts TIMESTAMP) TBLPROPERTIES("kafka.zookeeper"="tw-
node127:2181");
INSERT INTO t1 SELECT * FROM s1 STREAMWINDOW w1 AS (INTERVAL '2' SECOND);
创建一个流之后,以2s为间隔(INTERVAL '2' SECOND)的跳动时间窗口对它进行查询,窗口切分的依据 是系统时间。
2.事件时间切分
CREATE STREAM s1(id INT, name STRING, ts TIMESTAMP) TBLPROPERTIES("kafka.zookeeper"="tw-
node127:2181");
INSERT INTO t1 SELECT * FROM s1 STREAMWINDOW w1 AS (SEPARATED BY ts INTERVAL '2' SECOND);
创建一个流之后,以2s为间隔的跳动时间窗口对它进行查询,窗口切分的依据是s1中的字段ts。
事件时间切分的其他设置
版本信息
本章介绍的 SEPARATED BY 关键字是TDH4.6开始的新功能,在TDH4.5.x版 本中,事件时间切分采用下面的方式:
TDH4.5.x中的事件时间切分
SET streamsql.use.eventtime=true; –打开事件时间切分的开关
CREATE STREAM s12(id INT, name STRING, ts TIMESTAMP) TBLPROPERTIES("timefield" ="ts", "timeformat"="yyyy-MM-dd HH:mm:ss", "kafka.zookeeper"="tw-node127:2181"); 1
INSERT INTO t1 SELECT * FROM s12 STREAMWINDOW w1 AS(LENGTH '2' SECOND SLIDE '1' SECOND); 2
1 在Input Stream中设置好用于切分窗口的字段以及时间格式
2 Windowed Stream的窗口切分依据为上面Input Stream中设置好的字段
TDH4.6开始,使用 SEPARATED BY 关键字设置事件时间切分则无须再手动 打开 streamsql.use.eventtime 开关。同时Input Stream的 TBLPROPERTIES 中的 timefield 和 timeformat 两个参数都为可选,若显式指明了,SEPARATED BY 关键词以 及 FORMAT 关键词所指定的相应属性将具有更高优先级:
TDH4.6.x中的事件时间切分
CREATE STREAM s12(id INT, name STRING, ts1 TIMESTAMP, ts2 TIMESTAMP) TBLPROPERTIES("timefield"="ts1", "timeformat"="yyyy-MM-dd HH:mm:ss", "kafka.zookeeper"="tw-node127:2181"); 1
INSERT INTO t1 SELECT * FROM s12 STREAMWINDOW w1 AS(SEPARATED BY ts2 LENGTH '2' MINUTE SLIDE '1' MINUTE FORMAT "yyyy-MM-dd HH:mm"); 2
//1 timefield 和 timeformat 都为可选参数。
2 虽然Input Stream定义中将 timefield 设为 ts1,将 timeformat 设为 yyyy-MM-dd HH:mm:ss,但是这里的窗口切分方式为按 ts2 字段切分,时间格式为 yyyy-MM-dd HH:mm,因为这里的 SEPARATED BY 和 FORMAT 优先级更高。
设置事件时间切分的默认字段
CREATE STREAM s1(id INT, name STRING, ts TIMESTAMP) TBLPROPERTIES("timefield"="ts", "kafka.zookeeper"="tw-node127:2181"); 1
INSERT INTO t1 SELECT * FROM s1 STREAMWINDOW w1 AS (SEPARATED BY DEFAULT LENGTH '2' SECOND SLIDE '1' SECOND); 2
//1 将ts字段设置为事件时间切分的默认字段。 2在为s1切分窗口时即可以使用 DEFAULT 来指代 ts 作为时间切分字段
自定义窗口的时间格式
CREATE STREAM s1(id INT, name STRING, ts TIMESTAMP) TBLPROPERTIES("timefield"="ts", "kafka.zookeeper"="tw-node127:2181");
INSERT INTO t1 SELECT * FROM s1 STREAMWINDOW w1 AS (SEPARATED BY DEFAULT LENGTH '2' MINUTE SLIDE '1' MINUTE FORMAT "yyyy-MM-dd HH:mm"); 1
//1以DEFAULT的字段为切分依据,窗口大小为2分钟,滑动间隔为1分钟,指定格式为 "yyyy-MM-dd HH:mm",在指定格式中,时间不包含秒级信息(默认情况下时间格式则是包含秒级信息:"yyyy-MM- dd HH:mm:ss")。
设置流的默认时间格式
CREATE STREAM s1(id INT, name STRING, ts TIMESTAMP) TBLPROPERTIES("timefield"="ts","timeformat" ="yyyy-MM-dd HH:mm", "kafka.zookeeper"="tw-node127:2181"); 1
INSERT INTO t1 SELECT * FROM s1 STREAMWINDOW w1 AS (SEPARATED BY DEFAULT LENGTH '2' MINUTE SLIDE '1' MINUTE);
//1 在创建Stream时将默认的时间格式设置为 "yyyy-MM-dd HH:mm",而不是默认的 "yyyy-MM-dd HH:mm:ss"。
设置事件时间的零点
CREATE STREAM s1(id INT, name STRING, ts TIMESTAMP)
TBLPROPERTIES("timefield"="ts", "timeformat"="yyyy-MM-dd HH:mm", "streamsql.eventtime.zerotime"
="<unixtimestamp>", "kafka.zookeeper"="tw-node127:2181"); 1
INSERT INTO t1 SELECT * FROM s1 STREAMWINDOW w1 AS (SEPARATED BY DEFAULT LENGTH '2' MINUTE SLIDE '1' MINUTE);
//1使用 stremsql.eventtime.zerotime 参数设置时间零点,时间零点的值必须为一个unixtimestamp。该语法和 下面的 START TIME AT 效果相同,但是需要将DATE类型的时间转换为LONG类型的unixtimestamp。我 们建议除非必要,使用下面的方式。注意: 如果 streamsql.use.eventtime 开关没有打开,并且不使用 SEPARATED BY 指定事件时间字段,不可以设置时间零点;当Slipstream运行在 事件驱动的模式 时, 该设置不适用。 .设置窗口从某一指定时间开始
CREATE STREAM s1(id INT, name STRING, ts TIMESTAMP) TBLPROPERTIES("timefield"="ts" ,"timeformat"="yyyy-MM-dd HH:mm", "kafka.zookeeper"="tw-node127:2181");
INSERT INTO t1 SELECT * FROM s1 STREAMWINDOW w1 AS (SEPARATED BY DEFAULT LENGTH '2' MINUTE SLIDE '1' MINUTE START TIME AT "2016-04-01 10:00"); 1
//1 使用 START TIME AT 设置窗口开始时间为一个固定的时间,那么后续的窗口将按照这个起始点切分。 请注意该时间必须跟 FORMAT,或者 "timeformat" 指定的时间格式保持一致。注: 当Slipstream运行 在 事件驱动的模式 时,该设置无效。
STRING字段作为切分依据
CREATE STREAM s1(id INT, name STRING, ts1 STRING) TBLPROPERTIES("timefield"="ts1","timeformat"
="MM/dd/yyyy HH:mm:ss", "kafka.zookeeper"="tw-node127:2181");
设置事件时间乱序情况下可容忍的最大延迟
SET streamsql.eventtime.maxlag.on.disorder=<minutes>; 1
CREATE STREAM s1(id INT, name STRING, ts TIMESTAMP) TBLPROPERTIES("kafka.zookeeper"="tw- node127:2181");
INSERT INTO t1 SELECT * FROM s1 STREAMWINDOW w1 AS (SEPARATED BY ts LENGTH '2' SECOND SLIDE '1' SECOND);
//1将 <minutes> 设置为一个正整数,单位为分钟。该参数用于设置对乱序的容忍度,也就是可以接受 <minutes> 内的消息乱序。收到某个事件时间为 et 的消息后,Slipstream会认为事件时间为 et- <minutes> 及之前的事件都已经发生过,所以会直接丢弃事件时间小于等于 et-<minutes> 的消息。例如 将 <minutes> 设为2,那么当receiver收到包含事件时间为 2016:03:18 19:10:01 的消息时,即认为 2016:03:18 19:08:01 以及更早的消息都已接收完成,(往后如果收到这种消息,将直接丢弃)。默认情况 下,该参数值为负,即认为事件时间完全有序。注: Slipstream运行在 事件驱动的模式 时,暂不具备乱 序处理功能,该设置无效。
Window Stream的聚合操作
(Slipstream支持对window stream的聚合操作,我们建议先建window stream再做聚合。)
聚合操作
CREATE STREAM s1(id INT, value INT, ts TIMESTAMP) TBLPROPERTIES("kafka.zookeeper"="tw- node127:2181");
CREATE STREAM ws1 AS SELECT * FROM s1 STREAMWINDOW w1 AS (LENGTH '2' SECOND SLIDE '1' SECOND) TBLPROPERTIES("kafka.zookeeper"="tw-node127:2181"); --先建window stream
INSERT INTO t1 SELECT id, SUM(value) FROM ws1 GROUP BY id; -- 然后进行聚合操作
流之间的关联
(Slipstream支持window stream之间,window stream和表之间进行关联操作。)
流之间的关联操作
CREATE STREAM s1(id INT, name STRING, ts TIMESTAMP) TBLPROPERTIES("kafka.zookeeper"="tw-
node127:2181");
CREATE STREAM s2(id INT, name STRING, ts TIMESTAMP) TBLPROPERTIES("kafka.zookeeper"="tw-
node127:2181");
CREATE STREAM s1_join as SELECT * FROM s1 STREAMWINDOW w1 AS
(SEPARATED BY ts LENGTH '2' SECOND SLIDE '1' SECOND);
CREATE STREAM s2_join as SELECT * FROM s1 STREAMWINDOW w1 AS
(SEPARATED BY ts LENGTH '2' SECOND SLIDE '1' SECOND);
INSERT INTO t1 SELECT s1_join.name, s2_join.name FROM s1_join JOIN s2_join ON s1_join.id ==
s2_join.id;
--以ts为切分依据,再对窗口内的两个流的数据进行关联操作。
--不同字段不同时间格式的流之间的关联
CREATE STREAM s1(id INT, name STRING, ts1 TIMESTAMP) TBLPROPERTIES("timefield"="ts1","timeformat"
="yyyy-MM-dd HH:mm:ss", "kafka.zookeeper"="tw-node127:2181");
CREATE STREAM s2(id INT, name STRING, ts2 TIMESTAMP) TBLPROPERTIES("timefield"="ts2","timeformat"
="yyyy/MM/dd HH:mm:ss", "kafka.zookeeper"="tw-node127:2181");
CREATE STREAM s1_join AS SELECT * FROM s1 STREAMWINDOW w1
AS(SEPARATED BY DEFAULT LENGTH '2' SECOND SLIDE '1' SECOND);
CREATE STREAM s2_join AS SELECT * FROM s2 STREAMWINDOW w1
AS(SEPARATED BY DEFAULT LENGTH '2' SECOND SLIDE '1' SECOND);
INSERT INTO s1 SELECT s1_join.name, s2_join.name FROM s1_join JOIN s2_join ON s1_join.id ==
s2_join.id;
流与普通表关联
CREATE STREAM s1(id INT, name STRING, ts STRING) TBLPROPERTIES("kafka.zookeeper"="tw-node127:2181");
CREATE STREAM s1_join as SELECT * FROM s1 STREAMWINDOW w1 AS(SEPARATED BY ts LENGTH '2' SECOND SLIDE
'1' SECOND);
CREATE TABLE t2(id INT, name STRING) TBLPROPERTIES("kafka.zookeeper"="tw-node127:2181");
INSERT INTO t1 SELECT id, s1_join.name, t2.name FROM s1_join JOIN t2 on s1_join.id == t2.id;
(三种方式:输出到表,输出到流,输出到HDFS)
ssh root@localhost "mysql -u root -e 'use dblink;drop table if exists dbt1;create table dbt1 (d1
tinyint, d2 smallint);'"
CREATE DATABASE LINK link1 CONNECT TO hiveuser IDENTIFIED BY 'password' USING
'jdbc:mysql://localhost:3306/dblink';
CREATE STREAM s1(d1 TINYINT, d2 SMALLINT) TBLPROPERTIES("kafka.zookeeper"="tw-node127:2181");
INSERT INTO dbt1@link1 SELECT * FROM s1;
//示例首先在MYSQL中创建表dblink.dbt1,然后在Inceptor中创建DBLink link1,最后将流中数据 输出到MYSQL的表dblink.dbt1。 输出到DB2和Oracle通过类似的方式。
CREATE STREAM stream_in (id INT, name STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
TBLPROPERTIES("topic"="topic_in",
"kafka.zookeeper"="tw-node127:2181,tw-node128:2181,tw-node129:2181","kafka.broker.list"="tw-
node127:9092,tw-node128:9092,tw-node129:9092");
CREATE STREAM stream_out (id INT, name STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
TBLPROPERTIES("topic"="topic_out",
"kafka.zookeeper"="tw-node127:2181,tw-node128:2181,tw-node129:2181"
"kafka.broker.list"="tw-node127:9092,tw-node128:9092,tw-node129:9092");
INSERT INTO stream_out SELECT * FROM stream_in;
--注意 ,在安全模式下输出到流时请按照安全模式对Stream进行配置。
SET streamsql.enable.hdfs.batchflush = true --打开批量flush开关
SET streamsql.hdfs.batchflush.size = <num> --设置一次flush的消息个数,消息量达到该参数时flush一次
SET streamsql.hdfs.batchflush.interval.ms = <num> --设置每过多长时间(单位为毫秒)flush一次
只需满足 batchflush.size 和 batchflush.interval.ms 其中的一个条件即会触发一次flush。
基于流的查询SELECT
Ad-hoc查询
Slipstream允许用户在正在运行的流中做抽样查询,也就是说Ad-hoc查询是针对当前时刻。由于无法确定 “当前”时刻引擎的处理状态,Ad-hoc查询只是将正在被处理的流数据获取到进行加工。
Ad-hoc查询允许用户进行交互式操作,使得验证数据质量,或者业务逻辑迭代改进成为可能。
CREATE STREAM s1(id INT, name STRING, ts STRING) TBLPROPERTIES("kafka.zookeeper"="tw-node127:2181");
SELECT /+ADHOC/ * FROM s1;
//Ad-hoc查询 会立即返回查询结果给当前Session所在窗口,一般情况下的Slipstream会要求将结果存入某 张结果表,Ad-hoc查询是Slipstream唯一的一种特殊情况。
流不支持的DML
限于流的特性,有些操作在流上将不被支持,主要包括 UPDATE 和 DELETE 操作。这是因为流数据是持续延伸向未来的数据流,窗口只会往前延伸,因此一般不会支持更新或者删除操作。