Spark's basic operations are mostly variations of map and reduce, so this post starts with the various map operators. Since I'm not familiar with Scala, and its terse syntax trades readability for brevity, I'll stick with Java here.
Let's go straight to the code. Note that if you only want to try out Spark's APIs locally, you don't need to download or install Spark or Hadoop at all; pulling in the Maven dependencies is enough.
Create a Java Maven project and add the Spark dependencies to the pom.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.tianyalei</groupId>
    <artifactId>spark_learning</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <scala.version>2.11.8</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <spark.version>2.3.0</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>
</project>
The pom also pulls in spark-sql, Spark's machine learning library, spark-hive and so on; they are not needed yet.
map(function)
map applies a given function to every element of the source RDD to produce a new RDD. Every element of the original RDD has exactly one corresponding element in the new RDD.
Note that map is strictly one-to-one.
package map;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

/**
 * @author wuweifeng wrote on 2018/4/10.
 */
public class SimpleMap {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        //Spark reduce on an ordinary List
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());

        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
        JavaRDD<Integer> originRDD = javaSparkContext.parallelize(data);
        //sum of all elements
        System.out.println(originRDD.reduce((a, b) -> a + b));

        //****************** using map ***************//
        //multiply every element by 2
        JavaRDD<Integer> doubleRDD = originRDD.map(s -> s * 2);
        //collect the RDD back into a list
        List<Integer> doubleData = doubleRDD.collect();
        System.out.println(doubleData);

        int total = doubleRDD.reduce((a, b) -> a + b);
        System.out.println(total);

        //use mapToPair to turn each element into a key-value pair, adding a value
        List<String> list = Arrays.asList("af", "bbbb", "c", "d", "e");
        JavaRDD<String> stringRDD = javaSparkContext.parallelize(list);
        //to key-value form
        JavaPairRDD<String, Integer> pairRDD = stringRDD.mapToPair(k -> new Tuple2<>(k, 1));
        List<Tuple2<String, Integer>> list1 = pairRDD.collect();
        //[(af,1), (bbbb,1), (c,1), (d,1), (e,1)]
        System.out.println(list1);

        //to key-value form: the value stays the original string, the key becomes its length
        JavaPairRDD<Integer, String> valueRDD = stringRDD.mapToPair(k -> new Tuple2<>(k.length(), k));
        List<Tuple2<Integer, String>> list2 = valueRDD.collect();
        //[(2,af), (4,bbbb), (1,c), (1,d), (1,e)]
        System.out.println(list2);

        //mapValues: transform only the values, keys stay unchanged
        List<Tuple2<Integer, String>> list3 = valueRDD.mapValues(s -> s + "_tail").collect();
        //[(2,af_tail), (4,bbbb_tail), (1,c_tail), (1,d_tail), (1,e_tail)]
        System.out.println(list3);
    }
}
The comments above cover everything: a simple example that multiplies each element by 2, one that turns each element into a key-value pair, and ones that change the key or the value of a key-value pair.
package map;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.SparkSession;

import java.util.*;

/**
 * mapPartitions
 * @author wuweifeng wrote on 2018/4/10.
 */
public class SimpleMapPartition {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());

        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
        //split into 2 partitions
        JavaRDD<Integer> integerRDD = javaSparkContext.parallelize(list, 2);

        //Similar to map, but map works on each element of the RDD, while mapPartitions (like foreachPartition)
        //works on the iterator of each partition. If the map function has to create an expensive object for every
        //element (e.g. writing the RDD to a database via JDBC: map would open one connection per element, while
        //mapPartitions opens one per partition), mapPartitions is far more efficient than map.
        JavaRDD<Integer> rdd = integerRDD.mapPartitions(new FlatMapFunction<Iterator<Integer>, Integer>() {
            @Override
            public Iterator<Integer> call(Iterator<Integer> integerIterator) throws Exception {
                int sum = 0;
                while (integerIterator.hasNext()) {
                    sum += integerIterator.next();
                }
                List<Integer> list1 = new LinkedList<>();
                list1.add(sum);
                return list1.iterator();
            }
        });

        List<Integer> list1 = rdd.collect();
        //[3, 12]
        System.out.println(list1);
    }
}
Here the list is partitioned first and then mapped: 1, 2, 3, 4, 5 are split into 2 partitions and each partition is summed.
The result is 1+2 and 3+4+5, i.e. [3, 12]. With 3 partitions it would be 1, 2+3 and 4+5, i.e. [1, 5, 9].
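The comments in the code above mention writing an RDD to a database over JDBC as the typical case where a per-partition operator pays off. Below is a rough sketch of that pattern using foreachPartition; it is not part of the original example, and the JDBC URL, credentials and table name are made-up placeholders.
package map;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Arrays;

public class SimpleForeachPartition {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());

        JavaRDD<Integer> rdd = javaSparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);

        //one connection per partition instead of one per element
        rdd.foreachPartition(iterator -> {
            //hypothetical connection settings and table, replace with your own
            try (Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "user", "password");
                 PreparedStatement statement = connection.prepareStatement("INSERT INTO numbers(value) VALUES (?)")) {
                while (iterator.hasNext()) {
                    statement.setInt(1, iterator.next());
                    statement.addBatch();
                }
                //write the whole partition in one batch
                statement.executeBatch();
            }
        });
    }
}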
package map;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import java.util.Arrays;
import java.util.List;

/**
 * @author wuweifeng wrote on 2018/4/10.
 */
public class SimpleFlatMap {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());

        List<String> data = Arrays.asList("hello world", "java spark", "hello spark");
        JavaRDD<String> originRDD = javaSparkContext.parallelize(data);

        //flatMap applies the function to every element of the RDD and builds the new RDD
        //from all the elements of the returned iterators.
        //map keeps the number of elements unchanged; with flatMap one element can become several.
        JavaRDD<String> flatMap = originRDD.flatMap(s -> Arrays.asList(s.split(" ")).iterator());
        System.out.println(flatMap.collect());
    }
}
flatMap turns each element into multiple elements. In the example above, the final result is [hello, world, java, spark, hello, spark],
because each line is split on spaces and the pieces are gathered into a single RDD.
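Since the app is named JavaWordCount anyway, here is a small sketch (my own addition, not from the original code) of how the flatMap above chains with mapToPair and reduceByKey into a complete word count:
package map;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.util.Arrays;

public class SimpleWordCount {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());

        JavaRDD<String> lines = javaSparkContext.parallelize(Arrays.asList("hello world", "java spark", "hello spark"));

        //split each line into words, pair every word with 1, then sum the 1s per word
        JavaPairRDD<String, Integer> counts = lines
                .flatMap(s -> Arrays.asList(s.split(" ")).iterator())
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey((a, b) -> a + b);

        //something like [(spark,2), (hello,2), (world,1), (java,1)], order not guaranteed
        System.out.println(counts.collect());
    }
}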
package map;

import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;

/**
 * @author wuweifeng wrote on 2018/4/12.
 */
public class SimpleMapDouble {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());

        //generate 100,000,000 random integers in [-100, 100] with the Java 8 Stream API
        List<Integer> random = Collections.unmodifiableList(
                new Random()
                        .ints(-100, 101).limit(100000000)
                        .boxed()
                        .collect(Collectors.toList())
        );
        //parallelize them into 10 slices
        JavaRDD<Integer> rdd = javaSparkContext.parallelize(random, 10);

        //square every element and produce a JavaDoubleRDD, which carries the statistics
        JavaDoubleRDD result = rdd.mapToDouble(x -> (double) x * x);

        //print the statistics of the DoubleRDD: count, mean, stdev, max and min
        System.out.println(result.stats().toString());
    }
}
This one squares each element, converting it to a double, and finally prints the count, mean, standard deviation, maximum and minimum of the result. As you can see, JavaDoubleRDD is handy for statistical calculations.
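If you want the individual numbers rather than the whole summary line, stats() returns a StatCounter you can query field by field. A minimal sketch (my own, using a small made-up data set and parallelizeDoubles to build the JavaDoubleRDD directly):
package map;

import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.StatCounter;

import java.util.Arrays;

public class SimpleDoubleStats {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());

        //made-up sample data; parallelizeDoubles builds a JavaDoubleRDD directly from a List<Double>
        JavaDoubleRDD doubles = javaSparkContext.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0));

        //read the individual statistics from the StatCounter
        StatCounter stats = doubles.stats();
        System.out.println("count = " + stats.count());
        System.out.println("mean  = " + stats.mean());
        System.out.println("stdev = " + stats.stdev());
        System.out.println("max   = " + stats.max());
        System.out.println("min   = " + stats.min());
    }
}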
These are the most common uses of the map family of operators.