Spark与Hive可以相互结合,同时Spark也可以使用DataFrame读取HBase里的数据,Hive也同样可以直接读取HBase的数据。只不过在Spark和Hive读取HBase数据的时候,需要做列簇或列映射,对于列不确定的需要列簇映射。 几种数据读取和分析思路
- Hive on HBase做好表映射,然后使用Tez替换MR引擎,使用Hive做数据分析
- 这是最基本的一种方式,对于上层的数据分析可以提供基础支持,同时也可以通过写脚本的方式来做数据分析。但是跟别的数据源关联和分析比较麻烦,不像Spark可以提供多种数据源连接。
- Spark SQL链接HBase,使用DataFrame做数据分析(由Spark构建表映射)
- HuaweiBigData/astro,hortonworks-spark/shc,这是两个连接映射器,不过我目前没有去探索过。
- Hive on HBase,开启Hive的thrift server,通过JDBC连接提取数据
- 这种方式与普通的JDBC取关系型数据库一样,可在SQL语句里传入Hive相应的处理函数,利用Hive来做计算然后提取结果。如果是Spark取数据的话,还不需要Spark做HBase表列的映射,很多函数可以使用Hive本身的。
- 但是,在目前Hive 1.2的JDBC版本里面,使用它会报错:
java.sql.SQLException: Method not supported at org.apache.hive.jdbc.HiveResultSetMetaData.isSigned
,这是因为在目前的JDBC版本里,甚至以后的Hive 2.0版本里,isSigned
这个方法都没有做实现,并且在Spark 1.5及以上版本
里,这个方法被Spark SQL的resolveTable
所调用,所以在这些版本的Spark里,这种方式都无法使用,低版本的Spark或许可以。
- Hive on HBase,Spark SQL使用HiveContext接触Hive的元数据信息,然后自己做数据提取和分析
- HBase表列的映射不需要做,可以在Hive映射的基础上提取数据。
- 需要配置Spark识别Hive所映射的HBase表(默认无法识别)。
- 首先配置Spark在提交Job的时候加上以下jar包(博主使用的HDP2.4.0.0-169): hive-hbase-handler-1.2.1000.2.4.0.0-169.jar hbase-client-1.1.2.2.4.0.0-169.jar hbase-protocol-1.1.2.2.4.0.0-169.jar hbase-common-1.1.2.2.4.0.0-169.jar hbase-server-1.1.2.2.4.0.0-169.jar htrace-core-3.1.0-incubating.jar guava-12.0.1.jar protobuf-java-2.5.0.jar 上述的jar包分别在Hive和HBase的lib目录下找。具体配置如下:在
spark-defaults.conf
的配置文件里加入spark.executor.extraClassPath
和spark.driver.extraClassPath
,其值是上述jar包的路径,多个jar包之间以:
分割。如:spark.executor.extraClassPath /usr/hdp/2.4.0.0-169/spark/extrajars/hbase-client-1.1.2.2.4.0.0-169.jar:/usr/hdp/2.4.0.0-169/spark/extrajars/hbase-protocol-1.1.2.2.4.0.0-169.jar
,在Ambari里配置就更简单了,在Custom spark-defaults里添加这两条配置即可。 - 其次,需将
hbase-site.xml
文件拷贝至HADOOP_CONF_DIR
文件夹(/etc/hadoop/conf
)下,因为在spark-env.sh
里配置了HADOOP_CONF_DIR
,里面的文件会被默认加载,所以hbase的配置也会被加载。其中有用的配置是以下几个: hbase.zookeeper.quorum zookeeper.znode.parent hbase.client.scanner.caching 其中caching这个值默认是100,为了性能可以适当提高,比如5000
- 首先配置Spark在提交Job的时候加上以下jar包(博主使用的HDP2.4.0.0-169): hive-hbase-handler-1.2.1000.2.4.0.0-169.jar hbase-client-1.1.2.2.4.0.0-169.jar hbase-protocol-1.1.2.2.4.0.0-169.jar hbase-common-1.1.2.2.4.0.0-169.jar hbase-server-1.1.2.2.4.0.0-169.jar htrace-core-3.1.0-incubating.jar guava-12.0.1.jar protobuf-java-2.5.0.jar 上述的jar包分别在Hive和HBase的lib目录下找。具体配置如下:在
- Hive的各种函数和map, struct等数据结构在Spark里都可以找到对应的,处理数据更加简单。如下是取hive表里的
struct
类型字段,字段原型rowkey_struct struct<hash:string, resid:string, time_stamp:string>
对于有些列名是动态变化的,比如存放外键关联字段的时候,并不知道列名和有多少个列。必须通过hive的map映射,映射的字段怎么处理呢?需要把map里的key-value键值对转换为行,然后将行转列,需要用到val sqlContext = new HiveContext(sc) sqlContext.table("cbd.summary").select("rowkey_struct", "Amount"). select($"rowkey_struct".getField("hash").as("hash"),$"rowkey_struct".getField("time_stamp").as("time_stamp").cast(LongType), $"Amount".cast(LongType))
explode
函数。如下是处理map
类型字段,字段原型project_info map<String, string>
case classFilterField(EstiType: String, ProjectType: String) val sqlContext = new HiveContext(sc) sqlContext.table("cbd.project").select("rowkey","project_info").explode($"project_info") { case Row(project_info: Map[String, String]) => val projectType = project_info.filterKeys(_.contains("工程类型")).map(_._2).headOption.getOrElse("NaN") // 不知列名的采用匹配,已知列名的可以直接取 Array(FilterField(project_info.getOrElse("计价方式", "NaN"), projectType)) }.select("rowkey", "EstiType", "ProjectType")
- 同样的语句,执行速度比Hive on Tez快
- 在Ambari里,Spark默认已经配置好可以读取Hive的元数据信息
- Hive on HBase,由Hive做好分析后,把数据写入到Hive其他的表里,然后Spark再从Hive表里取数据。 这种就比较绕,除非Hive里有的函数很重要但是Spark里不具备,可以这么干,否则直接使用Spark读取Hive的元数据信息更合适。
欢迎转载,但请注明出处:https://my.oschina.net/u/2539801/blog/750858