-- Slipstream streaming SQL tutorial: Kafka log ingestion, windowed aggregation, and stream job lifecycle.
-- Create the Slipstream application for this user and switch into it.
-- NOTE(review): {application_user_name} is a per-student placeholder to be substituted.
CREATE APPLICATION {application_user_name};
USE APPLICATION {application_user_name};
-- Create the input stream table `logs` on top of a Kafka topic.
-- The consumer authenticates via Kerberos over SASL_PLAINTEXT.
-- NOTE(review): the keytab file is student.keytab but the principal is admin@TDH —
-- confirm the principal actually matches the keytab for your account.
CREATE STREAM logs (ip STRING, url STRING, time TIMESTAMP)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
TBLPROPERTIES (
    "topic" = "{topic_log_student_name}",
    "kafka.zookeeper" = "{Zookeeper_List}",
    "kafka.broker.list" = "{Broker_List}",
    "transwarp.consumer.security.protocol" = "SASL_PLAINTEXT",
    "transwarp.consumer.sasl.kerberos.service.name" = "kafka",
    "transwarp.consumer.sasl.jaas.config" = "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/etc/slipstream1/conf/student.keytab\" principal=\"admin@TDH\""
);
-- Derived stream table wlog: a tumbling window over logs
-- (length 10 s, slide 10 s — equal length and slide means non-overlapping windows).
CREATE STREAM wlog AS
SELECT * FROM logs
STREAMWINDOW sw AS (LENGTH '10' SECOND SLIDE '10' SECOND);
-- List all stream tables defined in the current application.
SHOW STREAMS;
-- Switch to the target database and create the table that will hold
-- the stream job's aggregation results.
-- NOTE(review): `count` shadows the COUNT() built-in, so it is backtick-quoted.
USE {database_name};
CREATE TABLE result_table (url STRING, `count` INT);
-- Create the stream job. Naming convention: job_<account>, e.g. job_student1
-- for account student1 (here the literal name log_job is used).
-- morphling.result.auto.flush=true makes results visible in result_table
-- without an explicit flush.
CREATE STREAMJOB log_job AS (
    " insert into result_table select url, count(*) from wlog group by url "
) JOBPROPERTIES ("morphling.result.auto.flush" = "true");
-- List all stream jobs that have been created.
SHOW STREAMJOBS;
-- Start the stream job, then verify it shows up as running.
START STREAMJOB log_job;
LIST STREAMJOBS;
-- Send the sample log records below to the Kafka topic (via a Kafka producer; these lines are data, not SQL):
27.0.1.125,www.transwarp.io/home.html,2012-8-14 20:12:31.132
54.231.66.16,www.transwarp.io/product.html,2012-8-14 20:12:32.132
72.21.203.5,www.transwarp.io/case.html,2012-8-14 20:12:33.132
207.241.224.2,www.transwarp.io/product.html,2016-8-14 20:12:34.132
12.129.206.133,www.transwarp.io/product.html,2016-8-14 20:12:51.132
208.111.148.7,www.transwarp.io/home.html,2016-8-14 20:12:52.132
-- Inspect the streaming aggregation results accumulated in the result table.
SELECT * FROM result_table;
-- Cleanup: list all running stream jobs, then stop them.
-- (Removed the pasted "1."/"2." list numbers and "#" markers, which are not
-- valid tokens in this SQL dialect.)
LIST STREAMJOBS;
STOP STREAMJOBS;