【Kafka operations: run the commands below from ${KAFKA_HOME}/bin】
1. Before operating Kafka on a Transwarp TDH cluster, first grant the relevant permissions.
(1) Grant the current user cluster-level permissions (hive is used as the example user throughout; you can switch users with kinit, sketched below)
./kafka-acls.sh --authorizer-properties zookeeper.connect=node3:2181,node2:2181,node1:2181 --add --allow-principal User:hive --cluster
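For example, switching to the hive user before running the ACL commands; the keytab path below is an assumption, and a plain kinit with a password prompt works as well:
kinit hive
# or non-interactively with a keytab (path is illustrative):
kinit -kt /etc/hive1/conf/hive.keytab hive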
2. Create a topic
./kafka-broker-topics.sh --bootstrap-server node3:9092,node2:9092,node1:9092 --create --topic yxy --partitions 3 --replication-factor 3 --consumer.config ../config/consumer.properties
(1) Grant the user permission to produce to the topic
./kafka-acls.sh --authorizer-properties zookeeper.connect=node3:2181,node2:2181,node1:2181 --add --allow-principal User:hive --topic yxy --producer
(2) Grant the user permission to consume from the topic (here as consumer group root)
./kafka-acls.sh --authorizer-properties zookeeper.connect=node3:2181,node2:2181,node1:2181 --add --allow-principal User:hive --topic yxy --consumer --group root
(3) List the ACLs set on this topic
./kafka-acls.sh --authorizer-properties zookeeper.connect=node3:2181,node2:2181,node1:2181 --list --topic yxy
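To revoke a grant, kafka-acls.sh accepts --remove with the same selectors; for example, dropping the producer grant from step (1) above (the tool asks for confirmation unless --force is passed):
./kafka-acls.sh --authorizer-properties zookeeper.connect=node3:2181,node2:2181,node1:2181 --remove --allow-principal User:hive --topic yxy --producer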
(4) List all topics
./kafka-topics.sh --zookeeper node3:2181,node2:2181,node1:2181 --list
3. Start a console producer
./kafka-console-producer.sh --broker-list node3:9092,node2:9092,node1:9092 \
--topic yxy \
--producer.config ../config/producer.properties
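Once the producer is running, each line typed on stdin is sent as one message (Ctrl+C to exit). For example, rows in the comma-delimited layout that the Slipstream stream further below expects:
1,a
2,b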
4. Start a console consumer
./kafka-console-consumer.sh --bootstrap-server node3:9092,node2:9092,node1:9092 \
--topic yxy \
--consumer.config ../config/consumer.properties
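By default the console consumer only prints messages produced after it starts; to replay the topic from the earliest offset, the standard --from-beginning flag can be appended:
./kafka-console-consumer.sh --bootstrap-server node3:9092,node2:9092,node1:9092 \
--topic yxy \
--consumer.config ../config/consumer.properties \
--from-beginning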
5. Delete a Kafka topic
(1) In the Manager web UI, set delete.topic.enable to true, apply the configuration (top-right corner), and restart the Kafka service. (Needed only the first time you delete a topic.)
(2) In ZooKeeper, delete the topic's entries under /brokers/topics, /consumers/, /admin/delete_topics/, and /config/topics/:
1)kubectl get pod |grep zookeeper
2)kubectl exec -it zookeeper-server-zookeeper1-3543915313-1dmn1 bash
3)cd /usr/lib/zookeeper/bin/
4)./zkCli.sh
./zookeeper-shell.sh node3:2181,node2:2181,node1:2181 (equivalent to the four steps above). Once inside the ZooKeeper shell, locate and delete the topic's entries under /brokers/topics, /consumers/, /admin/delete_topics/, and /config/topics/ (see the sketch after this list), then run quit to leave the shell. If you used the four kubectl steps instead, also run exit afterwards to leave the pod.
(3) In the Manager UI, search for the kmq.log.dirs setting to find Kafka's log directory (currently /hadoop/kmq on this cluster)
1) Go into /hadoop/kmq and delete the topic's partition directories (sketched below together with the znode cleanup)
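A minimal sketch of steps (2) and (3) for the yxy topic. rmr works on ZooKeeper 3.4 (use deleteall on 3.5+); the /consumers paths assume the root group from step 2(2) and exist only if offsets were committed to ZooKeeper; yxy-* matches Kafka's topic-partition directory naming:
./zookeeper-shell.sh node3:2181,node2:2181,node1:2181
ls /brokers/topics
rmr /brokers/topics/yxy
rmr /admin/delete_topics/yxy
rmr /config/topics/yxy
rmr /consumers/root/offsets/yxy
rmr /consumers/root/owners/yxy
quit
# then, on every Kafka broker node:
rm -rf /hadoop/kmq/yxy-*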
【Slipstream operations: create a stream and receive data】
1. Log in to Slipstream (name and password are placeholders for your credentials)
beeline -u 'jdbc:hive2://node2:10010/default' -n name -p password
2. Create the stream
CREATE STREAM demos(id INT, letter STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
TBLPROPERTIES("topic"="yxy",
"kafka.zookeeper"="node3:2181",
"kafka.broker.list"="node3:9092",
"transwarp.consumer.security.protocol"="SASL_PLAINTEXT",
"transwarp.consumer.sasl.mechanism"="GSSAPI",
"transwarp.consumer.sasl.kerberos.service.name"="kafka",
"transwarp.consumer.sasl.jaas.config"="com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/etc/slipstream1/conf/kafka.keytab\" principal=\"kafka/node1@TDH\""
);
3. Create the target table (its columns and types must match the stream)
CREATE TABLE demot(id INT, letter STRING);
4. Trigger the stream
INSERT INTO demot SELECT * FROM demos;
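To verify the pipeline end to end, send a couple of rows from the console producer started earlier (e.g. 1,a and 2,b, matching the stream's comma-delimited schema), then query the table from beeline; the sample rows are illustrative:
SELECT * FROM demot;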
5. List the running StreamJobs
LIST STREAMJOBS;
6. Stop a specific StreamJob (replace id with an id reported by LIST STREAMJOBS)
STOP STREAMJOB id;
【Flume operations: run the command below from ${FLUME_HOME}】
1. Start Flume
nohup bin/flume-ng agent --name a1 --conf conf --conf-file conf/flume-topic-test.conf -Dflume.root.logger=INFO,console -Djava.security.auth.login.config=/opt/flume/apache-flume-1.7.0-bin/conf/jaas.conf &
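The flume-topic-test.conf and jaas.conf files referenced above are not shown here; below is a minimal sketch assuming a Flume 1.7 Kafka sink writing to the yxy topic over SASL_PLAINTEXT. The netcat source, channel sizing, and keytab path are illustrative placeholders, not the actual cluster configuration.
conf/flume-topic-test.conf:
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# illustrative source; substitute your real source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = node3:9092,node2:9092,node1:9092
a1.sinks.k1.kafka.topic = yxy
a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.k1.kafka.producer.sasl.kerberos.service.name = kafka
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
conf/jaas.conf (passed via -Djava.security.auth.login.config; the keytab path is an assumption):
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/etc/kafka1/conf/kafka.keytab"
  principal="kafka/node1@TDH";
};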