当前位置: 首页 > 工具软件 > Phoneix > 使用案例 >

使用phoneix连接hbase

杭英杰
2023-12-01

phoenix官网
phoenix官网函数语法

1.1:简介

Phoenix是构建在HBase上的一个SQL层,专为HBase研发。能让我们用标准的JDBC API而不是HBase客户端APIs来创建表,插入数据和对HBase数据进行查询。
Phoenix完全使用Java编写,作为HBase内嵌的JDBC驱动。

1:下载安装

Download:http://phoenix.apache.org/download.html,下载hbase对应版本的phoenix;
解压bin.tar.gz包,拷贝phoenix server jar包到hbase集群的每个region server 的lib目录下,然后重启hbase 集群。

2:支持的数据类型

phoenix支持的数据类型

charType	
varcharType	
decimalType	
tinyintType	
smallintType	
integerType	
bigintType	
floatType	
doubleType	
timestampType	
dateType	
timeType	
binaryType	
varbinaryType	

3:shell客户端使用

phoenix语法使用:phoenix官网语法
phoenix默认都是大写。除非用上双引号进行区分大小写,注意shell和API的操作!

  • 进入客户端:sqlline.py [zookeeper quorum hosts]
  • 内置脚本
    可以使用该脚本实现不进入客户端执行sql文件或者批量加载文件
    1:批量加载文件:可以使用psql.py批量加载csv文件数据
    psql.py [zookeeper] …/examples/web_stat.csv
    2:执行sql文件:psql.py [zookeeper] …/examples/web_stat.sql

帮助文档:help

3.1:DML数据操作

select查询

  • 1:select查询语法
    select * from table order by filed limti 10;
    order by 根据查询结果进行排序后再取limit个数
    upsert插入
  • 1:upsert into插入
    如果不存在则插入,否则更新表中的值。
UPSERT INTO TEST VALUES('foo','bar',3);
UPSERT INTO TEST(ID, MY_COL) VALUES(123, 0) ON DUPLICATE KEY IGNORE;

DUPLICATE KEY:会保证执行结果的原子性
IGNORE: 希望UPSERT在行已存在的情况下不执行插入数据使用该关键字。

  • 2:UPSERT 根据查询结果插入表,可以指定列
    UPSERT INTO foo SELECT * FROM bar;
    delete删除数据
    DELETE FROM TEST WHERE NAME LIKE ‘foo%’;
    删除where子句查询的数据。

3.2:表DDL操作

所有表、列族和列名都是大写的,除非它们是双引号表示字符串,在这种情况下它们区分大小写。

  • 1:创建表
CREATE TABLE if not exists INFO(
"event_time.ss_transducer_code" varchar not null primary key,
"info"."time" varchar,
"info"."field" varchar
)COMPRESSION=SNAPPY,SALT_BUCKETS=9;
创建压缩格式为SNAPPY,分区数为9的表。COMPRESSION和SALT_BUCKETS都是可选的。
  • 2:删除表
    DROP TABLE IF EXISTS my_table;
    DROP TABLE my_schema.my_table CASCADE;
    使用可选的CASCADE关键字也会导致表上的任何视图也被删除。
  • 3:删除增加列ALTER
    使用ALTER +add/drop关键字对表的列进行增加或删除
    增加列:ALTER TABLE my_table ADD ID char(10) VERSIONS=10;
    删除列:ALTER TABLE my_table DROP COLUMN ID;

3.3:view视图操作

创建视图一般用于表不会再被更改和删除时
删除视图:DROP VIEW viewname ;删除视图不会对表数据有影响。

3.4:索引index

在表或视图上创建新的二级索引。随着数据的变化,索引将自动与表保持同步。在查询时,如果索引包含查询中引用的所有列并生成最有效的执行计划,则优化器将使用该索引。为了提升查询性能

  • 1:创建index索引
    create index INDEX_NAME on TABLE_NAME(“info”.“name”,“info”.“age”)
  • 2:删除索引
    DROP INDEX IF EXISTS INDEX_NAME ON TABLE_NAME;
    查询时只有需要查询的字段为建立索引的字段时才会走二级索引查询。
    比如select a,b,c但是索引字段只有a就不会走。

3.5:表以及视图等权限设置

在表、架构或用户级别授予权限, R - 读取、W - 写入、X - 执行、C - 创建和 A - 管理员

4:动态加载

支持动态加载,比如写入表不存在的字段。

5:批量加载

支持批量加载文件到表中
1:批量加载文件:可以使用psql.py批量加载csv文件数据
psql.py [zookeeper] …/examples/web_stat.csv

