Phoenix是构建在HBase上的一个SQL层,专为HBase研发。能让我们用标准的JDBC API而不是HBase客户端APIs来创建表,插入数据和对HBase数据进行查询。
Phoenix完全使用Java编写,作为HBase内嵌的JDBC驱动。
Download:http://phoenix.apache.org/download.html,下载hbase对应版本的phoenix;
解压bin.tar.gz包,拷贝phoenix server jar包到hbase集群的每个region server 的lib目录下,然后重启hbase 集群。
phoenix支持的数据类型
charType
varcharType
decimalType
tinyintType
smallintType
integerType
bigintType
floatType
doubleType
timestampType
dateType
timeType
binaryType
varbinaryType
phoenix语法使用:phoenix官网语法
phoenix默认都是大写。除非用上双引号进行区分大小写,注意shell和API的操作!
帮助文档:help
select查询
UPSERT INTO TEST VALUES('foo','bar',3);
UPSERT INTO TEST(ID, MY_COL) VALUES(123, 0) ON DUPLICATE KEY IGNORE;
DUPLICATE KEY:会保证执行结果的原子性
IGNORE: 希望UPSERT在行已存在的情况下不执行插入数据使用该关键字。
所有表、列族和列名都是大写的,除非它们是双引号表示字符串,在这种情况下它们区分大小写。
CREATE TABLE if not exists INFO(
"event_time.ss_transducer_code" varchar not null primary key,
"info"."time" varchar,
"info"."field" varchar
)COMPRESSION=SNAPPY,SALT_BUCKETS=9;
创建压缩格式为SNAPPY,分区数为9的表。COMPRESSION和SALT_BUCKETS都是可选的。
创建视图一般用于表不会再被更改和删除时
删除视图:DROP VIEW viewname ;删除视图不会对表数据有影响。
在表或视图上创建新的二级索引。随着数据的变化,索引将自动与表保持同步。在查询时,如果索引包含查询中引用的所有列并生成最有效的执行计划,则优化器将使用该索引。为了提升查询性能
在表、架构或用户级别授予权限, R - 读取、W - 写入、X - 执行、C - 创建和 A - 管理员
支持动态加载,比如写入表不存在的字段。
支持批量加载文件到表中
1:批量加载文件:可以使用psql.py批量加载csv文件数据
psql.py [zookeeper] …/examples/web_stat.csv
12345,John,Doe
67890,Mary,Poppins
CREATE TABLE example (
my_pk bigint not null,
m.first_name varchar(50),
m.last_name varchar(50)
CONSTRAINT pk PRIMARY KEY (my_pk))
执行加载:bin/psql.py -t EXAMPLE localhost data.csv
2:执行sql文件:psql.py [zookeeper] …/examples/web_stat.sql
函数详解
字段用双引号,值用单引号
AVG(X) :对某列求平均值
COUNT(*) :计数
max/min :最大最小值
获取当前时间:now()、current_time、current_date
时间差计算:
TO_NUMBER:支持对字符串和时间类型转为数字。对于日期、时间和时间戳项,结果是自纪元以来的时间(以毫秒为单位
TRUNC:截取时间中的一部分。如:TRUNC(timestamp, 'SECOND', 30) TRUNC(date, 'DAY', 7)
使用!record命令实现
!record fileName #开启一个导出到系统中fileName 的记录器。比如/tmp/test.csv
select * from table; #将执行的结果写入到fileName
!record #关闭记录器后即可到系统的fileName 中查看数据
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-client-hbase-[hbase.profile]</artifactId>
<version>[phoenix.version]</version>
</dependency>
使用 JDBC 获取到 HBase 集群的连接,如下所示:
其中props是可选属性,可能包括 Phoenix 和 HBase 配置属性。
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
Connection conn = DriverManager.getConnection("URL",props);
url为设置的连接地址
props示例
Configuration conf = HBaseConfiguration.create();
conf.addResource("core-site.xml");
conf.addResource("hdfs-site.xml");
conf.addResource("hbase-site.xml");
Properties props = new Properties();
props.setProperty("phoenix.query.timeoutMs", "1200000");
props.setProperty("phoenix.connection.autoCommit", "true");
props.setProperty("hbase.rpc.timeout", "1200000");
props.setProperty("hbase.client.scanner.timeout.period", "1200000");
props.put("key", conf.get("key")); //可以获取hbase的一些属性进行设置
数据写入hbase实例
@ map<fieldName ,value>
public void write2hbase(HashMap<String, String> map) {
conn = JdbcUtil.getPhoenixConnection(new UdfAuthentication().getConfiguration());
Set<String> stringSet = map.keySet();
StringBuilder stringBuilder = new StringBuilder();
for (String field : stringSet) {
stringBuilder.append("\"")
.append(field)
// 为了解决phoenix大小写问题
.append("\"")
.append(",");
}
String keys = stringBuilder.deleteCharAt(stringBuilder.length() - 1).toString();
String sql = "upsert into INVALID_DATA_INFO (" + keys + ") values(" + keys.replaceAll("[^,]+", "?") + ")";
try {
PreparedStatement ps = conn.prepareStatement(sql);
String[] split = keys.split(",");
for (int i = 0; i < split.length; i++) {
String name = split[i];
ps.setString(i + 1, map.get(name.replaceAll("\"","")));
}
ps.execute();
conn.commit();
} catch (SQLException e) {
e.printStackTrace();
} finally {
try {
if (!Objects.isNull(conn))
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
删除数据
public void deleteData(String sql) {
try (Connection conn = JdbcUtil.getPhoenixConnection(new UdfAuthentication().getConfiguration());
PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
preparedStatement.executeUpdate();
} catch (SQLException ignored) {
ignored.printStackTrace();
}
}
查询数据:返回List
public List<JSONObject> queryList(String sql) {
List<JSONObject> result = new ArrayList<>();
conn = JdbcUtil.getPhoenixConnection(new UdfAuthentication().getConfiguration());
try {
PreparedStatement ps = conn.prepareStatement(sql);
ResultSet resultSet = ps.executeQuery();
ResultSetMetaData metaData = resultSet.getMetaData();
int columnCount = metaData.getColumnCount();
while (resultSet.next()) {
HashMap<String, String> map = new HashMap<>();
for (int i = 1; i <= columnCount; i++) {
String columnName = metaData.getColumnLabel(i);
String value = resultSet.getString(columnName);
if (null == value) {
value = "";
}
map.put(columnName, value);
}
String s = JSON.toJSONString(map);
JSONObject jsonObject = JSONObject.parseObject(s);
result.add(jsonObject);
}
} catch (Exception e) {
throw new PhoenixException("Phoenix query table fail .", e);
}
return result;
}