Edit the ~/.bashrc file and add:
# qfs with spark
export SPARK_CLASSPATH=/your-path/qfs/lib/hadoop-2.5.1-qfs-master.jar:/your-path/qfs/lib/qfs-access-master.jar
export LD_LIBRARY_PATH=/your-path/qfs/lib/
Although the settings above work, the following warning appears when starting spark-shell:
16/10/17 21:37:38 WARN SparkConf: SPARK_CLASSPATH was detected (set to '/home/qfs/qfs/lib/hadoop-2.5.1-qfs-master.jar:/letv/qfs/lib/qfs-access-master.jar').
This is deprecated in Spark 1.0+.
Please instead use:
 - ./spark-submit with --driver-class-path to augment the driver classpath
 - spark.executor.extraClassPath to augment the executor classpath
16/10/17 21:37:38 WARN SparkConf: Setting 'spark.executor.extraClassPath' to '/home/qfs/qfs/lib/hadoop-2.5.1-qfs-master.jar:/letv/qfs/lib/qfs-access-master.jar' as a work-around.
16/10/17 21:37:38 WARN SparkConf: Setting 'spark.driver.extraClassPath' to '/home/qfs/qfs/lib/hadoop-2.5.1-qfs-master.jar:/letv/qfs/lib/qfs-access-master.jar' as a work-around.
At the same time, errors like these show up when running Spark SQL queries against CSV files on QFS:
ERROR TaskSchedulerImpl: Lost executor 2 on ip: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
16/10/17 21:35:09 WARN TaskSetManager: Lost task 20.0 in stage 4.0 (TID 92, ip): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
16/10/17 21:35:09 WARN TaskSetManager: Lost task 38.0 in stage 4.0 (TID 110, ip): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
So instead, add the following two lines to spark-defaults.conf and remove SPARK_CLASSPATH from ~/.bashrc:
spark.executor.extraClassPath /home/qfs/qfs/lib/hadoop-2.5.1-qfs-master.jar:/letv/qfs/lib/qfs-access-master.jar
spark.driver.extraClassPath   /home/qfs/qfs/lib/hadoop-2.5.1-qfs-master.jar:/letv/qfs/lib/qfs-access-master.jar
Log out of the qfs account and log back in, then restart the Spark slave and spark-shell. The warnings above are all gone.
One more line is needed in spark-defaults.conf, so that the com.quantcast.qfs classes are loaded by the classloader shared between Spark SQL and the Hive metastore:
spark.sql.hive.metastore.sharedPrefixes com.quantcast.qfs
In ~/.bashrc, configure the following:
# qfs with spark
export LD_LIBRARY_PATH=/home/qfs/qfs/lib/
# User specific aliases and functions
export MASTER=spark://meta-server-ip:7077
export PATH=/home/qfs/spark/bin:$PATH
alias spark-shell-qfs="/home/qfs/spark/bin/spark-shell -i /home/qfs/init.qfs"
The init.qfs file contains the following:
sc.hadoopConfiguration.set("fs.qfs.impl", "com.quantcast.qfs.hadoop.QuantcastFileSystem"); sc.hadoopConfiguration.set("fs.defaultFS", "qfs://metaserver-ip:20000"); sc.hadoopConfiguration.set("fs.qfs.metaServerHost", "metaserver-ip"); sc.hadoopConfiguration.set("fs.qfs.metaServerPort", "20000");
From then on, simply start the shell with spark-shell-qfs and the QFS settings are loaded automatically.
scala> val file = sc.textFile("/sdd/files/sample")
file: org.apache.spark.rdd.RDD[String] = /sdd/files/sample MapPartitionsRDD[1] at textFile at <console>:24

scala> file.count()
res4: Long = 93676
Note that the file path here carries no qfs:// prefix.
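Since fs.defaultFS is set to the QFS metaserver in init.qfs, such relative paths already resolve to QFS. A fully qualified URI should work as well; this is a minimal sketch, reusing the metaserver-ip placeholder from init.qfs:

// same file as above, addressed with an explicit qfs:// URI (placeholder host and port)
val file2 = sc.textFile("qfs://metaserver-ip:20000/sdd/files/sample")
file2.count()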
At this point a missing-write-permission problem on /tmp/hive shows up, and it is an interesting one: initially the /tmp/hive directory does not exist on QFS at all. Once the following line is run in spark-shell, the directory is created automatically, but an error immediately complains that it is not writable:
val df = spark.read.option("delimiter","\t").csv("/data/files/gsm*");

java.lang.RuntimeException: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rwx-wx--x
The fix is to open qfsshell and use its chmod command on the hive directory (under /tmp):

QfsShell> chmod 0777 hive
Note that 777 does not work; it has to be 0777. Checking the permissions again, they have become:
QfsShell> ls -alh hive/
<dir> rwxrwxrwx qfs qfs 0 Oct 17 10:33
When reading the CSV files, the number of fields exceeds the default limit of 25 and a warning is printed. This can be dealt with by adding one line to spark-defaults.conf:
spark.debug.maxToStringFields 1000
Restart the slave and enter spark-shell again, and the warning is gone.
val df = spark.read.option("delimiter","\t").csv("/data/files/gsm*");
df.write.parquet("/data/pfiles/voice1.parquet");
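As a side note, the field count that tripped the 25-field default can be inspected directly on the DataFrame; a minimal sketch (with no header option set, Spark names the columns _c0, _c1, ...):

df.printSchema()            // columns show up as _c0, _c1, ... since no header was read
println(df.columns.length)  // the field count that exceeded the default limit of 25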
The parquet write fails with a pile of error messages; insufficient memory looks like the likely cause:
ERROR TaskSchedulerImpl: Lost executor 2 on ip: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
Experience suggests the files are simply too large and the default 1 GB of memory for spark-shell is not enough. The fix is to raise the spark-shell memory in ~/.bashrc; there is no need to change the startup memory of the Spark slave itself.
alias spark-shell-qfs="/home/qfs/spark/bin/spark-shell --executor-memory 30G -i /home/qfs/init.qfs"
Once the job succeeds, the generated directory and files can be seen in qfsshell:
QfsShell> cd voice1.parquet
QfsShell> ls
_SUCCESS
part-r-00000-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00001-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00002-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00003-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00004-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00005-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00006-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00007-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00008-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00009-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00010-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00011-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00012-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00013-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00014-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00015-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00016-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00017-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00018-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00019-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00020-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00021-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00022-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00023-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00024-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
Now let's look at what the memory setting actually changed.
$ ps -def | grep java
qfs 14830     1  0 Oct17 ?     00:00:59 /usr/local/java/bin/java -cp /home/qfs/spark/conf/:/home/qfs/spark/jars/* -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://:7077
qfs 17895 17888  6 14:48 pts/1 00:00:55 /usr/local/java/bin/java -cp /home/qfs/qfs/lib/hadoop-2.5.1-qfs-master.jar:/letv/qfs/lib/qfs-access-master.jar:/home/qfs/spark/conf/:/home/qfs/spark/jars/* -Dscala.usejavacp=true -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main --name Spark shell --executor-memory 30G spark-shell -i /home/qfs/init.qfs
qfs 18043 14830 60 14:48 ?     00:08:39 /usr/local/java/bin/java -cp /home/qfs/qfs/lib/hadoop-2.5.1-qfs-master.jar:/letv/qfs/lib/qfs-access-master.jar:/home/qfs/spark/conf/:/home/qfs/spark/jars/* -Xmx30720M -Dspark.driver.port=13742 -XX:MaxPermSize=256m org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@:13742 --executor-id 0 --hostname --cores 24 --app-id app-20161018144823-0018 --worker-url spark://Worker@:52903
Notice that the JVM memory settings of the Spark slave and of spark-shell itself are unchanged (both still run with -Xmx1g). The 30G value of the --executor-memory option ends up in the new process 18043; presumably spark-shell passes it to the Spark slave via spark-submit, and the slave starts the new executor process using that value as its -Xmx setting.
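The --executor-memory flag corresponds to the spark.executor.memory property, so the effective value can also be checked from inside the shell; a minimal sketch:

sc.getConf.get("spark.executor.memory")  // should report 30G for this session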
With the nicer parquet files in place, let's query them and enjoy the speedup.
val df2 = spark.read.parquet("/data/pfiles/voice1.parquet");
Verification shows the data matches the CSV source.
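One simple consistency check is to compare row counts; a minimal sketch, where df is the DataFrame read from the CSV files earlier in this session:

// the counts should match if the parquet copy is complete
println(df.count() == df2.count())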