当前位置: 首页 > 工具软件 > Apache Pig > 使用案例 >

从Pig到Spark:Apache Pig开发人员的轻松旅程到Spark

诸福
2023-12-01

作为过去主要使用Apache Pig的数据分析师,我最终需要编写更具挑战性的工作,这些工作需要使用Apache Spark(一种更高级,更灵活的语言)。 起初,Spark可能看起来有点吓人,但此博客文章将显示向Spark(尤其是PySpark)的过渡非常容易。

但是,我并不主张您在所有情况下都从Apache Pig迁移到Spark。 猪是一种很棒的语言。 通过投影和聚合来转换数据时,它既简单又高效,并且对于标准的Map / Reduce作业,Pig的生产率无与伦比。

Apache Pig具有强大的功能,但是…

我喜欢将Pig视为高级Map / Reduce命令管道。 作为一名前SQL程序员,我觉得它非常直观,在我的组织中,我们的Hadoop工作仍然大部分是在Pig中开发的。

猪具有许多特质:它稳定,缩放良好,并与Hive metastore HCatalog本地集成。 通过对每个步骤进行原子描述,可以最大程度地减少您在复杂的SQL代码中经常发现的概念性错误。

但是有时候,Pig有一些局限性,使其无法满足您的需求。 三个主要限制是:

Pig是管道,不提供循环或代码间接(IF..THEN),这些循环或代码间接有时在您的代码中是必需的。 正如Jai Ranganathan和Matei Zaharia在一篇文章中所说的那样:


尽管像Apache Pig这样的脚本框架也提供了许多高级运算符,但Spark允许您在完整的编程语言的上下文中访问这些运算符-因此,您可以像在典型编程中那样使用控制语句,函数和类。环境。

最后,Pig的第三个局限性与输入数据格式有关:尽管Pig与CSV和HCatalog兼容,但在读取和处理其他数据格式(如JSON)(通过JsonLoader)方面似乎不太满意,而Spark则将其本地集成。

尝试一下Apache Spark

是时候在Spark上畅玩了! Pig和Spark共享一个通用的编程模型,可以轻松地从一个模型移到另一个模型。 基本上,您要完成由别名(Pig)或RDD变量(Spark)标识的不可变转换。 转换通常是投影(映射),过滤器或聚合,例如GroupBy,排序等。

这种常见的编程方法意味着,对于Pig开发人员而言,Spark的学习曲线相当快。

对于已经具备一些Python基本技能的数据分析师来说,PySpark是一个很自然的选择,但是代码在另一种Spark风格(例如Java或Scala)中将是相似的。

一个完整的例子

作为说明,让我们以Pig脚本为例,该脚本加载一个日志文件,在特定的一天对其进行过滤,计算按项目分组的日志条目数,并添加另一个文件中的项目描述:

/* load a log file of user sessions. Filter for a specific date and count entries per item
*/
 
f0 = LOAD 'logfile' using PigStorage('\t') AS (log_date:chararray, item_id:chararray, some_stuff:chararray);
 
f1 = FILTER f0 BY log_date == '20160515';
 
f2 = FOREACH f1 GENERATE item_id;
 
f3 = GROUP f2 BY item_id;
 
f4 = FOREACH f3 GENERATE group AS item_id, COUNT(f2) AS nb_entries;
 
/* add item name
*/
 
item1 = LOAD 'item' using PigStorage('\t') AS (item_id:chararray, item_name:chararray);
 
join1 = JOIN f4 BY item_id LEFT, item1 BY item_id;
 
result = FOREACH join1 GENERATE f4::item_id, item_name, nb_entries;
 
STORE result INTO 'result_file' USING PigStorage('\t');

代码非常简单,每个步骤执行一次转换。

现在在Spark中,我们从使用低级RDD的原始Spark开始,以显示与Pig代码的相似之处。 在代码中,一次只详细描述了一个别名,但是显然生产代码会更紧凑。

原始Spark(使用RDD)

conf = SparkConf()
sc = SparkContext(conf=conf)
 
f0 = sc.textFile('logfile').map(lambda x: x.split('\t'))
 
f1 = f0.filter(lambda x: x[0] == '20160515')
 
f3 = f1.groupBy(lambda (log_date, item_id, some_stuff): item_id)
f4 = f3.map (lambda (item_id, iterable): (item_id, len(iterable)))
 
# add item name
item1 = sc.textFile('item').map(lambda x: x.split('\t'))
 
# no need to set the key item_id on both parts before performing the join,
# It's already on first place on each part.
 
