当前位置: 首页 > 工具软件 > Slipstream > 使用案例 >

Slipstream 实验

房子昂
2023-12-01

Slipstream

create application {application_user_name};

use application {application_user_name}

// 基于Kafka Topic创建输入流表logs

create stream logs (ip string, url string, time timestamp) row format delimited fields terminated by ',' tblproperties( "topic"="{topic_log_student_name}", "kafka.zookeeper"="{Zookeeper_List}", "kafka.broker.list"="{Broker_List}", "transwarp.consumer.security.protocol"="SASL_PLAINTEXT",

"transwarp.consumer.sasl.kerberos.service.name"="kafka",

"transwarp.consumer.sasl.jaas.config"="com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/etc/slipstream1/conf/student.keytab\" principal=\"admin@TDH\"");

// 基于输入流表创建衍生流表wlog,窗口Length和Slide为10秒

create stream wlog as select * from logs streamwindow sw as (length '10' second slide '10' second);

// 查看所有的流表

 show streams;

//创建结果表

 use {database_name}

 create table result_table(url string, count int);

// 创建StreamJob,log_job为 Streamjob名,格式为job_账号名,如账号student1则为job_student1

CREATE STREAMJOB log_job AS (

" insert into result_table select url, count(*) from wlog group by url ")JOBPROPERTIES("morphling.result.auto.flush"="true");

// 查看已创建的StreamJob

show streamjobs;

// 开启StreamJob

START STREAMJOB log_job;

list streamjobs;

//向Kafka Topic发送数据

27.0.1.125,www.transwarp.io/home.html,2012-8-14 20:12:31.132

54.231.66.16,www.transwarp.io/product.html,2012-8-14 20:12:32.132

72.21.203.5,www.transwarp.io/case.html,2012-8-14 20:12:33.132

207.241.224.2,www.transwarp.io/product.html,2016-8-14 20:12:34.132

12.129.206.133,www.transwarp.io/product.html,2016-8-14 20:12:51.132

208.111.148.7,www.transwarp.io/home.html,2016-8-14 20:12:52.132

//查看结果表中的流处理结果

 select * from result_table;

#查看所有流任务

1. LIST STREAMJOBS;

#停止所有流任务

2. STOP STREAMJOBS;

 类似资料: