pig spark
作为过去主要使用Apache Pig的数据分析师,我最终需要编写更具挑战性的工作,这些工作需要使用Apache Spark(一种更高级,更灵活的语言)。 起初,Spark可能看起来有点吓人,但此博客文章将显示向Spark(尤其是PySpark)的过渡非常容易。
但是,我并不主张您在所有情况下都从Apache Pig迁移到Spark。 猪是一种很棒的语言。 通过投影和聚合来转换数据时,它既简单又高效,并且对于标准的Map / Reduce作业,Pig的生产率无与伦比。
Apache Pig具有强大的功能,但是…
我喜欢将Pig视为高级Map / Reduce命令管道。 作为一名前SQL程序员,我觉得它很直观,在我的组织中,我们的Hadoop工作仍然大部分是在Pig中开发的。
猪具有许多特质:它稳定,缩放良好,并与Hive metastore HCatalog本地集成。 通过对每个步骤进行原子描述,可以最大程度地减少您在复杂SQL代码中经常发现的概念性错误。
但是有时候,Pig有一些局限性,使其成为满足您需求的不良编程范例。 三个主要限制是:
Pig是管道,不提供循环或代码间接(IF..THEN),这些循环或代码间接有时在您的代码中是必需的。 正如Jai Ranganathan和Matei Zaharia在一篇文章中所说的那样:
虽然像Apache Pig这样的脚本框架也提供了许多高级运算符,但是Spark允许您在完整的编程语言的上下文中访问这些运算符-因此,您可以像在典型编程中那样使用控制语句,函数和类。环境。
最后,Pig的第三个局限性与输入数据格式有关:Pig可以使用CSV和HCatalog,但对于读取和处理其他数据格式(如JSON)(通过JsonLoader)似乎不太满意,而Spark则将其本地集成。
尝试一下Apache Spark
是时候在Spark上畅玩了! Pig和Spark共享一个通用的编程模型,可以轻松地从一个模型移到另一个模型。 基本上,您要完成由别名(Pig)或RDD变量(Spark)标识的不可变转换。 转换通常是投影(映射),过滤器或聚合,例如GroupBy,排序等。
这种常见的编程方法意味着,对于Pig开发人员而言,Spark的学习曲线相当快。
对于已经具备一些Python基本技能的数据分析师来说,PySpark是一个很自然的选择,但是代码在另一种Spark风格(例如Java或Scala)中将是相似的。
一个完整的例子
作为说明,让我们以Pig脚本为例,该脚本加载一个日志文件,在特定的一天对其进行过滤,计算按项目分组的日志条目数,并添加另一个文件中的项目描述:
/* load a log file of user sessions. Filter for a specific date and count entries per item
*/
f0 = LOAD 'logfile' using PigStorage('\t') AS (log_date:chararray, item_id:chararray, some_stuff:chararray);
f1 = FILTER f0 BY log_date == '20160515';
f2 = FOREACH f1 GENERATE item_id;
f3 = GROUP f2 BY item_id;
f4 = FOREACH f3 GENERATE group AS item_id, COUNT(f2) AS nb_entries;
/* add item name
*/
item1 = LOAD 'item' using PigStorage('\t') AS (item_id:chararray, item_name:chararray);
join1 = JOIN f4 BY item_id LEFT, item1 BY item_id;
result = FOREACH join1 GENERATE f4::item_id, item_name, nb_entries;
STORE result INTO 'result_file' USING PigStorage('\t');
代码非常简单,每个步骤执行一次转换。
现在在Spark中,我们从使用低级RDD的原始Spark开始,以显示与Pig代码的相似之处。 在代码中,一次只详细描述了一个别名,但是显然生产代码会更紧凑。
原始Spark(使用RDD)
conf = SparkConf()
sc = SparkContext(conf=conf)
f0 = sc.textFile('logfile').map(lambda x: x.split('\t'))
f1 = f0.filter(lambda x: x[0] == '20160515')
f3 = f1.groupBy(lambda (log_date, item_id, some_stuff): item_id)
f4 = f3.map (lambda (item_id, iterable): (item_id, len(iterable)))
# add item name
item1 = sc.textFile('item').map(lambda x: x.split('\t'))
# no need to set the key item_id on both parts before performing the join,
# It's already on first place on each part.
join1 = f4.leftOuterJoin(item1)
result = join1.map(lambda (item_id, (nb_entries, item_name)): (item_id, item_name, str(nb_entries)))
# creating a line of tab separated fields, and save it in the result file
result_to_store = result.map (lambda record : '\t'.join(record))
result_to_store.saveAsTextFile('result_file')
我们可以在此处看到Pig和Spark之间的类似代码轮廓,这使Pig开发人员更容易开始使用Spark进行编码。 但是,一个缺点是,对于像这样的相对简单的操作,即使Spark的执行时间更好(但并不惊人地更好),Pig仍然比Spark生产率更高。
现在我们已经熟悉了此低级RDD,可以通过使用DataFrames和SparkSQL来改进代码。 先前的代码可以以更易读的形式重写:
使用DataFrames和SparkSQL进行Spark
conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
f0 = sc.textFile('logfile').map(lambda x: x.split('\t'))
fpFields = [ \
StructField('log_date', StringType(), True), \
StructField('item_id', StringType(), True), \
StructField('some_stuff', StringType(), True) \
]
fpSchema = StructType(fpFields)
df_f0 = sqlContext.createDataFrame(f0, fpSchema)
df_f0.registerTempTable('log')
f1_df = sqlContext.sql(
"SELECT log.item_id, count(*) AS nb_entries \
FROM log \
WHERE log_date = '20160515'\
GROUP BY item_id"
)
f1_df.registerTempTable('log_agg')
# items dataframe
item1 = sc.textFile('item').map(lambda x: x.split('\t'))
itemFields = [ \
StructField('item_id', StringType(), True), \
StructField('item_name', StringType(), True) \
]
itemSchema = StructType(itemFields)
df_item1 = sqlContext.createDataFrame(item1, itemSchema)
df_item1.registerTempTable('item')
result = sqlContext.sql(
'SELECT log_agg.item_id, item_name, format_number(nb_entries, 0) \
FROM log_agg \
LEFT OUTER JOIN item ON log_agg.item_id = item.item_id'
)
result_to_store = result.rdd \
.map (lambda record : '\t'.join(record))
result_to_store.saveAsTextFile('result_file')
我敢肯定,在Spark SQL中有更紧凑,更优雅的方法可以做到这一点,但这只是概述。
现在,我们已经命名了字段,类型安全性和紧凑SQL代码,这些代码对于数据分析人员来说更具可读性。 生产率提高了,这是猪的更好替代品。
缺点是每个SQL现在都是一个黑匣子,只能作为一个整体进行测试,如果结果与预期的不同或执行时间很慢,这可能会很棘手。 然后,由开发人员来设计仍然可读且可以作为单独的代码单元执行的步骤。
从Hive Metastore HCatalog加载数据
如果将我们的数据存储在Hive HCatalog中,则所有DataFrame元数据都将从metastore继承,而Spark代码将更加简单:
conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)
f1_df = sqlContext.sql(
"SELECT item_id, count(*) AS nb_entries \
FROM my_db.log \
WHERE log_date = '20160515' \
GROUP BY item_id"
)
f1_df.registerTempTable('log_agg')
result = sqlContext.sql(
"SELECT log_agg.item_id, item_name, format_number(nb_entries, 0) \
FROM log_agg \
LEFT OUTER JOIN my_db.item item ON log_agg.item_id = item.item_id"
)
result_to_store = result.rdd \
.map (lambda record : '\t'.join(record))
result_to_store.saveAsTextFile(outputFileName)
现在,这是一段更加紧凑和可读的代码:)
现在,让我们进一步扩展使用Spark的优势:用户定义的函数。
用户定义的功能
如前所述,在Spark中,显然不需要UDF。 您只需将函数编写为Python方法:
在猪中:
/* the function below has been written and deployed in a jar file */
DEFINE myFancyUdf com.mydomain.myfunction1;
...
log1 = FOREACH log0 GENERATE field1, myFancyUdf (field1l);
在Spark中:
def myFancyUdf(f1):
someStuff
return result
log1 = log0.map (lambda field1: (field1, myFancyUdf(field1))
更高级的主题
在本节中,让我们通过两个示例来了解Pig在Spark中的更强大功能:
地图侧联接
Pig的一个方便功能是地图侧连接,其中要连接的表之一很小,足以发送给每个工作人员以参与Map作业(不需要更昂贵的Reduce作业)。 通过使用JOIN上的“ replicated”提示,可以方便地执行此操作。
想象一下,在我们之前的示例中,“ item”表足够小以适合内存。 join1别名变为:
join1 = JOIN f4 BY item_id, item1 BY item_id USING ‘replicated;
result = FOREACH join1 GENERATE f4::item_id, item_name, nb_entries;
在Spark中,使用广播变量非常容易执行此操作:
# broadcast items
item_bc = sc.broadcast(item.collect())
'''
gets item name from its id
'''
def getItemName (item_id_to_match): # we know there will be only one result, so we take the first from the list
(id, name) = filter(lambda (id, name): id == item_id_to_match, item_bc.value)[0]
项目表在每个工作程序节点上广播。 然后,getItemName()函数在广播的表中查找包含给定item_id的记录,并返回其名称。 对于每个处理的记录,在Spark作业的地图端调用此函数。
现在,完整的代码如下所示:
'''
gets item name from its id
'''
def getItemName (item_id_to_match):
# we know there will be only one result, so we take the first from the
(id, name) = filter(lambda (id, name): id == item_id_to_match, item_bc.value)[0]
return name
f1_df = sqlContext.sql(
"SELECT item_id, count(*) AS nb_entries \
FROM my_db.log \
WHERE log_date = '20160515' \
GROUP BY item_id"
)
item_df = sqlContext.sql(
"SELECT item_id, item_name \
FROM my_db.item"
)
item_bc = sc.broadcast(item_df.rdd.collect())
result = f1_df.rdd.map (lambda= result.map (lambda record : '\t'.join(record))
result_to_store.saveAsTextFile('result_file')
窗口函数:获取n个首次出现的分组项排序列表
有时需要查找表的前n个第一记录(按通用功能分组)。 从示例的日志文件中,让我们为每一项获取10条最近的记录(在SQL中,这将是类似于PARTITION BY的窗口函数)。
在Pig中,这可以通过如下代码来完成:
f0 = LOAD ‘logfile’ using PigStorage('\t') AS (log_date:char array, item_id:chararray, some_stuff:chararray);
f1 = GROUP f0 BY item_id;
f2 = FOREACH f1 {
o = ORDER f0 BY log_date DESC;
l = LIMIT o 10;
GENERATE FLATTEN(l) AS (log_date, item_id, some_stuff);
}
在Spark中,使用低级RDD或SparkSQL Windowing功能也是可行的。
让我们从RDD低级解决方案开始:
# create a tuple with the key for the GroupBy
f1 = f0.map (lambda (log_date, item_id, some_stuff): (item_id, (log_date, some_stuff)))
f2 = f1.groupByKey()
# result of the GroupBy is a tuple (item_id, iterable over grouped items)
# we sort the iterable according to log_date and retain only first 10 elements
f3 = f2.map (lambda (item_id, iter1): (item_id, sorted(list(iter1), key=lambda (log_date, item_id, some_stuff):log_date, reverse=True)[:10]))
# transform tuples of (item_id, [(log_date, item_id, some_stuff), ...]) into tuples of (log_date, item_id, some_stuff)
f4 = f3.flatMapValues(lambda x:x) \
.map (lambda (item_id, (log_date, some_stuff)):(log_date, item_id, some_stuff)
它不是很优雅,但是可以做到。
然后是SparkSQL解决方案:
f1_df = sqlContext.sql(
'SELECT \
log_date, \
item_id, \
some_stuff \
FROM ( \
SELECT \
log_date, \
item_id, \
some_stuff, \
dense_rank() OVER (PARTITION BY item_id ORDER BY log_date DESC) as rank \
FROM my_db.log) tmp \
WHERE rank <= 10')
f2 = f1_df.rdd.map (lambda row: (row.log_date, row.item_id, row.some_stuff))
好多了!
结论
我自愿从此博客文章中排除了一些有趣的主题,例如部署,调试,执行监视,动态资源分配,分区和拆分大小调整,采样等。此特定博客文章的目的是向Pig开发人员展示如何开始编码在Spark中; 我希望从这个角度来看,您会找到帮助。 如果您还有其他问题,请在下面的评论部分中提问。
翻译自: https://www.javacodegeeks.com/2016/07/pig-spark-easy-journey-spark-apache-pig-developers.html
pig spark