12345,John,Doe
67890,Mary,Poppins
 CREATE TABLE example (
    my_pk bigint not null,
    m.first_name varchar(50),
    m.last_name varchar(50) 
    CONSTRAINT pk PRIMARY KEY (my_pk))

执行加载:bin/psql.py -t EXAMPLE localhost data.csv

2:执行sql文件:psql.py [zookeeper] …/examples/web_stat.sql

6:select详解

6.1:查询函数

函数详解
字段用双引号,值用单引号

AVG(X)  :对某列求平均值
COUNT(*) :计数
max/min :最大最小值
获取当前时间:now()、current_time、current_date
时间差计算:
TO_NUMBER:支持对字符串和时间类型转为数字。对于日期、时间和时间戳项,结果是自纪元以来的时间(以毫秒为单位
TRUNC:截取时间中的一部分。如:TRUNC(timestamp, 'SECOND', 30) TRUNC(date, 'DAY', 7)



1.2:将phoneix数据导出为csv文件

使用!record命令实现

!record fileName   #开启一个导出到系统中fileName  的记录器。比如/tmp/test.csv
select * from table;  #将执行的结果写入到fileName  
!record      #关闭记录器后即可到系统的fileName  中查看数据

1.3:api开发

1:maven依赖

<dependency>
        <groupId>org.apache.phoenix</groupId>
        <artifactId>phoenix-client-hbase-[hbase.profile]</artifactId>
        <version>[phoenix.version]</version>
</dependency>

使用 JDBC 获取到 HBase 集群的连接,如下所示:
其中props是可选属性,可能包括 Phoenix 和 HBase 配置属性。

Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
Connection conn = DriverManager.getConnection("URL",props);
url为设置的连接地址

props示例

 Configuration conf = HBaseConfiguration.create();
            conf.addResource("core-site.xml");
            conf.addResource("hdfs-site.xml");
            conf.addResource("hbase-site.xml");
            
  Properties props = new Properties();
   props.setProperty("phoenix.query.timeoutMs", "1200000");
   props.setProperty("phoenix.connection.autoCommit", "true");
   props.setProperty("hbase.rpc.timeout", "1200000");
   props.setProperty("hbase.client.scanner.timeout.period", "1200000");
   props.put("key", conf.get("key")); //可以获取hbase的一些属性进行设置

数据写入hbase实例

@ map<fieldName ,value>
public void write2hbase(HashMap<String, String> map) {
        conn = JdbcUtil.getPhoenixConnection(new UdfAuthentication().getConfiguration());

        Set<String> stringSet = map.keySet();
        StringBuilder stringBuilder = new StringBuilder();
        for (String field : stringSet) {
            stringBuilder.append("\"")
                    .append(field)
                    // 为了解决phoenix大小写问题
                    .append("\"")
                    .append(",");
        }
        String keys = stringBuilder.deleteCharAt(stringBuilder.length() - 1).toString();

        String sql = "upsert into INVALID_DATA_INFO (" + keys + ") values(" + keys.replaceAll("[^,]+", "?") + ")";
        try {
            PreparedStatement ps = conn.prepareStatement(sql);
            String[] split = keys.split(",");
            for (int i = 0; i < split.length; i++) {
                String name = split[i];
                ps.setString(i + 1, map.get(name.replaceAll("\"","")));
            }
            ps.execute();
            conn.commit();
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            try {
                if (!Objects.isNull(conn))
                    conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

删除数据

 public void deleteData(String sql) {

        try (Connection conn = JdbcUtil.getPhoenixConnection(new UdfAuthentication().getConfiguration());
             PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
            preparedStatement.executeUpdate();
        } catch (SQLException ignored) {
            ignored.printStackTrace();
        }
    }

查询数据:返回List

public List<JSONObject> queryList(String sql) {
        List<JSONObject> result = new ArrayList<>();
        conn = JdbcUtil.getPhoenixConnection(new UdfAuthentication().getConfiguration());
        try {
            PreparedStatement ps = conn.prepareStatement(sql);
            ResultSet resultSet = ps.executeQuery();
            ResultSetMetaData metaData = resultSet.getMetaData();
            int columnCount = metaData.getColumnCount();
            while (resultSet.next()) {
                HashMap<String, String> map = new HashMap<>();
                for (int i = 1; i <= columnCount; i++) {
                    String columnName = metaData.getColumnLabel(i);
                    String value = resultSet.getString(columnName);
                    if (null == value) {
                        value = "";
                    }
                    map.put(columnName, value);
                }
                String s = JSON.toJSONString(map);
                JSONObject jsonObject = JSONObject.parseObject(s);
                result.add(jsonObject);
            }

        } catch (Exception e) {
            throw new PhoenixException("Phoenix query table fail .", e);
        }
        return result;
    }
 类似资料: