1 Getting Started with Spark: the Various map Operations, in Java

公羊渝
2023-12-01

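Basic Spark work consists mostly of various map and reduce operations, and this post starts with the maps. I'm not familiar with Scala, and its syntax is so terse that, although there is less code, readability suffers quite a bit, so I'll stick with Java.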

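Let's go straight to the code. Note that if you only want to try out Spark's APIs locally, you don't need to download or install Spark or Hadoop at all. Adding the Maven dependencies is enough.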

Create a new Java Maven project and add the Spark dependencies to the pom.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.tianyalei</groupId>
    <artifactId>spark_learning</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <scala.version>2.11.8</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <spark.version>2.3.0</spark.version>
    </properties>


    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

    </dependencies>
</project>

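The pom also includes dependencies for spark-sql, Spark machine learning (spark-mllib), spark-hive and so on. Most of these are not needed yet. Note, though, that the examples below create a SparkSession, which lives in spark-sql, so that one is required.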

1 Simple map

map(function) 

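map applies a given function to every element of an RDD to produce a new RDD. Every element of the original RDD corresponds to exactly one element in the new RDD.

Note that map is one-to-one.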

package map;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

/**
 * @author wuweifeng wrote on 2018/4/10.
 */
public class SimpleMap {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        // Spark reduce operation on a plain List
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
        JavaRDD<Integer> originRDD = javaSparkContext.parallelize(data);
        // compute the sum of all elements
        System.out.println(originRDD.reduce((a, b) -> a + b));

        //******************using map***************//
        // multiply every original element by 2
        JavaRDD<Integer> doubleRDD = originRDD.map(s -> s * 2);
        // collect the RDD into a list
        List<Integer> doubleData = doubleRDD.collect();
        System.out.println(doubleData);
        int total = doubleRDD.reduce((a, b) -> a + b);
        System.out.println(total);

        // use mapToPair to turn each key into a key-value pair by adding a value
        List<String> list = Arrays.asList("af", "bbbb", "c", "d", "e");
        JavaRDD<String> stringRDD = javaSparkContext.parallelize(list);
        // convert to key-value form
        JavaPairRDD<String, Integer> pairRDD = stringRDD.mapToPair(k -> new Tuple2<>(k, 1));
        List<Tuple2<String, Integer>> list1 = pairRDD.collect();
        // [(af,1), (bbbb,1), (c,1), (d,1), (e,1)]
        System.out.println(list1);

        // convert to key-value form: the original string becomes the value, its length the key
        JavaPairRDD<Integer, String> valueRDD = stringRDD.mapToPair(k -> new Tuple2<>(k.length(), k));
        List<Tuple2<Integer, String>> list2 = valueRDD.collect();
        // [(2,af), (4,bbbb), (1,c), (1,d), (1,e)]
        System.out.println(list2);
        // mapValues: transform only the values, the keys stay unchanged
        List<Tuple2<Integer, String>> list3 = valueRDD.mapValues(s -> s + "_tail").collect();
        // [(2,af_tail), (4,bbbb_tail), (1,c_tail), (1,d_tail), (1,e_tail)]
        System.out.println(list3);


    }
}

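The comments cover most of it: there is a simple example that multiplies each element by 2, one that turns each element into a key-value pair, and ones that modify the key or the value of a key-value pair.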

2 mapPartitions: map per partition

package map;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.SparkSession;

import java.util.*;

/**
 * mapPartition
 * @author wuweifeng wrote on 2018/4/10.
 */
public class SimpleMapPartition {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        // wrap the SparkSession's SparkContext for the Java API
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());

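        // a plain list of integers to spread across partitions
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
        // split into 2 partitions
        JavaRDD<Integer> intRDD = javaSparkContext.parallelize(list, 2);
        // Similar to map, but where map operates on each element of the RDD, mapPartitions (like foreachPartition) operates on the iterator of each partition.
        // If the map step needs to create extra objects frequently (for example, writing the RDD's data to a database over JDBC: map would need a connection per element, whereas mapPartitions needs one per partition),
        // mapPartitions is much more efficient than map.
        JavaRDD<Integer> rdd = intRDD.mapPartitions(new FlatMapFunction<Iterator<Integer>, Integer>() {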

            @Override
            public Iterator<Integer> call(Iterator<Integer> integerIterator) throws Exception {
                int sum = 0;
                while (integerIterator.hasNext()) {
                    sum += integerIterator.next();
                }
                List<Integer> list1 = new LinkedList<>();
                list1.add(sum);
                return list1.iterator();
            }
        });

        List<Integer> list1 = rdd.collect();
        //[3, 12]
        System.out.println(list1);


    }
}

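This partitions the List and then maps over it. Here 1, 2, 3, 4, 5 is split into 2 partitions, and the elements of each partition are summed.

The result is 1+2 and 3+4+5. With 3 partitions it would be 1, 2+3 and 4+5.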
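The split described above can be reproduced without Spark. The following is a plain-Java sketch, not Spark code. It assumes that `parallelize` gives slice i the indices from i*n/numSlices up to (but not including) (i+1)*n/numSlices, which is consistent with the results quoted above. The names `SliceSketch`, `slice` and `sumPerSlice` are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SliceSketch {

    // Assumed slicing rule: slice i covers indices [i*n/numSlices, (i+1)*n/numSlices).
    public static <T> List<List<T>> slice(List<T> data, int numSlices) {
        List<List<T>> slices = new ArrayList<>();
        int n = data.size();
        for (int i = 0; i < numSlices; i++) {
            int start = (int) ((long) i * n / numSlices);
            int end = (int) ((long) (i + 1) * n / numSlices);
            slices.add(new ArrayList<>(data.subList(start, end)));
        }
        return slices;
    }

    // Mimics the mapPartitions example: produce one sum per slice.
    public static List<Integer> sumPerSlice(List<Integer> data, int numSlices) {
        List<Integer> sums = new ArrayList<>();
        for (List<Integer> part : slice(data, numSlices)) {
            int sum = 0;
            for (int x : part) {
                sum += x;
            }
            sums.add(sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(sumPerSlice(data, 2)); // [3, 12]
        System.out.println(sumPerSlice(data, 3)); // [1, 5, 9]
    }
}
```

When the list length is not divisible by the number of slices, this rule puts the extra elements in the later slices, which is why the first of 2 slices holds two elements and the second holds three.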

3 FlatMap

package map;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import java.util.Arrays;
import java.util.List;

/**
 * @author wuweifeng wrote on 2018/4/10.
 */
public class SimpleFlatMap {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        // wrap the SparkSession's SparkContext for the Java API
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
        List<String> data = Arrays.asList("hello world", "java spark", "hello spark");
        JavaRDD<String> originRDD = javaSparkContext.parallelize(data);

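        // flatMap() applies the function to every element of the RDD and builds the new RDD from all the contents of the returned iterators
        // after map the number of elements is unchanged; after flatMap one element can become several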
        JavaRDD<String> flatMap = originRDD.flatMap(s -> Arrays.asList(s.split(" ")).iterator());

        System.out.println(flatMap.collect());
    }
}    

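flatMap turns each element into multiple elements. In the example above, the final result is a single flat list of words: ["hello", "world", "java", ...]

That is because each element is split on spaces, and the pieces are then gathered back into one collection.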
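To make the difference from map concrete, here is a small plain-Java sketch using `java.util.stream` rather than Spark. It is only an analogy: Spark's Java `flatMap` expects the function to return an `Iterator`, while the stream version returns a `Stream`. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MapVsFlatMapSketch {

    // map: exactly one output element per input element (here, the word count of each line).
    public static List<Integer> wordCounts(List<String> lines) {
        return lines.stream()
                .map(s -> s.split(" ").length)
                .collect(Collectors.toList());
    }

    // flatMap: each input element may expand into several output elements.
    public static List<String> words(List<String> lines) {
        return lines.stream()
                .flatMap(s -> Arrays.stream(s.split(" ")))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("hello world", "java spark", "hello spark");
        System.out.println(wordCounts(data)); // [2, 2, 2]
        System.out.println(words(data));      // [hello, world, java, spark, hello, spark]
    }
}
```

Three input lines give three outputs under map, but six under flatMap, because every line contributes two words.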

4 mapToDouble

package map;

import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;

/**
 * @author wuweifeng wrote on 2018/4/12.
 */
public class SimpleMapDouble {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        // wrap the SparkSession's SparkContext for the Java API
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());

        List<Integer> random = Collections.unmodifiableList(
                new Random()
                        .ints(-100, 101).limit(100000000)
                        .boxed()
                        .collect(Collectors.toList())
        );

        // Distribute the 100,000,000 random integers (each in [-100, 100], generated with the Java 8 Stream API) across 10 slices.
        JavaRDD<Integer> rdd = javaSparkContext.parallelize(random, 10);

        // Square every element and collect the results in a JavaDoubleRDD.
        JavaDoubleRDD result = rdd.mapToDouble(x -> (double) x * x);

        // Print the statistics for the JavaDoubleRDD: count, mean, stdev, max and min
        System.out.println(result.stats().toString());

    }
}

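This one squares each element, converts it to a double, and finally prints statistics of the result: count, mean, standard deviation, maximum and minimum. As you can see, JavaDoubleRDD is handy for statistical computations.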
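For a sense of what those numbers look like on a tiny input, the sketch below computes the same kinds of statistics in plain Java with `DoubleSummaryStatistics`. It is not Spark's implementation, and it assumes the standard deviation reported by `stats()` is the population one (dividing by n). The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.DoubleSummaryStatistics;
import java.util.List;

public class StatsSketch {

    // Square each integer and summarise: a plain-Java analogue of mapToDouble(x -> (double) x * x).
    public static DoubleSummaryStatistics squareStats(List<Integer> data) {
        return data.stream()
                .mapToDouble(x -> (double) x * x)
                .summaryStatistics();
    }

    // Population standard deviation of the squares (divides by n, not n - 1).
    public static double squareStdev(List<Integer> data) {
        double mean = squareStats(data).getAverage();
        double sumSq = data.stream()
                .mapToDouble(x -> (double) x * x)
                .map(v -> (v - mean) * (v - mean))
                .sum();
        return Math.sqrt(sumSq / data.size());
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(-2, -1, 0, 1, 2);
        DoubleSummaryStatistics s = squareStats(data);
        System.out.println("count=" + s.getCount() + " mean=" + s.getAverage()
                + " min=" + s.getMin() + " max=" + s.getMax());
        System.out.println("stdev=" + squareStdev(data));
    }
}
```

For the input -2, -1, 0, 1, 2 the squares are 4, 1, 0, 1, 4, so the count is 5, the mean is 2, the minimum is 0, the maximum is 4, and the population standard deviation is the square root of 2.8.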

Those are the most common ways to use map.
