当前位置: 首页 > 工具软件 > Slipstream > 使用案例 >

【TDH】Kafka、Flume、Slipstream基本操作

弘思聪
2023-12-01

【Kafka操作:在${KAFKA_HOME}/bin下执行Kafka操作】

1、在星环TDH集群上操作Kafka的时候首先要进行相关的赋权操作

(1)赋予当前用户(当前用户以hive为例,可以使用kinit进行用户的切换)操作集群的权限

./kafka-acls.sh --authorizer-properties zookeeper.connect=node3:2181,node2:2181,node1:2181 --add --allow-principal User:hive --cluster

2、创建topic

./kafka-broker-topics.sh --bootstrap-server node3:9092,node2:9092,node1:9092 --create --topic yxy --partitions 3 --replication-factor 3 --consumer.config ../config/consumer.properties

(1)赋予用户生产数据的权限

./kafka-acls.sh --authorizer-properties zookeeper.connect=node3:2181,node2:2181,node1:2181 --add --allow-principal User:hive --topic yxy --producer

(2)赋予用户消费数据的权限

./kafka-acls.sh --authorizer-properties zookeeper.connect=node3:2181,node2:2181,node1:2181 --add --allow-principal User:hive --topic yxy --consumer --group root

(3)查看该topic具有哪些权限

./kafka-acls.sh --authorizer-properties zookeeper.connect=node3:2181,node2:2181,node1:2181 --list --topic yxy

(4)列出所有的topic

./kafka-topics.sh --zookeeper node3:2181,node2:2181,node1:2181 --list

3、开启生产者

./kafka-console-producer.sh --broker-list node3:9092,node2:9092,node1:9092 \
--topic yxy \
--producer.config ../config/producer.properties

4、开启消费者

./kafka-console-consumer.sh --bootstrap-server node3:9092,node2:9092,node1:9092 \
--topic yxy \
--consumer.config ../config/consumer.properties

5、Kafka删除topic操作

(1)在manager界面将delete.topic.enable配置成true,右上角配置服务后重启Kafka组件。(仅在第一次删除topic时操作)

(2)进入zookeeper里面删除/brokers/topics 、/consumers/、/admin/delete_topics/、/config/topics/目录下相关topic文件夹

         1)kubectl get pod |grep zookeeper
         2)kubectl exec -it zookeeper-server-zookeeper1-3543915313-1dmn1 bash
         3)cd /usr/lib/zookeeper/bin/
         4)./zkCli.sh 
         ./zookeeper-shell.sh node3:2181,node2:2181,node1:2181 (等同于以上四步操作),进入zookeeper命令行之后找到/brokers/topics 、/consumers/、/admin/delete_topics/、/config/topics/下对应的topic文件,删除即可,执行quit退出zookeeper命令行(如果执行的是上面的四步操作,还需要在退出zookeeper命令行之后执行exit退出登录的界面)。

(3)manager界面搜索kmq.log.dirs配置目录(目前集群该目录为/hadoop/kmq)
        1)进入/hadoop/kmq删除相关topic的目录

 

【Slipstream操作:创建stream流并接收数据】

1、登录Slipstream

beeline -u 'jdbc:hive2://node2:10010/default' -n name -p passsword

2、建流

CREATE STREAM demos(id INT, letter STRING)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  TBLPROPERTIES("topic"="yxy",
  "kafka.zookeeper"="node3:2181",
  "kafka.broker.list"="node3:9092",
  "transwarp.consumer.security.protocol"="SASL_PLAINTEXT",
  "transwarp.consumer.sasl.mechanism"="GSSAPI",
  "transwarp.consumer.sasl.kerberos.service.name"="kafka",
  "transwarp.consumer.sasl.jaas.config"="com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/etc/slipstream1/conf/kafka.keytab\" principal=\"kafka/node1@TDH\""
  );

3、建表(表字段及类型必须与流一致)

CREATE TABLE demot(id INT, letter STRING);

4、触发流

INSERT INTO demot SELECT * FROM demos;

5、列出正在运行的StreamJob

LIST STREAMJOBS;

6、停止某一个StreamJob

STOP STREAMJOB id;

 

【Flume操作:在${FLUME_HOME}下执行Flume操作】

1、启动Flume

nohup bin/flume-ng agent --name a1 --conf conf --conf-file conf/flume-topic-test.conf -Dflume.root.logger=INFO,console -Djava.security.auth.login.config=/opt/flume/apache-flume-1.7.0-bin/conf/jaas.conf &

 

 类似资料: