Apache pig是用来处理大规模数据的高级查询语言,配合Hadoop使用,可以在处理海量数据时达到事半功倍的效果,比使用Java,C++等语言编写大规模数据处理程序的难度要小N倍,实现同样的效果的代码量也小N倍。Twitter就大量使用pig来处理海量数据——有兴趣的,可以看Twitter工程师写的这个PPT。
但是,刚接触pig时,可能会觉得里面的某些概念以及程序实现方法与想像中的很不一样,甚至有些莫名,所以,你需要仔细地研究一下基础概念,这样在写pig程序的时候,才不会觉得非常别扭。
本文基于以下环境:
pig 0.8.1
先给出两个链接:pig参考手册1,pig参考手册2。本文的部分内容来自这两个手册,但涉及到翻译的部分,也是我自己翻译的,因此可能理解与英文有偏差,如果你觉得有疑义,可参考英文内容。
(1)关系(relation)、包(bag)、元组(tuple)、字段(field)、数据(data)的关系
“元组”这个词很抽象,你可以把它想像成关系型数据库表中的一行,它含有一个或多个字段,其中,每一个字段可以是任何数据类型,并且可以有或者没有数据。
“关系”可以比喻成关系型数据库的一张表,而上面说了,“元组”可以比喻成数据表中的一行,那么这里有人要问了,在关系型数据库中,同一张表中的每一行都有固定的字段数,pig中的“关系”与“元组”之间,是否也是这样的情况呢?不是的。“关系”并不要求每一个“元组”都含有相同数量的字段,并且也不会要求各“元组”中在相同位置处的字段具有相同的数据类型(太随意了,是吧?)
文章来源:http://www.codelast.com/
(2)一个 计算多维度组合下的平均值 的实际例子
为了帮助大家理解pig的一个基本的数据处理流程,我造了一些简单的数据来举个例子——
假设有数据文件:a.txt(各数值之间是以tab分隔的):
1
2
3
4
5
6
7
|
[root@localhost pig]$
cat
a.txt
a 1 2 3 4.2 9.8
a 3 0 5 3.5 2.1
b 7 9 9 - -
a 7 9 9 2.6 6.2
a 1 2 5 7.7 5.9
a 1 2 3 1.4 0.2
|
问题如下:怎样求出在第2、3、4列的所有组合的情况下,最后两列的平均值分别是多少?
例如,第2、3、4列有一个组合为(1,2,3),即第一行和最后一行数据。对这个维度组合来说,最后两列的平均值分别为:
(4.2+1.4)/2=2.8
(9.8+0.2)/2=5.0
而对于第2、3、4列的其他所有维度组合,都分别只有一行数据,因此最后两列的平均值其实就是它们自身。
特别地,组合(7,9,9)有两行记录:第三、四行,但是第三行数据的最后两列没有值,因此它不应该被用于平均值的计算,也就是说,在计算平均值时,第三行是无效数据。所以(7,9,9)组合的最后两列的平均值为 2.6 和 6.2。
我们现在用pig来算一下,并且输出最终的结果。
先进入本地调试模式(pig -x local),再依次输入如下pig代码:
1
2
3
4
|
A =
LOAD
'a.txt'
AS
(col1:chararray, col2:
int
, col3:
int
, col4:
int
, col5:
double
, col6:
double
);
B =
GROUP
A
BY
(col2, col3, col4);
C = FOREACH B GENERATE
group
,
AVG
(A.col5),
AVG
(A.col6);
DUMP C;
|
pig输出结果如下:
1
2
3
4
|
((1,2,3),2.8,5.0)
((1,2,5),7.7,5.9)
((3,0,5),3.5,2.1)
((7,9,9),2.6,6.2)
|
这个结果对吗?手工算一下就知道是对的。
文章来源:http://www.codelast.com/
下面,我们依次来看看每一句pig代码分别得到了什么样的数据。
①加载 a.txt 文件,并指定每一列的数据类型分别为 chararray(字符串),int,int,int,double,double。同时,我们还给予了每一列别名,分别为 col1,col2,……,col6。这个别名在后面的数据处理中会用到——如果你不指定别名,那么在后面的处理中,就只能使用索引($0,$1,……)来标识相应的列了,这样可读性会变差,因此,在列固定的情况下,还是指定别名的好。
将数据加载之后,保存到变量A中,A的数据结构如下:
1
|
A: {col1: chararray,col2:
int
,col3:
int
,col4:
int
,col5:
double
,col6:
double
}
|
可见,A是用大括号括起来的东西。根据本文前面的说法,A是一个包(bag)。
这个时候,A与你想像中的样子应该是一致的,也就是与前面打印出来的 a.txt 文件的内容是一样的,还是一行一行的类似于“二维表”的数据。
文章来源:http://www.codelast.com/
②按照A的第2、3、4列,对A进行分组。pig会找出所有第2、3、4列的组合,并按照升序进行排列,然后将它们与对应的包A整合起来,得到如下的数据结构:
1
|
B: {
group
: (col2:
int
,col3:
int
,col4:
int
),A: {col1: chararray,col2:
int
,col3:
int
,col4:
int
,col5:
double
,col6:
double
}}
|
可见,A的第2、3、4列的组合被pig赋予了一个别名:group,这很形象。同时我们也观察到,B的每一行其实就是由一个group和若干个A组成的——注意,是若干个A。这里之所以只显示了一个A,是因为这里表示的是数据结构,而不表示具体数据有多少组。
实际的数据为:
1
2
3
4
|
((1,2,3),{(a,1,2,3,4.2,9.8),(a,1,2,3,1.4,0.2)})
((1,2,5),{(a,1,2,5,7.7,5.9)})
((3,0,5),{(a,3,0,5,3.5,2.1)})
((7,9,9),{(b,7,9,9,,),(a,7,9,9,2.6,6.2)})
|
可见,与前面所说的一样,组合(1,2,3)对应了两行数据,组合(7,9,9)也对应了两行数据。
这个时候,B的结构就不那么明朗了,可能与你想像中有一点不一样了。
文章来源:http://www.codelast.com/
③计算每一种组合下的最后两列的平均值。
根据上面得到的B的数据,你可以把B想像成一行一行的数据(只不过这些行不是对称的),FOREACH 的作用是对 B 的每一行数据进行遍历,然后进行计算。
GENERATE 可以理解为要生成什么样的数据,这里的 group 就是上一步操作中B的第一项数据(即pig为A的第2、3、4列的组合赋予的别名),所以它告诉了我们:在数据集 C 的每一行里,第一项就是B中的group——类似于(1,2,5)这样的东西)。
而 AVG(A.col5) 这样的计算,则是调用了pig的一个求平均值的函数 AVG,用于对 A 的名为 col5 的列求平均值。前文说了,在加载数据到A的时候,我们已经给每一列起了个别名,col5就是倒数第二列。
到这里,可能有人要迷糊了:难道 AVG(A.col5) 不是表示对 A 的col5这一列求平均值吗?也就是说,在遍历B(FOREACH B)的每一行时候,计算结果都是相同的啊!
事实上并不是这样。我们遍历的是B,我们需要注意到,B的数据结构中,每一行数据里,一个group对应的是若干个A,因此,这里的 A.col5,指的是B的每一行中的A,而不是包含全部数据的那个A。拿B的第一行来举例:
((1,2,3),{(a,1,2,3,4.2,9.8),(a,1,2,3,1.4,0.2)})
遍历到B的这一行时,要计算AVG(A.col5),pig会找到 (a,1,2,3,4.2,9.8) 中的4.2,以及(a,1,2,3,1.4,0.2)中的1.4,加起来除以2,就得到了平均值。
同理,我们也知道了AVG(A.col6)是怎么算出来的。但还有一点要注意的:对(7,9,9)这个组,它对应的数据(b,7,9,9,,)里最后两列是无值的,这是因为我们的数据文件对应位置上不是有效数字,而是两个“-”,pig在加载数据的时候自动将它置为空了,并且计算平均值的时候,也不会把这一组数据考虑在内(相当于忽略这组数据的存在)。
到了这里,我们不难理解,为什么C的数据结构是这样的了:
1
|
C: {
group
: (col2:
int
,col3:
int
,col4:
int
),
double
,
double
}
|
文章来源:http://www.codelast.com/
④DUMP C就是将C中的数据输出到控制台。如果要输出到文件,需要使用:
1
|
STORE C
INTO
'output'
;
|
这样pig就会在当前目录下新建一个“output”目录(该目录必须事先不存在),并把结果文件放到该目录下。
请想像一下,如果要实现相同的功能,用Java或C++写一个Map-Reduce应用程序需要多少时间?可能仅仅是写一个build.xml或者Makefile,所需的时间就是写这段pig代码的几十倍了!
正因为pig有如此优势,它才得到了广泛应用。
文章来源:http://www.codelast.com/
(3)怎样统计数据行数
在SQL语句中,要统计表中数据的行数,很简单:
1
|
SELECT
COUNT
(*)
FROM
table_name
WHERE
condition
|
在pig中,也有一个COUNT函数,在pig手册中,对COUNT函数有这样的说明:
Computes the number of elements in a bag.
假设要计算数据文件a.txt的行数:
1
2
3
4
5
6
7
|
[root@localhost pig]$
cat
a.txt
a 1 2 3 4.2 9.8
a 3 0 5 3.5 2.1
b 7 9 9 - -
a 7 9 9 2.6 6.2
a 1 2 5 7.7 5.9
a 1 2 3 1.4 0.2
|
你是否可以这样做呢:
1
2
3
|
A =
LOAD
'a.txt'
AS
(col1:chararray, col2:
int
, col3:
int
, col4:
int
, col5:
double
, col6:
double
);
B =
COUNT
(*);
DUMP B;
|
答案是:绝对不行。pig会报错。pig手册中写得很明白:
Note: You cannot use the tuple designator (*) with COUNT; that is, COUNT(*) will not work.
那么,这样对某一列计数行不行呢:
1
|
B =
COUNT
(A.col2);
|
答案是:仍然不行。pig会报错。
这就与我们想像中的“正确做法”有点不一样了:我为什么不能直接统计一个字段的数目有多少呢?刚接触pig的时候,一定非常疑惑这样明显“不应该出错”的写法为什么行不通。
要统计A中含col2字段的数据有多少行,正确的做法是:
1
2
3
4
|
A =
LOAD
'a.txt'
AS
(col1:chararray, col2:
int
, col3:
int
, col4:
int
, col5:
double
, col6:
double
);
B =
GROUP
A
ALL
;
C = FOREACH B GENERATE
COUNT
(A.col2);
DUMP C;
|
输出结果:
1
|
(6)
|
表明有6行数据。
如此麻烦?没错。这是由pig的数据结构决定的。
文章来源:http://www.codelast.com/
在这个例子中,统计COUNT(A.col2)和COUNT(A)的结果是一样的,但是,如果col2这一列中含有空值:
1
2
3
4
5
6
7
|
[root@localhost pig]$
cat
a.txt
a 1 2 3 4.2 9.8
a 0 5 3.5 2.1
b 7 9 9 - -
a 7 9 9 2.6 6.2
a 1 2 5 7.7 5.9
a 1 2 3 1.4 0.2
|
则以下pig程序及执行结果为:
1
2
3
4
5
|
grunt> A =
LOAD
'a.txt'
AS
(col1:chararray, col2:
int
, col3:
int
, col4:
int
, col5:
double
, col6:
double
);
grunt> B =
GROUP
A
ALL
;
grunt> C = FOREACH B GENERATE
COUNT
(A.col2);
grunt> DUMP C;
(5)
|
可见,结果为5行。那是因为你LOAD数据的时候指定了col2的数据类型为int,而a.txt的第二行数据是空的,因此数据加载到A以后,有一个字段就是空的:
1
2
3
4
5
6
7
|
grunt> DUMP A;
(a,1,2,3,4.2,9.8)
(a,,0,5,3.5,2.1)
(b,7,9,9,,)
(a,7,9,9,2.6,6.2)
(a,1,2,5,7.7,5.9)
(a,1,2,3,1.4,0.2)
|
在COUNT的时候,null的字段不会被计入在内,所以结果是5。
The COUNT function follows syntax semantics and ignores nulls. What this means is that a tuple in the bag will not be counted if the first field in this tuple is NULL. If you want to include NULL values in the count computation, use COUNT_STAR.
1
2
3
4
5
6
7
|
[root@localhost pig]$
cat
a.txt
a 1 2 3 4.2 9.8
a 3 0 5 3.5 2.1
b 7 9 9 - -
a 7 9 9 2.6 6.2
a 1 2 5 7.7 5.9
a 1 2 3 1.4 0.2
|
如果我们按照前文的做法,计算多维度组合下的最后两列的平均值,则:
1
2
3
4
5
6
7
8
|
grunt> A =
LOAD
'a.txt'
AS
(col1:chararray, col2:
int
, col3:
int
, col4:
int
, col5:
double
, col6:
double
);
grunt> B =
GROUP
A
BY
(col2, col3, col4);
grunt> C = FOREACH B GENERATE
group
,
AVG
(A.col5),
AVG
(A.col6);
grunt> DUMP C;
((1,2,3),2.8,5.0)
((1,2,5),7.7,5.9)
((3,0,5),3.5,2.1)
((7,9,9),2.6,6.2)
|
可见,输出结果中,每一行的第一项是一个tuple(元组),我们来试试看 FLATTEN 的作用:
1
2
3
4
5
6
7
8
|
grunt> A = LOAD
'a.txt'
AS (col1:chararray, col2:
int
, col3:
int
, col4:
int
, col5:
double
, col6:
double
);
grunt> B = GROUP A BY (col2, col3, col4);
grunt> C = FOREACH B GENERATE FLATTEN(group), AVG(A.col5), AVG(A.col6);
grunt> DUMP C;
(1,2,3,2.8,5.0)
(1,2,5,7.7,5.9)
(3,0,5,3.5,2.1)
(7,9,9,2.6,6.2)
|
看到了吗?被 FLATTEN 的group本来是一个元组,现在变成了扁平的结构了。按照pig文档的说法,FLATTEN用于对元组(tuple)和包(bag)“解嵌套”(un-nest):
The FLATTEN operator looks like a UDF syntactically, but it is actually an operator that changes the structure of tuples and bags in a way that a UDF cannot. Flatten un-nests tuples as well as bags. The idea is the same, but the operation and result is different for each type of structure.For tuples, flatten substitutes the fields of a tuple in place of the tuple. For example, consider a relation that has a tuple of the form (a, (b, c)). The expression GENERATE $0, flatten($1), will cause that tuple to become (a, b, c).
文章来源:http://www.codelast.com/
所以我们就看到了上面的结果。
在有的时候,不“解嵌套”的数据结构是不利于观察的,输出这样的数据可能不利于外围数程序的处理(例如,pig将数据输出到磁盘后,我们还需要用其他程序做后续处理,而对一个元组,输出的内容里是含括号的,这就在处理流程上又要多一道去括号的工序),因此,FLATTEN提供了一个让我们在某些情况下可以清楚、方便地分析数据的机会。
(5)关于GROUP操作符
在上文的例子中,已经演示了GROUP操作符会生成什么样的数据。在这里,需要说得更理论一些:
(6)把数据当作“元组”(tuple)来加载
还是假设有如下数据:
1
2
3
4
5
6
7
|
[root@localhost pig]$
cat
a.txt
a 1 2 3 4.2 9.8
a 3 0 5 3.5 2.1
b 7 9 9 - -
a 7 9 9 2.6 6.2
a 1 2 5 7.7 5.9
a 1 2 3 1.4 0.2
|
如果我们按照以下方式来加载数据:
1
|
A =
LOAD
'a.txt'
AS
(col1:chararray, col2:
int
, col3:
int
, col4:
int
, col5:
double
, col6:
double
);
|
那么得到的A的数据结构为:
1
2
|
grunt> DESCRIBE A;
A: {col1: chararray,col2:
int
,col3:
int
,col4:
int
,col5:
double
,col6:
double
}
|
如果你要把A当作一个元组(tuple)来加载:
1
|
A =
LOAD
'a.txt'
AS
(T : tuple (col1:chararray, col2:
int
, col3:
int
, col4:
int
, col5:
double
, col6:
double
));
|
也就是想要得到这样的数据结构:
1
2
|
grunt> DESCRIBE A;
A: {T: (col1: chararray,col2:
int
,col3:
int
,col4:
int
,col5:
double
,col6:
double
)}
|
那么,上面的方法将得到一个空的A:
1
2
3
4
5
6
7
|
grunt> DUMP A;
()
()
()
()
()
()
|
那是因为数据文件a.txt的结构不适合于这样加载成元组(tuple)。
文章来源:http://www.codelast.com/
如果有数据文件b.txt:
1
2
3
4
5
6
7
|
[root@localhost pig]$
cat
b.txt
(a,1,2,3,4.2,9.8)
(a,3,0,5,3.5,2.1)
(b,7,9,9,-,-)
(a,7,9,9,2.6,6.2)
(a,1,2,5,7.7,5.9)
(a,1,2,3,1.4,0.2)
|
则使用上面所说的加载方法及结果为:
1
2
3
4
5
6
7
8
|
grunt> A = LOAD
'b.txt'
AS (T : tuple (col1:chararray, col2:int, col3:int, col4:int, col5:double, col6:double));
grunt> DUMP A;
((a,1,2,3,4.2,9.8))
((a,3,0,5,3.5,2.1))
((b,7,9,9,,))
((a,7,9,9,2.6,6.2))
((a,1,2,5,7.7,5.9))
((a,1,2,3,1.4,0.2))
|
可见,加载的数据的结构确实被定义成了元组(tuple)。