Spark是一个通用的快速的大数据处理引擎,是类似于hadoop的map reduce大数据并行处理引擎。它的数据源可以是hdfs、cassandra、hbase等,除常规编程模式外,它还是支持sql使用方式。Spark支持streaming流式计算(秒级延迟)、机器学习库MLib、图计算GraphX、Bagel(Google的pregel图计算框架的实现)、SparkR等多种库,以用于各种复杂的数据处理的场景。
基于spark的编程框架,编写简洁的数据处理脚本,通过spark shell等方式将任务提交到spark平台,spark即可完成大数据任务拆分以及处理,用户可以通过管理的页面来查看任务的处理状态。
Spark基于scala编写,目前spark框架API接口支持scala、java、python、R等语言。
Spark 于2012年推出,相对hadoop的map reduce框架,具备较多优点。
优点具体如下:
1) 计算速度快,官方宣称:相对于hadoop,存储基于内存时,快100倍以上,数据存储基于磁盘时快10倍以上。
2) 编程简单
做迭代计算时,不需要像hadoop反复的写多个map reduce,更多和单机的过程式编程类似,代码简单很多。
提供了map(映射处理)、filter、count、reduce、join、group by等80种以上的计算算子,直接使用即可。
简单的已有算子支持的多轮迭代计算任务,一个脚本几行代码就搞定,相对于hadoop的多个map reduce类要简洁很多。
3) 可以在HDfs、hbase、cassandra、kafaka等多种数据源上运行。
1) SparkContext
Spark程序首先就要构建一个SparkContext,它告知程序如何访问spark集群。 SparkContext可以基于sparkConf这个包含了集群等配置信息的对象构建。默认从/conf配置文件夹中的配置文件中读取相关配置,如spark平台的地址、hdfs地址等。
Python api初始化样例如下
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
2) RDD(Resilient DistributedDatasets)
弹性分布式数据集,这个是spark这核心的抽象,它是spark的迭代计算过程中操作的分布式数据对象,是其主要的数据形态。RDD作为数据结构,本质上是一个只读的分区记录集合,一个RDD可以包含多个分区,每个分区就是一个dataset片段,RDD可以相互依赖。RDD是自容错的,如丢失,会自动重新计算生成。rdd在spark中可以有多重存储级别,可以默认纯cach,也可以在磁盘上。可以选择多副本模式,用于为在线服务提供即时查询的场景。
RDD可以由已有的数据集合生成,也可以由hdfs等外部数据文件或库生成。以下为从hdfs文件生成的Python代码样例:
distFile = sc.textFile("/directory/data.txt") //输入可以为目录或者模糊匹配的目录或文件 /*/*.txt
对于rdd支持的各类操作(过滤、映射、reduce等),多看api doc,一个页面都包括了,很简洁易懂。
提供python语言的代码demo。
1、 开始写代码前,做好环境准备
可以自己搭建部署spark,也可以利用已有环境进行配置。
包括:
1) conf/spark-defaults.conf
配置spark.master、spark.ui.port等信息
2) 结合hdfs使用,配置hdfs的信息
可以将正确配置的hadoop-site.xml放到spark的conf目录下,主要包括hdfs地址以及hadoop的用户名密码等信息。
2、 示例代码
基于python构建,统计文件中词的数量map、reduce等计算
from pyspark import SparkContext, SparkConf
#give your own hdfs path
srcPath="/xx/xx"
resPath="/xx/xx"
appName="word count test"
#init sparkContext
conf = SparkConf().setAppName(appName)
sc = SparkContext(conf=conf)
#create rdd from hadoop txt file
textRdd = sc.textFile(srcPath)
#map text word line to (word,1)
wordSplit = textRdd.flatMap(lambda line: line.split()).map(lambda word: (word, 1))
#reducebyKey to get word count
wordCounts = wordSplit.reduceByKey(lambda a, b: a+b)
print wordCounts.collect()
#save to hdfs
wordCounts.saveAsTextFile(resPath)
3、 提交任务到spark平台
通过以下指令提交任务
./bin/spark-submit xx.py
4、 查看任务执行状态
到spark ui web界面上根据Appname 查看任务执行状态以及运行细节。
Web ui地址为:spark.master:spark.ui.port。
Spark支持以sql的方式来查询处理大数据,除了自己构建的spark table外,也支持访问hive的table。注意其支持嵌套sql。
1、 SqlContext
Sql的上下文对象,基于SparkContext构建SqlContext
sqlContext = SQLContext(sc)
2、 DataFrames
DataFrames是一个以表列组织的分布式数据集,类似于关系数据库中表。可以从已有RDD、hive table等多个方式构建。
以下为从已有RDD构建DataFrames的代码样例:
#user is rdd created by row element, create DataFrame from user RDD
schemaUser = sqlContext.inferSchema(user) //spark 1.3 后是接口createDataFrame()
schemaUser.registerTempTable("user")
继续以上面的文件的词统计为例,通过sql获取出现次数top 10的词
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row
#define your hdfs data path
srcPath="/xxx/xxx "
resPath="/xxx/xxx "
appName="word count test"
#init sqlContext
conf = SparkConf().setAppName(appName)
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
#create rdd from hadoop txt file
textRdd = sc.textFile(srcPath)
#map text word line to (word,1)
wordSplit = textRdd.flatMap(lambda line: line.split()).map(lambda word: (word, 1))
#reducebyKey to get word count
wordCounts = wordSplit.reduceByKey(lambda a, b: a+b)
#create row based rdd
rowRdd = wordCounts.map(lambda x: Row(word=x[0],wc=x[1]))
wordFrames = sqlContext.inferSchema(rowRdd)
wordFrames.registerTempTable("tword")
top10Frames = sqlContext.sql("select word,wc FROM tword order by wc desc limit 10")
print top10Frames.collect()
作者信息: 汪恭正,2010年加入百度,现任百度资深研发工程师
Github地址: https://github.com/neowgz
联系方式: wgongzheng@163.com