Introduction to Spark RDDs
A Resilient Distributed Dataset (RDD) is the core abstraction of the Spark framework. You can think of an RDD as a table in a database, except that it can hold data of any type. Spark stores data in RDDs spread across partitions on different nodes.
RDDs allow Spark to rearrange computations and optimize the data-processing pipeline.
RDDs are fault tolerant: an RDD knows how to recreate and recompute its dataset, so a lost partition can be rebuilt.
RDDs are immutable. You can modify an RDD with a transformation, but the transformation returns a brand-new RDD while the original RDD remains unchanged.
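A quick spark-shell illustration of this immutability (the numbers here are illustrative, not part of this article's example):

scala> val nums = sc.parallelize(1 to 5)   // build an RDD from a local collection
scala> val doubled = nums.map(_ * 2)       // the transformation returns a new RDD
scala> nums.collect()                      // the original RDD is unchanged: Array(1, 2, 3, 4, 5)
scala> doubled.collect()                   // the new RDD holds the result: Array(2, 4, 6, 8, 10)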
Creating an RDD
2.1 Download the spark-2.1.1-bin-hadoop2.6 package from the official Spark website, extract it, and configure the JDK and Spark.
2.2 In /etc/hosts, add the entry 127.0.0.1 <hostname> (check the current contents with cat /etc/hosts).
2.3 Create a data file /tmp/ywx/test.txt on the server with the following contents:
spark shell
hello world
hive hadoop mapreduce zookeeper
mapreduce zookeeper
hello world
world
Submitting a job with spark-shell
ywx@suse114115:~/spark-2.1.1-bin-hadoop2.6/bin> ./spark-shell
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/07/03 16:40:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/07/03 16:40:10 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://ip:4040
Spark context available as 'sc' (master = local, app id = local-1499071205203).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_121)
Type in expressions to have them evaluated.
Type :help for more information.
scala> sc.textFile("/tmp/ywx/test.txt").flatMap(line => line.split(" ")).map(w => (w, 1)).reduceByKey(_ + _).foreach(println)
(spark,1)
(hadoop,1)
(hive,1)
(mapreduce,2)
(zookeeper,2)
(hello,2)
(shell,1)
(world,3)
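The one-liner above chains several RDD operations. Spelled out step by step (a sketch of the same pipeline, with illustrative intermediate names):

scala> val lines = sc.textFile("/tmp/ywx/test.txt")        // one RDD element per line of the file
scala> val words = lines.flatMap(line => line.split(" "))  // split each line into words
scala> val pairs = words.map(w => (w, 1))                  // pair each word with an initial count of 1
scala> val counts = pairs.reduceByKey(_ + _)               // sum the counts for each distinct word
scala> counts.foreach(println)                             // print the (word, count) pairs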
Submitting a job with spark-submit
Note that spark-submit options such as --master and --class must come before the application jar; anything after the jar is passed to the application as arguments:

ywx@suse114115:~/spark-2.1.1-bin-hadoop2.6/bin> ./spark-submit --master local --class org.apache.spark.examples.JavaWordCount /home/ywx/wordcount.jar /tmp/ywx/output.txt
…
…
17/07/03 15:30:46 INFO DAGScheduler: ResultStage 1 (collect at JavaWordCount.java:78) finished in 0.047 s
17/07/03 15:30:46 INFO DAGScheduler: Job 0 finished: collect at JavaWordCount.java:78, took 0.718062 s
hive: 1
mapreduce: 2
zookeeper: 2
hello: 2
shell: 1
world: 3
spark: 1
hadoop: 1
17/07/03 15:30:46 INFO SparkUI: Stopped Spark web UI at http://ip:4040
17/07/03 15:30:46 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
17/07/03 15:30:46 INFO MemoryStore: MemoryStore cleared
17/07/03 15:30:46 INFO BlockManager: BlockManager stopped
17/07/03 15:30:46 INFO BlockManagerMaster: BlockManagerMaster stopped
17/07/03 15:30:46 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
17/07/03 15:30:46 INFO SparkContext: Successfully stopped SparkContext
…
wordcount.jar
Main contents of the Java file (the Main-Class line below is the entry point declared in the jar's manifest):
Main-Class: com.yxl.sparkcontext.JavaWordCount
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("my app");
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<String> lines = jsc.textFile("/tmp/ywx/test.txt");
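For comparison, here is the same word count as a self-contained Scala application that could likewise be packaged and run with spark-submit (a minimal sketch; the object name is illustrative, and the paths follow the example above):

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("my app")
    val sc = new SparkContext(conf)
    sc.textFile("/tmp/ywx/test.txt")
      .flatMap(line => line.split(" "))   // split each line into words
      .map(word => (word, 1))             // pair each word with a count of 1
      .reduceByKey(_ + _)                 // sum the counts per word
      .collect()                          // bring the results to the driver
      .foreach { case (w, n) => println(w + ": " + n) }
    sc.stop()
  }
}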
The master URL passed to Spark can take any of the following forms:
local: run Spark locally with a single worker thread (that is, with no parallelism at all)
local[K]: run Spark locally with K worker threads (ideally, set K to the number of CPU cores on the machine)
local[*]: run Spark locally with as many worker threads as the machine has logical cores
spark://HOST:PORT: connect to the master of the given Spark standalone cluster; the port must be the one the master is configured with, 7077 by default
mesos://HOST:PORT: connect to the master of the given Mesos cluster; the port must be the one the master is configured with, 5050 by default; if the Mesos cluster uses ZooKeeper, the master URL takes the form mesos://zk://...
yarn-client: connect to a YARN cluster in client mode; the cluster location is read from the HADOOP_CONF_DIR environment variable
yarn-cluster: connect to a YARN cluster in cluster mode; the cluster location is read from the HADOOP_CONF_DIR environment variable
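The master URL is typically set either on the command line via spark-submit --master, or in code when building the SparkConf. A brief Scala sketch (the host name is a placeholder):

import org.apache.spark.SparkConf

// Run locally, using as many worker threads as there are logical cores.
val localConf = new SparkConf().setMaster("local[*]").setAppName("my app")

// Connect to a standalone cluster master on its default port.
val clusterConf = new SparkConf().setMaster("spark://master-host:7077").setAppName("my app")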