一、Creating pair RDDs
① In a Python script, use the map() function.
Example: take the first word of each sentence as the key and the whole sentence as the value:
>>> line=sc.parallelize(["hello world","very good","yes right"])
>>> map = line.map(lambda s:((s.split(" "))[0],s))
>>> map.collect()
[('hello', 'hello world'), ('very', 'very good'), ('yes', 'yes right')]
>>>
As you can see, the transformation is very convenient in Python: the lambda simply returns a two-element tuple.
② In Java, use the mapToPair() function, which takes a PairFunction object as its argument.
// Use mapToPair to convert each element into a key-value pair, e.g. (world, 1)
JavaPairRDD<String, Integer> pair = words.mapToPair(new PairFunction<String, String, Integer>() {
    public Tuple2<String, Integer> call(String s) throws Exception {
        return new Tuple2<String, Integer>(s, 1);
    }
});
In Java, a key-value pair RDD has the type JavaPairRDD<T1, T2>. (A self-contained version of the snippet above is sketched below.)
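The snippet above assumes an existing JavaRDD<String> named words. As a reference, here is a minimal self-contained sketch of that setup (the sample data and app name are my own illustrative choices; the imports are the same Spark Java API classes and scala.Tuple2 used by the other Java examples in this post):
public class TestMapToPair {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("testMapToPair");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // the words RDD assumed by the snippet above
        JavaRDD<String> words = sc.parallelize(Arrays.asList("hello", "world", "hello", "spark"));
        // turn each word into a (word, 1) pair
        JavaPairRDD<String, Integer> pair = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        System.out.println(pair.collect()); // [(hello,1), (world,1), (hello,1), (spark,1)]
        sc.stop();
    }
}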
③ Creating a pair RDD from an in-memory collection
In Python and Scala, you just call parallelize() and pass in a collection of key-value pairs.
Python shell example:
>>> pair=sc.parallelize([("hello",1),("good",2),("yes",3)])
>>> pair.collect()
[('hello', 1), ('good', 2), ('yes', 3)]
>>>
In Java, creating a pair RDD from memory requires the parallelizePairs() function.
Example:
public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local").setAppName("test07");
    JavaSparkContext sc = new JavaSparkContext(conf);
    List<Tuple2<String, String>> list = new ArrayList<Tuple2<String, String>>();
    list.add(new Tuple2<String, String>("hello", "hello world"));
    list.add(new Tuple2<String, String>("good", "good job"));
    list.add(new Tuple2<String, String>("hi", "hi world"));
    JavaPairRDD<String, String> pair = sc.parallelizePairs(list);
    System.out.println(pair.collect());
    sc.stop();
}
二、Pair RDD transformations
① The filter() function works on key-value RDDs as well.
Python example:
>>> pair.collect()
[(1, 2), (2, 3), (1, 4), (3, 4), (3, 5), (5, 5)]
>>> pair2=pair.filter(lambda value:value[1]<5)
>>> pair2.collect()
[(1, 2), (2, 3), (1, 4), (3, 4)]
>>>
Java example:
public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local").setAppName("test08");
    JavaSparkContext sc = new JavaSparkContext(conf);
    // build the test data
    List<Tuple2<Integer, Integer>> list = new ArrayList<Tuple2<Integer, Integer>>();
    list.add(new Tuple2<Integer, Integer>(1, 2));
    list.add(new Tuple2<Integer, Integer>(2, 4));
    list.add(new Tuple2<Integer, Integer>(3, 5));
    JavaPairRDD<Integer, Integer> pair = sc.parallelizePairs(list);
    // keep only the pairs whose value is less than 5
    JavaPairRDD<Integer, Integer> filter = pair.filter(new Function<Tuple2<Integer, Integer>, Boolean>() {
        public Boolean call(Tuple2<Integer, Integer> t) throws Exception {
            return t._2 < 5;
        }
    });
    System.out.println(filter.collect());
    sc.stop();
}
② The mapValues() function applies a transformation to the values only, leaving the keys unchanged.
Python shell example:
>>> pair.collect()
[(1, 2), (2, 3), (1, 4), (3, 4), (3, 5), (5, 5)]
>>> pair2=pair.mapValues(lambda v:v+1)
>>> pair2.collect()
[(1, 3), (2, 4), (1, 5), (3, 5), (3, 6), (5, 6)]
>>>
Java example:
public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local").setAppName("test08");
    JavaSparkContext sc = new JavaSparkContext(conf);
    // build the test data
    List<Tuple2<Integer, Integer>> list = new ArrayList<Tuple2<Integer, Integer>>();
    list.add(new Tuple2<Integer, Integer>(1, 2));
    list.add(new Tuple2<Integer, Integer>(2, 4));
    list.add(new Tuple2<Integer, Integer>(3, 5));
    JavaPairRDD<Integer, Integer> pair = sc.parallelizePairs(list);
    // square each value, keeping its key unchanged
    JavaPairRDD<Integer, Integer> result = pair.mapValues(new Function<Integer, Integer>() {
        public Integer call(Integer integer) throws Exception {
            return integer * integer;
        }
    });
    System.out.println(result.collect());
    sc.stop();
}
③ Aggregation operations
* The reduceByKey() function aggregates by key: the values of all pairs that share the same key are merged with the supplied function.
Python shell example:
>>> pair=sc.parallelize([(1,2),(2,3),(1,4),(2,5),(1,9),(3,5),(2,5),(3,6)])
>>> pair2=pair.reduceByKey(lambda x,y:x+y)
>>> pair2.collect()
[(2, 13), (1, 15), (3, 11)]
>>>
Java example:
public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local").setAppName("test10");
    JavaSparkContext sc = new JavaSparkContext(conf);
    // build the test data
    List<Tuple2<Integer, Integer>> list = new ArrayList<Tuple2<Integer, Integer>>();
    list.add(new Tuple2<Integer, Integer>(1, 2));
    list.add(new Tuple2<Integer, Integer>(1, 3));
    list.add(new Tuple2<Integer, Integer>(2, 4));
    list.add(new Tuple2<Integer, Integer>(3, 5));
    list.add(new Tuple2<Integer, Integer>(2, 1));
    JavaPairRDD<Integer, Integer> pair = sc.parallelizePairs(list);
    // sum the values that share the same key
    JavaPairRDD<Integer, Integer> result = pair.reduceByKey(new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer integer, Integer integer2) throws Exception {
            return integer + integer2;
        }
    });
    System.out.println(result.collect());
    sc.stop();
}
* The countByValue() action can be called on an ordinary RDD directly; it returns a map from each distinct element to the number of times it occurs, which is handy for word counting, for example.
Python example:
>>> line=sc.textFile("D:\spark\README.md")
>>> words=line.flatMap(lambda x : x.split(" "))
>>> res=words.countByValue()
>>> res
defaultdict(<class 'int'>, {'': 67, 'guide,': 1, 'APIs': 1, 'For': 2, 'name': 1, 'mesos://': 1, 'for': 11, 'tools': 1, 'shell:': 2, 'Pi': 1, ... 'Spark': 13, 'cluster': 2, 'submit': 1, 'is': 6, 'N': 1, 'Thriftserver': 1, 'web': 1, 'protocols': 1, '<class>': 1, 'overview': 1, 'SparkPi': 2})   (output truncated)
>>>
Java example:
SparkConf conf = new SparkConf().setMaster("local").setAppName("test06");
JavaSparkContext sc = new JavaSparkContext(conf);
// build the test sentences
JavaRDD<String> sentences = sc.parallelize(Arrays
        .asList("hello world hi this a", "good yes hello", "world a", "hi very good"));
// use flatMap to split the sentences into words
JavaRDD<String> words = sentences.flatMap(new FlatMapFunction<String, String>() {
    public Iterable<String> call(String s) throws Exception {
        return Arrays.asList(s.split(" "));
    }
});
Map<String, Long> res = words.countByValue();
System.out.println(res);
* The combineByKey() function is similar to aggregate() on an ordinary RDD. It takes three function arguments: one that builds the initial accumulator the first time a key is seen, one that merges a new value into the accumulator when the key is seen again within a partition, and one that merges the accumulators from different partitions.
Python shell example:
>>> pair=sc.parallelize([(1,2),(2,3),(1,3),(2,5),(1,6),(3,5),(5,5),(2,1)]) # input data
>>> com_pair=pair.combineByKey(lambda x:(x,1),lambda x,v:(x[0]+v,x[1]+1),lambda x,y:(x[0]+y[0],x[1]+y[1])) # use combineByKey to aggregate into (sum, count) form
>>> com_pair.collect()
[(2, (9, 3)), (1, (11, 3)), (3, (5, 1)), (5, (5, 1))]
>>> res=com_pair.mapValues(lambda x:x[0]/x[1]) # transform the values to compute the per-key average
>>> res.collect()
[(2, 3.0), (1, 3.6666666666666665), (3, 5.0), (5, 5.0)]
>>>
Java example:
public class TestCombineByKey {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("test10");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // build the test data
        List<Tuple2<Integer, Integer>> list = new ArrayList<Tuple2<Integer, Integer>>();
        list.add(new Tuple2<Integer, Integer>(1, 2));
        list.add(new Tuple2<Integer, Integer>(1, 3));
        list.add(new Tuple2<Integer, Integer>(2, 4));
        list.add(new Tuple2<Integer, Integer>(3, 5));
        list.add(new Tuple2<Integer, Integer>(2, 1));
        JavaPairRDD<Integer, Integer> pair = sc.parallelizePairs(list);
        /**
         * When you want to compute the average value per key, reduceByKey() alone is awkward;
         * this is where combineByKey() comes in.
         */
        JavaPairRDD<Integer, Tuple2<Integer, Integer>> com_pair = pair.combineByKey(new Function<Integer, Tuple2<Integer, Integer>>() { // called the first time a key is seen: initialize the (sum, count) pair
            public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
                return new Tuple2<Integer, Integer>(integer, 1);
            }
        }, new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>() { // called when a key has already been seen in a partition: merge the new value in
            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> t, Integer integer) throws Exception {
                return new Tuple2<Integer, Integer>(t._1 + integer, t._2 + 1);
            }
        }, new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() { // called to merge the (sum, count) pairs produced by different partitions
            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> t1, Tuple2<Integer, Integer> t2) throws Exception {
                return new Tuple2<Integer, Integer>(t1._1 + t2._1, t1._2 + t2._2);
            }
        });
        JavaPairRDD<Integer, Double> result = com_pair.mapValues(new Function<Tuple2<Integer, Integer>, Double>() {
            public Double call(Tuple2<Integer, Integer> t) throws Exception {
                return (double) t._1 / t._2;
            }
        });
        System.out.println(result.collect());
        sc.stop();
    }
}
Here I used Tuple2 to store the (total for a key, count) pair; you could also use a custom class instead, but note that it must implement the Serializable interface.
The per-key average could in fact also be computed with map/mapValues combined with reduceByKey, as sketched below; combineByKey simply gathers all of the steps into the arguments of a single call.
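A rough sketch of that mapValues + reduceByKey alternative (my own illustration, not from the original example; the class name and data are arbitrary, and it assumes the same imports as the Java examples above): mapValues wraps each value as a (sum, count) pair, reduceByKey adds the pairs up per key, and a final mapValues divides:
public class TestAvgWithReduceByKey {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("testAvg");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Tuple2<Integer, Integer>> list = new ArrayList<Tuple2<Integer, Integer>>();
        list.add(new Tuple2<Integer, Integer>(1, 2));
        list.add(new Tuple2<Integer, Integer>(1, 3));
        list.add(new Tuple2<Integer, Integer>(2, 4));
        JavaPairRDD<Integer, Integer> pair = sc.parallelizePairs(list);
        // step 1: wrap each value as a (sum, count) pair
        JavaPairRDD<Integer, Tuple2<Integer, Integer>> withCount = pair.mapValues(
                new Function<Integer, Tuple2<Integer, Integer>>() {
                    public Tuple2<Integer, Integer> call(Integer v) throws Exception {
                        return new Tuple2<Integer, Integer>(v, 1);
                    }
                });
        // step 2: add up the sums and the counts for each key
        JavaPairRDD<Integer, Tuple2<Integer, Integer>> sumCount = withCount.reduceByKey(
                new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
                    public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) throws Exception {
                        return new Tuple2<Integer, Integer>(a._1 + b._1, a._2 + b._2);
                    }
                });
        // step 3: divide the sum by the count to get the average
        JavaPairRDD<Integer, Double> avg = sumCount.mapValues(new Function<Tuple2<Integer, Integer>, Double>() {
            public Double call(Tuple2<Integer, Integer> t) throws Exception {
                return (double) t._1 / t._2;
            }
        });
        System.out.println(avg.collect()); // the per-key averages, e.g. [(1,2.5), (2,4.0)]
        sc.stop();
    }
}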
In Spark, using these dedicated aggregation functions is always much faster than manually grouping the data and then reducing it.
* The groupByKey() function groups together all elements that share the same key, collecting their values into an iterable:
Java example:
public class TestGroupByKey {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("test11");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // build the test data
        List<Tuple2<Integer, Integer>> list = new ArrayList<Tuple2<Integer, Integer>>();
        list.add(new Tuple2<Integer, Integer>(1, 2));
        list.add(new Tuple2<Integer, Integer>(1, 3));
        list.add(new Tuple2<Integer, Integer>(2, 4));
        list.add(new Tuple2<Integer, Integer>(3, 5));
        list.add(new Tuple2<Integer, Integer>(2, 1));
        JavaPairRDD<Integer, Integer> pair = sc.parallelizePairs(list);
        JavaPairRDD<Integer, Iterable<Integer>> after_group = pair.groupByKey();
        System.out.println(after_group.collect());
        sc.stop();
    }
}
/*
[(1,[2, 3]), (3,[5]), (2,[4, 1])]
*/
* The keys() and values() functions return just the keys or just the values as a plain RDD.
Python shell example:
>>> pair.collect()
[(1, 2), (2, 3), (3, 4), (1, 3), (2, 5), (2, 6), (1, 2), (3, 9)]
>>> pair.keys()
PythonRDD[18] at RDD at PythonRDD.scala:43
>>> pair.keys().collect()
[1, 2, 3, 1, 2, 2, 1, 3]
>>> pair.values().collect()
[2, 3, 4, 3, 5, 6, 2, 9]
>>>
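The Java API offers the same pair of methods: JavaPairRDD.keys() and JavaPairRDD.values() each return a plain JavaRDD. A minimal sketch (my own example, with arbitrary data and the same imports as the Java examples above):
public class TestKeysValues {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("testKeysValues");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Tuple2<Integer, Integer>> list = new ArrayList<Tuple2<Integer, Integer>>();
        list.add(new Tuple2<Integer, Integer>(1, 2));
        list.add(new Tuple2<Integer, Integer>(2, 3));
        list.add(new Tuple2<Integer, Integer>(3, 4));
        JavaPairRDD<Integer, Integer> pair = sc.parallelizePairs(list);
        System.out.println(pair.keys().collect());   // [1, 2, 3]
        System.out.println(pair.values().collect()); // [2, 3, 4]
        sc.stop();
    }
}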
* sortByKey() sorts the RDD by key. With no arguments it sorts in ascending key order (the ascending flag defaults to true); pass false to sort in descending order.
Python shell example:
>>> pair.sortByKey().collect()
[(1, 2), (1, 3), (1, 2), (2, 3), (2, 5), (2, 6), (3, 4), (3, 9)]
>>>
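sortByKey() is also available on JavaPairRDD in Java. A minimal sketch (my own example, arbitrary data, same imports as above) showing the default ascending sort and the descending variant that takes a boolean argument:
public class TestSortByKey {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("testSortByKey");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Tuple2<Integer, Integer>> list = new ArrayList<Tuple2<Integer, Integer>>();
        list.add(new Tuple2<Integer, Integer>(3, 4));
        list.add(new Tuple2<Integer, Integer>(1, 2));
        list.add(new Tuple2<Integer, Integer>(2, 3));
        JavaPairRDD<Integer, Integer> pair = sc.parallelizePairs(list);
        // default: ascending order of the keys
        System.out.println(pair.sortByKey().collect());      // [(1,2), (2,3), (3,4)]
        // pass false to sort in descending order
        System.out.println(pair.sortByKey(false).collect()); // [(3,4), (2,3), (1,2)]
        sc.stop();
    }
}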