The clickhouse-client CLI talks to the server over the native TCP protocol (port 9000).
[root@clickhouse1 ~]#
[root@clickhouse1 ~]# clickhouse-client -h clickhouse1 --port 9000 -u default --password default123 -m -n
ClickHouse client version 21.6.5.37 (official build).
Connecting to clickhouse1:9000 as user default.
Connected to ClickHouse server version 21.6.5 revision 54448.
clickhouse1 :)
clickhouse1 :) select 1;
SELECT 1
Query id: a58a5e62-75cb-4b3c-a8e8-5ed8d2917b85
┌─1─┐
│ 1 │
└───┘
1 rows in set. Elapsed: 0.016 sec.
clickhouse1 :)
clickhouse1 :) select 2;
SELECT 2
Query id: 35771efe-e306-403d-b39a-7a58abba4412
┌─2─┐
│ 2 │
└───┘
1 rows in set. Elapsed: 0.006 sec.
clickhouse1 :)
clickhouse1 :) select hostName();
SELECT hostName()
Query id: 7d08f34a-489e-4e05-91be-a2e72f4c08fa
┌─hostName()──┐
│ clickhouse1 │
└─────────────┘
1 rows in set. Elapsed: 0.010 sec.
clickhouse1 :)
Executed statements are saved to the .clickhouse-client-history file in the user's home directory.
[root@clickhouse1 ~]#
[root@clickhouse1 ~]# pwd
/root
[root@clickhouse1 ~]#
[root@clickhouse1 ~]# cat .clickhouse-client-history
...... part omitted ......
### 2021-07-09 04:37:28.414
select 1;
### 2021-07-09 04:39:48.674
select 2;
### 2021-07-09 04:39:51.683
select hostName();
Importing data
[root@clickhouse1 ~]#
[root@clickhouse1 ~]# pwd
/root
[root@clickhouse1 ~]#
[root@clickhouse1 ~]# cat insert.csv
3,san
4,si
[root@clickhouse1 ~]#
[root@clickhouse1 ~]# clickhouse-client -h clickhouse1 --port 9000 -u default --password default123 -m -n --format_csv_delimiter="," --query "insert into distribute_test_all format CSV" < /root/insert.csv
[root@clickhouse1 ~]#
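The same two rows can also be inserted over the HTTP interface (port 8123) with the JDBC driver introduced later in this article. A minimal sketch, assuming distribute_test_all has an integer id column and a string name column (the column names are an assumption inferred from the CSV file, not taken from the table definition):

import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.settings.ClickHouseProperties;

import java.sql.Connection;
import java.sql.PreparedStatement;

public class CsvRowsInsertSketch {

    public static void main(String[] args) throws Exception {
        ClickHouseProperties props = new ClickHouseProperties();
        props.setUser("default");
        props.setPassword("default123");
        BalancedClickhouseDataSource ds = new BalancedClickhouseDataSource(
                "jdbc:clickhouse://192.168.23.41:8123/default", props);

        try (Connection conn = ds.getConnection();
             // column names id/name are assumed, matching the two CSV fields above
             PreparedStatement pst = conn.prepareStatement(
                     "insert into distribute_test_all(id, name) values(?, ?)")) {
            Object[][] rows = {{3, "san"}, {4, "si"}};   // same rows as insert.csv
            for (Object[] row : rows) {
                pst.setInt(1, (Integer) row[0]);
                pst.setString(2, (String) row[1]);
                pst.addBatch();
            }
            pst.executeBatch();                          // one batch, no explicit commit needed
        }
    }
}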
Exporting data
[root@clickhouse1 ~]#
[root@clickhouse1 ~]# clickhouse-client -h clickhouse1 --port 9000 -u default --password default123 -m -n --query "select * from distribute_test_all" > /root/select.csv
[root@clickhouse1 ~]#
[root@clickhouse1 ~]# pwd
/root
[root@clickhouse1 ~]#
[root@clickhouse1 ~]# cat select.csv
1 一
4 si
1 yi
2 er
3 san
[root@clickhouse1 ~]#
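The export can likewise be reproduced programmatically: run the query over JDBC (covered next) and write each row of the ResultSet to a file. A minimal sketch, assuming the first column is numeric and the second is a string, as the output above suggests; the output path select_jdbc.csv is just an illustrative name:

import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.settings.ClickHouseProperties;

import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;

public class SelectToCsvSketch {

    public static void main(String[] args) throws Exception {
        ClickHouseProperties props = new ClickHouseProperties();
        props.setUser("default");
        props.setPassword("default123");
        BalancedClickhouseDataSource ds = new BalancedClickhouseDataSource(
                "jdbc:clickhouse://192.168.23.41:8123/default", props);

        try (Connection conn = ds.getConnection();
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("select * from distribute_test_all");
             PrintWriter out = new PrintWriter("/root/select_jdbc.csv")) {   // illustrative output path
            while (rs.next()) {
                // write one comma-separated line per row
                out.println(rs.getInt(1) + "," + rs.getString(2));
            }
        }
    }
}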
The JDBC driver talks to the server over the HTTP interface (port 8123). A Java example:
pom.xml
<dependency>
    <groupId>com.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.3.2-patch8</version>
</dependency>
Clickhouse_JDBC_test.java
import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.settings.ClickHouseProperties;

import java.sql.Connection;
import java.sql.PreparedStatement;

public class Clickhouse_JDBC_test {

    public static void main(String[] args) throws Exception {
        String url = "jdbc:clickhouse://192.168.23.41:8123,192.168.23.42:8123,192.168.23.43:8123,192.168.23.44:8123/default";

        ClickHouseProperties clickHouseProperties = new ClickHouseProperties();
        clickHouseProperties.setUser("default");
        clickHouseProperties.setPassword("default123");

        BalancedClickhouseDataSource balanced = new BalancedClickhouseDataSource(url, clickHouseProperties);
        // ping each host and drop connections that are unreachable
        balanced.actualize();
        Connection conn = balanced.getConnection();

        String insertSql = "insert into test.java_table_test_all(id, name) values(?, ?)";
        PreparedStatement pst = conn.prepareStatement(insertSql);
        for (int i = 1; i < 9999; i++) {
            pst.setInt(1, i);
            pst.setString(2, "name" + i);
            pst.addBatch();

            if (i % 1000 == 0) {
                // Calling conn.setAutoCommit(false) throws a "Transactions are not supported" exception,
                // so conn.commit() cannot be used here.
                // Only pst.executeBatch() is called; ClickHouse commits the batch on the server side.
                // The number of inserted rows was correct in testing.
                pst.executeBatch();
                pst.clearBatch();
            }
        }

        // insert the final (partial) batch
        pst.executeBatch();
        pst.clearBatch();

        pst.close();
        conn.close();
    }
}
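After the batch inserts finish, the row count can be checked with a simple count() query over the same driver. A minimal verification sketch; since the loop runs i = 1..9998, 9998 rows are expected:

import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.settings.ClickHouseProperties;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;

public class CountCheckSketch {

    public static void main(String[] args) throws Exception {
        ClickHouseProperties props = new ClickHouseProperties();
        props.setUser("default");
        props.setPassword("default123");
        BalancedClickhouseDataSource ds = new BalancedClickhouseDataSource(
                "jdbc:clickhouse://192.168.23.41:8123/default", props);

        try (Connection conn = ds.getConnection();
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("select count() from test.java_table_test_all")) {
            if (rs.next()) {
                // the insert loop above runs i = 1..9998, so 9998 rows are expected
                System.out.println("row count: " + rs.getLong(1));
            }
        }
    }
}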
Add the clickhouse-jdbc driver jar (the same artifact as in pom.xml above) to Spark.
A pySpark example program:
from pyspark.sql import SparkSession
from pyspark import SparkConf

if __name__ == '__main__':

    conf = SparkConf() \
        .setAppName("clickhouseTest")
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    # read from ClickHouse over JDBC; the age range 10-30 is split into 15 partitions
    # (partitionColumn / lowerBound / upperBound / numPartitions), one JDBC query per partition
    df = spark.read.format("jdbc") \
        .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
        .option("url", "jdbc:clickhouse://bigdata001:8123,bigdata002:8123,bigdata003:8123/clickhouse_test_db") \
        .option("user", "default") \
        .option("password", "default123") \
        .option("dbtable", "(select user_id, name, age from user_tb where age between 10 and 30) tempTable") \
        .option("partitionColumn", "age") \
        .option("fetchsize", 25000) \
        .option("lowerBound", 10) \
        .option("upperBound", 30) \
        .option("numPartitions", 15) \
        .option("queryTimeout", 600) \
        .load()

    df.show(n=3, truncate=False)

    spark.stop()