join1 = f4.leftOuterJoin(item1)
 
result = join1.map(lambda (item_id, (nb_entries, item_name)): (item_id, item_name, str(nb_entries)))
 
# creating a line of tab separated fields, and save it in the result file
result_to_store = result.map (lambda record : '\t'.join(record))
result_to_store.saveAsTextFile('result_file')

我们可以在此处看到Pig和Spark之间的类似代码轮廓,这使Pig开发人员可以更轻松地开始在Spark中进行编码。 但是,一个缺点是,对于像这样的相对简单的操作,Pig仍然比Spark更有生产力,即使Spark的执行时间更好(但并不惊人地更好)。

现在我们已经熟悉了此低级RDD,可以通过使用DataFrames和SparkSQL来改进代码。 先前的代码可以用更易读的形式重写:

使用DataFrames和SparkSQL进行Spark

conf = SparkConf()
sc = SparkContext(conf=conf)
 
sqlContext = SQLContext(sc)
 
f0 = sc.textFile('logfile').map(lambda x: x.split('\t'))
 
fpFields = [ \
   StructField('log_date', StringType(), True), \
   StructField('item_id', StringType(), True), \
   StructField('some_stuff', StringType(), True) \
]
 
fpSchema = StructType(fpFields)
df_f0 = sqlContext.createDataFrame(f0, fpSchema)
df_f0.registerTempTable('log')
 
f1_df = sqlContext.sql(
   "SELECT log.item_id, count(*) AS nb_entries \
      FROM log \
     WHERE log_date = '20160515'\
  GROUP BY item_id"
)
f1_df.registerTempTable('log_agg')
# items dataframe
 
item1 = sc.textFile('item').map(lambda x: x.split('\t'))
 
itemFields = [ \
   StructField('item_id', StringType(), True), \
   StructField('item_name', StringType(), True) \
]
 
itemSchema = StructType(itemFields)
df_item1 = sqlContext.createDataFrame(item1, itemSchema)
 
df_item1.registerTempTable('item')
 
result = sqlContext.sql(
   'SELECT log_agg.item_id, item_name, format_number(nb_entries, 0) \
      FROM log_agg \
  LEFT OUTER JOIN item ON log_agg.item_id = item.item_id'
)
 
result_to_store = result.rdd \
     .map (lambda record : '\t'.join(record))
 
result_to_store.saveAsTextFile('result_file')

我敢肯定,在Spark SQL中有更紧凑,更优雅的方法可以做到这一点,但这只是概述。

现在,我们已命名字段,类型安全性和紧凑的SQL代码,这些代码对于数据分析人员来说更易读。 生产率提高了,这是Pig的更好替代方案。

缺点是每个SQL现在都是一个黑匣子,只能作为一个整体进行测试,如果结果与预期的不同或执行时间很慢,这可能会很棘手。 然后,由开发人员来设计仍然可读且可以作为单独的代码单元执行的步骤。

从Hive Metastore HCatalog加载数据

如果将我们的数据存储在Hive HCatalog中,则所有DataFrame元数据都将从metastore继承,而Spark代码将更加简单:

conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)
 
f1_df = sqlContext.sql(
   "SELECT item_id, count(*) AS nb_entries \
   FROM my_db.log \
   WHERE log_date = '20160515' \
   GROUP BY item_id"
)
 
f1_df.registerTempTable('log_agg')
 
result = sqlContext.sql(
   "SELECT log_agg.item_id, item_name, format_number(nb_entries, 0) \
      FROM log_agg \
LEFT OUTER JOIN my_db.item item ON log_agg.item_id = item.item_id"
)
 
result_to_store = result.rdd \
   .map (lambda record : '\t'.join(record))
 
result_to_store.saveAsTextFile(outputFileName)

现在,这是一段更加紧凑和易读的代码:)

现在,让我们进一步将其优势推向Spark:用户定义的函数。

用户定义的功能

如前所述,在Spark中,显然不需要UDF。 您只需将函数编写为Python方法:

在猪中:

/* the function below has been written and deployed in a jar file */
DEFINE myFancyUdf com.mydomain.myfunction1;
 
...
 
log1 = FOREACH log0 GENERATE field1, myFancyUdf (field1l);

在Spark中:

def myFancyUdf(f1):
   someStuff
   return result
 
