当前最新版本:1.6.1
wget http://mirrors.ustc.edu.cn/apache/kyuubi/kyuubi-1.6.1-incubating/apache-kyuubi-1.6.1-incubating-bin.tgz
解压缩到指定目录:
tar -zxvf apache-kyuubi-1.6.1-incubating-bin.tgz -C ~/softwares
准备环境:
cd $KYUUBI_HOME/conf
cp kyuubi-env.sh.template kyuubi-env.sh
cp kyuubi-defaults.conf.template kyuubi-defaults.conf
将kyuubi地址设置为localhost,如果不打开该注释,那么使用localhost是无法连接的,需要填写主机的ip地址
kyuubi.fronted.bind.host localhost
进入kyuubi配置文件目录
cd $KYUUBI_HOME/conf
编辑kyuubi-defaults.conf
,设置参数,设置flink作为执行引擎
kyuubi.engine.type FLINK_SQL
(这一步是可选的,因为可以在jdbc url中指定该配置动态改变引擎)
在kyuubi-env.sh
配置FLINK_HOME
和FLINK_HADOOP_CLASSPATH
,要求flink版本大于1.14,
FLINK_HADOOP_CLASSPATH
官方文档里没有说明要写,但是如果不写会无法连接使用flink engine,具体的版本号可自行修改为自己的hadoop平台版本号
export FLINK_HOME=/home/xcchen/software/flink/flink-1.15.3
export FLINK_HADOOP_CLASSPATH=${HADOOP_HOME}/share/hadoop/client/hadoop-client-runtime-3.3.0.jar:${HADOOP_HOME}/share/hadoop/client/hadoop-client-api-3.3.0.jar
在kyuubi-env.sh
配置SPARK_HOME
,要求spark版本大于3.0.0
# 调整为自己的spark安装路径
export SPARK_HOME=/home/xcchen/softwares/spark/spark-3.1.1
cd $KYUUBI_HOME
bin/kyuubi start
为了测试方便,这里使用flink standalone集群运行任务
cd $FLINK_HOME && bin/start-cluster.sh
使用kyuubi自带的beeline客户端工具
cd $KYUUBI_HOME
bin/beeline -u 'jdbc:hive2://localhost:10009/' -n xcchen
如果你没有在配置文件中设置flink引擎
cd $KYUUBI_HOME
bin/beeline -u 'jdbc:hive2://localhost:10009/?kyuubi.engine.type=FLINK_SQL' -n xcchen
将下面三条sql一条条的执行
CREATE TABLE orders (
order_number BIGINT,
price DECIMAL(32,2),
buyer ROW<first_name STRING, last_name STRING>,
order_time TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1'
);
CREATE TABLE target (
order_number BIGINT,
price DECIMAL(32,2),
buyer ROW<first_name STRING, last_name STRING>,
order_time TIMESTAMP(3)
) WITH (
'connector' = 'print'
);
INSERT INTO target SELECT * FROM orders;
问题:如果提交的是流式任务,客户端会一直阻塞
解决:临时的解决办法是修改代码重新编译,查看这条issue:https://github.com/apache/kyuubi/issues/4446。后续随着kyuubi的迭代会解决这个问题
select * from orders;
CREATE TABLE orders_b (
order_number BIGINT,
price DECIMAL(32,2),
buyer ROW<first_name STRING, last_name STRING>,
order_time TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'number-of-rows' = '20'
);
select * from orders_b;
如果表是有界的,那么就可以打印出结果
如果表是无界的,那么会一直阻塞,等待任务结束才会打印结果
使用create catalog
和use catalog
语法进行使用,需要将hive连接器放在$FLINK_HOME/lib
目录下
无法实现默认加载hive metastore
kyuubi目前版本对于flink支持的不是很好,对于批作业的支持是可以的,但是对于流式作业是不太友好的。
缺点:
为了测试方便,这里使用spark standalone集群
cd $SPARK_HOME
sbin/start-all.sh
还需要开启hdfs
start-dfs.sh
使用kyuubi自带的beeline客户端工具
cd $KYUUBI_HOME
bin/beeline -u 'jdbc:hive2://localhost:10009/' -n xcchen
准备数据文件
echo 'id,name\n1,zhangsan\n2,lisi' > /tmp/spark-kyuubi-source-test.csv
-- source table
create temporary view s using csv options (path 'file:/tmp/spark-kyuubi-source-test.csv',header "true");
insert overwrite directory using csv options (path 'file:/tmp/spark-kyuubi-target') select * from s;
基础查询:
-- basic query
select timestamp '2023-03-03';
读取文件:
-- select from csv table
select * from s;
0: jdbc:hive2://localhost:10009/> select * from s;
+-----+-----------+
| id | name |
+-----+-----------+
| 1 | zhangsan |
| 2 | lisi |
+-----+-----------+
2 rows selected (0.072 seconds)
准备mysql数据
docker exec -it mysql bash
mysql -uroot -p'xxx'
mysql中建表建库
create database test default character set utf8mb4;
create user 'kyuubi'@'%' identified by 'Kyuubi123~';
grant all privileges on test.* to 'kyuubi'@'%' with grant option;
flush privileges;
use test;
create table t1 (id int primary key,name varchar(20));
insert into t1 values (1,'zhangsan');
进入beeline交互式命令行:
add jar '/home/xcchen/softwares/kyuubi/mysql-connector-java-8.0.22.jar';
create temporary view m_test
using org.apache.spark.sql.jdbc
options (url 'jdbc:mysql://localhost:3306/test',dbTable 't1',user 'kyuubi',password 'Kyuubi123~');
select * from m_test;
添加的jar包是租户级别的,退出命令行后,重新打开命令行建立jdbc连接的时候不需要再添加依赖jar包
启动hive metastore
hive --service metastore
配置方式:
bin/beeline -u 'jdbc:hive2://localhost:10009/;#hive.metastore.uris=thrift://localhost:9083' -n xcchen
$KYUUBI_HOME/conf/kyuubi-defaults.conf
,添加spark.hive.metastore.uris
配置项默认的spark元数据信息只保存在会话级别,当我们在第一个命令行窗口开启会话a并且创建一系列表后,新开一个窗口开启会话b时,使用show tables
会发现没有表存在。说明默认的spark元数据信息是会话级别的,如果使用了metastore那就另说
参考git项目地址:http://192.168.118.128:81/xcchen/kyuubi-jdbc-examples#
kyuubi的认证只是针对使用kyuubi本身,作用域仅仅是kyuubi这个组件!
kyuubi支持多种验证方式
下面我们以mysql来进行测试
拷贝mysql驱动到$KYUUBI_HOME/jars
目录
cp mysql-connector-java-8.0.22.jar $KYUUBI_HOME/jars
准备mysql表
create database test default character set utf8mb4;
create table kyuubi_user (`user` varchar(50) primary key, password varchar(50));
insert into kyuubi_user values ('xcchen',md5(concat('1qazZSE$','xcchen')));
配置kyuubi-defaults.conf
,设置jdbc认证参数:
kyuubi.authentication=JDBC
kyuubi.authentication.jdbc.driver.class = com.mysql.jdbc.Driver
kyuubi.authentication.jdbc.url = jdbc:mysql://localhost:3306/test
kyuubi.authentication.jdbc.user = root
kyuubi.authentication.jdbc.password = root
kyuubi.authentication.jdbc.query = SELECT 1 FROM kyuubi_user WHERE user=${user} AND password=MD5(CONCAT('1qazZSE$',${password}))
使用beeline连接时填写用户名和密码(其中-n
代表用户名,-p
代码密码):
bin/beeline -u 'jdbc:hive2://localhost:10009/?kyuubi.engine.type=FLINK_SQL' -n xcchen -p xcchen
如果用户名密码未填写正确,报错信息如下:
Connecting to jdbc:hive2://localhost:10009/?kyuubi.engine.type=FLINK_SQL
Unknown HS2 problem when communicating with Thrift server.
Error: Could not open client transport with JDBC Uri: jdbc:hive2://localhost:10009/?kyuubi.engine.type=FLINK_SQL: Peer indicated failure: Error validating the login (state=08S01,code=0)
Beeline version 1.6.1-incubating by Apache Kyuubi (Incubating)
Tips:如果认证配置不生效,请仔细检查query语句和你的mysql表是否对应上了,md5的salt值是否使用正确.
flink任务没有这个概念,就算使用相同的用户,他们的环境也是隔离的
spark任务相同用户,他们的会话也是不一样的