log1 = log0.map (lambda field1: (field1, myFancyUdf(field1))

更高级的主题

在本节中,让我们通过两个示例来看一下Pig在Spark中的更强大的功能:

地图侧联接

Pig的一个方便功能是地图侧连接,其中要连接的表之一很小,足以发送给每个工作人员以参与Map作业(不需要更昂贵的Reduce作业)。 通过使用JOIN上的“ replicated”提示,可以方便地执行此操作。

想象一下,在我们之前的示例中,“ item”表足够小以适合内存。 join1别名变为:

join1 = JOIN f4 BY item_id, item1 BY item_id USING ‘replicated;
 
result = FOREACH join1 GENERATE f4::item_id, item_name, nb_entries;

在Spark中,使用广播变量非常容易执行此操作:

# broadcast items
item_bc = sc.broadcast(item.collect())
 

'''
gets item name from its id
'''

def getItemName (item_id_to_match): # we know there will be only one result, so we take the first from the list
  (id, name) = filter(lambda (id, name): id == item_id_to_match, item_bc.value)[0]

项目表在每个工作程序节点上广播。 然后,getItemName()函数在广播的表中查找包含给定item_id的记录,并返回其名称。 对于每个处理的记录,在Spark作业的地图端调用此函数。

现在,完整的代码如下所示:

'''
gets item name from its id
'''


def getItemName (item_id_to_match):
 # we know there will be only one result, so we take the first from the
   (id, name) = filter(lambda (id, name): id == item_id_to_match, item_bc.value)[0]
   return name
 
f1_df = sqlContext.sql(
  "SELECT item_id, count(*) AS nb_entries \
     FROM my_db.log \
    WHERE log_date = '20160515' \
   GROUP BY item_id"
)
 
item_df = sqlContext.sql(
   "SELECT item_id, item_name \
      FROM my_db.item"
)
 
item_bc = sc.broadcast(item_df.rdd.collect())
 
result = f1_df.rdd.map (lambda= result.map (lambda record : '\t'.join(record))
result_to_store.saveAsTextFile('result_file')

窗口函数:获取n个分组项的排序列表的第一个匹配项

有时需要查找一个表的前n个记录,并按一个常用功能分组。 从示例的日志文件中,让我们为每一项获取10条最近的记录(在SQL中,这将是一种窗口函数,如PARTITION BY)。

在Pig中,这可以通过如下代码来完成:

f0 = LOAD ‘logfile’ using PigStorage('\t') AS (log_date:char array, item_id:chararray, some_stuff:chararray); 
 
f1 = GROUP f0 BY item_id; 
 
f2 = FOREACH f1 {
   o = ORDER f0 BY log_date DESC;
   l = LIMIT o 10;
   GENERATE FLATTEN(l) AS (log_date, item_id, some_stuff);
}

在Spark中,使用低级RDD或SparkSQL Windowing功能也是可行的。

让我们从RDD低级解决方案开始:

# create a tuple with the key for the GroupBy
f1 = f0.map (lambda (log_date, item_id, some_stuff): (item_id, (log_date, some_stuff)))
 
f2 = f1.groupByKey()
 
# result of the GroupBy is a tuple (item_id, iterable over grouped items)
# we sort the iterable according to log_date and retain only first 10 elements
f3 = f2.map (lambda (item_id, iter1): (item_id, sorted(list(iter1), key=lambda (log_date, item_id, some_stuff):log_date, reverse=True)[:10]))
 
# transform tuples of (item_id, [(log_date, item_id, some_stuff), ...]) into tuples of (log_date, item_id, some_stuff)
f4 = f3.flatMapValues(lambda x:x) \
.map (lambda (item_id, (log_date, some_stuff)):(log_date, item_id, some_stuff)

它不是很优雅,但是可以做到。

然后是SparkSQL解决方案:

f1_df = sqlContext.sql(
'SELECT \
  log_date, \
  item_id,  \
  some_stuff  \
FROM (  \
  SELECT  \
  log_date, \
  item_id,  \
  some_stuff, \
  dense_rank() OVER (PARTITION BY item_id ORDER BY log_date DESC) as rank \
FROM my_db.log) tmp \
WHERE rank <= 10')
 
f2 = f1_df.rdd.map (lambda row: (row.log_date, row.item_id, row.some_stuff))

好多了!

结论

我自愿从此博客文章中排除了一些有趣的主题,例如部署,调试,执行监视,动态资源分配,分区和拆分大小调整,采样等。此特定博客文章的目的是向Pig开发人员展示如何开始编码在Spark中; 我希望从这个角度来看,您会找到帮助。 如果您还有其他问题,请在下面的评论部分中提问。

翻译自: https://www.javacodegeeks.com/2016/07/pig-spark-easy-journey-spark-apache-pig-developers.html

 类似资料: