
Accessing QFS from Spark

韩英锐
2023-12-01

1 Setting the Java and C++ library paths

1.1 The old way

Edit ~/.bashrc and add:

# qfs with spark                              
export SPARK_CLASSPATH=/your-path/qfs/lib/hadoop-2.5.1-qfs-master.jar:/your-path/qfs/lib/qfs-access-master.jar
export LD_LIBRARY_PATH=/your-path/qfs/lib/

1.2 A better approach

The settings above do work, but a warning appears when spark-shell starts:

16/10/17 21:37:38 WARN SparkConf: 
SPARK_CLASSPATH was detected (set to '/home/qfs/qfs/lib/hadoop-2.5.1-qfs-master.jar:/letv/qfs/lib/qfs-access-master.jar').
This is deprecated in Spark 1.0+.

Please instead use:
- ./spark-submit with --driver-class-path to augment the driver classpath
- spark.executor.extraClassPath to augment the executor classpath

16/10/17 21:37:38 WARN SparkConf: Setting 'spark.executor.extraClassPath' to '/home/qfs/qfs/lib/hadoop-2.5.1-qfs-master.jar:/letv/qfs/lib/qfs-access-master.jar' as a work-around.
16/10/17 21:37:38 WARN SparkConf: Setting 'spark.driver.extraClassPath' to '/home/qfs/qfs/lib/hadoop-2.5.1-qfs-master.jar:/letv/qfs/lib/qfs-access-master.jar' as a work-around.

In addition, running a Spark SQL query over CSV files on QFS produces messages like these:

ERROR TaskSchedulerImpl: Lost executor 2 on ip: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
16/10/17 21:35:09 WARN TaskSetManager: Lost task 20.0 in stage 4.0 (TID 92, ip): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
16/10/17 21:35:09 WARN TaskSetManager: Lost task 38.0 in stage 4.0 (TID 110, ip): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.

So instead, add the following two lines to spark-defaults.conf and remove SPARK_CLASSPATH from ~/.bashrc:

spark.executor.extraClassPath /home/qfs/qfs/lib/hadoop-2.5.1-qfs-master.jar:/letv/qfs/lib/qfs-access-master.jar
spark.driver.extraClassPath /home/qfs/qfs/lib/hadoop-2.5.1-qfs-master.jar:/letv/qfs/lib/qfs-access-master.jar

Log out of and back into the qfs account, then restart the Spark worker and the shell; all of the warnings above are gone.
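
To double-check that the new settings actually took effect, a quick look from inside spark-shell is enough. A minimal sketch, reading back only the two properties configured above in spark-defaults.conf:

// Sanity check inside spark-shell: print the classpath properties
// picked up from spark-defaults.conf (the jar paths configured above).
println(sc.getConf.get("spark.executor.extraClassPath"))
println(sc.getConf.get("spark.driver.extraClassPath"))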

2 Preventing the native library from being loaded twice

Add one line to spark-defaults.conf:

spark.sql.hive.metastore.sharedPrefixes com.quantcast.qfs
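
The shared prefix tells Spark to load classes under com.quantcast.qfs once with the main classloader instead of again inside Hive's isolated metastore classloader, which is what would otherwise trigger a second load of the QFS native library. For an application submitted with spark-submit (rather than spark-shell), the same option should also be settable when the SparkSession is built; a minimal sketch, with a hypothetical app name and the same prefix as above:

// Sketch: set the shared prefix programmatically instead of in spark-defaults.conf.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("qfs-example")  // hypothetical name
  .config("spark.sql.hive.metastore.sharedPrefixes", "com.quantcast.qfs")
  .getOrCreate()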

3 Simplifying spark-shell startup

3.1 Setting up .bashrc

Configure ~/.bashrc as follows:

# qfs with spark
export LD_LIBRARY_PATH=/home/qfs/qfs/lib/

# User specific aliases and functions
export MASTER=spark://meta-server:7077
export PATH=/home/qfs/spark/bin:$PATH
alias spark-shell-qfs="/home/qfs/spark/bin/spark-shell -i /home/qfs/init.qfs"

3.2 Creating the init.qfs file

The contents of init.qfs:

sc.hadoopConfiguration.set("fs.qfs.impl", "com.quantcast.qfs.hadoop.QuantcastFileSystem");
sc.hadoopConfiguration.set("fs.defaultFS", "qfs://metaserver-ip:20000");
sc.hadoopConfiguration.set("fs.qfs.metaServerHost", "metaserver-ip");
sc.hadoopConfiguration.set("fs.qfs.metaServerPort", "20000");

3.3 Launching

From now on, just run spark-shell-qfs and the QFS settings are loaded automatically.
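
To confirm that init.qfs was applied, the Hadoop configuration can be read back from the shell. This sketch only echoes the keys set in init.qfs above:

// Inside spark-shell-qfs: verify the settings loaded from init.qfs.
println(sc.hadoopConfiguration.get("fs.defaultFS"))          // qfs://metaserver-ip:20000
println(sc.hadoopConfiguration.get("fs.qfs.impl"))           // com.quantcast.qfs.hadoop.QuantcastFileSystem
println(sc.hadoopConfiguration.get("fs.qfs.metaServerHost")) // metaserver-ip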

4 Reading a text file from QFS

scala> val file = sc.textFile("/sdd/files/sample")
file: org.apache.spark.rdd.RDD[String] = /sdd/files/sample MapPartitionsRDD[1] at textFile at <console>:24
scala> file.count()
res4: Long = 93676

Note that the path carries no qfs:// prefix here; because fs.defaultFS points at the QFS metaserver, plain paths resolve to QFS automatically.
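
The same file can also be addressed with a fully qualified qfs:// URI, which is handy if fs.defaultFS points somewhere else. A sketch assuming the metaserver host and port from init.qfs:

scala> val file2 = sc.textFile("qfs://metaserver-ip:20000/sdd/files/sample")
scala> file2.count()   // should return the same 93676 lines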

5 Reading CSV files from QFS

5.1 Write permission on /tmp/hive

At this point an error appears complaining about write permission on /tmp/hive. What makes it interesting is that the /tmp/hive directory does not exist on QFS at first; after running the line below in spark-shell the directory is created automatically, yet the write error is raised immediately:

val df = spark.read.option("delimiter","\t").csv("/data/files/gsm*");
java.lang.RuntimeException: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rwx-wx--x

The fix is to enter qfsshell and run chmod:

QfsShell> chmod 0777 hive

Note that plain 777 does not work; it has to be 0777. Listing the directory again, the permissions have changed to:

QfsShell> ls -alh
hive/	<dir>	rwxrwxrwx	qfs	qfs	    0	Oct 17 10:33
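
An alternative to dropping into qfsshell is to fix the permissions directly from spark-shell through the Hadoop FileSystem API. A hedged sketch, assuming fs.defaultFS already points at the QFS metaserver as configured above:

// Sketch: loosen /tmp/hive permissions via the Hadoop FileSystem API instead of qfsshell.
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsPermission

val fs = FileSystem.get(sc.hadoopConfiguration)           // resolves to QuantcastFileSystem
fs.setPermission(new Path("/tmp/hive"),
  new FsPermission(Integer.parseInt("777", 8).toShort))   // rwxrwxrwx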

5.2 Removing the spark.debug.maxToStringFields warning

When reading the CSV files, the number of columns exceeds the default limit of 25 fields and Spark prints a warning. It can be silenced by adding one line to spark-defaults.conf:

spark.debug.maxToStringFields 1000

Restart the worker and re-enter spark-shell; the warning is gone.

6 Converting the CSV files to Parquet

val df = spark.read.option("delimiter","\t").csv("/data/files/gsm*");
df.write.parquet("/data/pfiles/voice1.parquet");

Writing the Parquet output fails with a flood of messages, presumably because of insufficient memory:

ERROR TaskSchedulerImpl: Lost executor 2 on ip: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.

From experience the data is simply too large: the default 1 GB of executor memory for the spark-shell job is not enough. The fix is to raise the executor memory in the ~/.bashrc alias; the startup memory of the Spark worker itself does not need to change.

alias spark-shell-qfs="/home/qfs/spark/bin/spark-shell --executor-memory 30G -i /home/qfs/init.qfs"

After the job succeeds, the new directory and files are visible in qfsshell:

QfsShell> cd voice1.parquet
QfsShell> ls
_SUCCESS
part-r-00000-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00001-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00002-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00003-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00004-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00005-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00006-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00007-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00008-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00009-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00010-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00011-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00012-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00013-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00014-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00015-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00016-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00017-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00018-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00019-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00020-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00021-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00022-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00023-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet
part-r-00024-29e32eb2-4db5-426b-bf15-16247c18d3c5.snappy.parquet

Now let's see what the memory setting actually changed.

$ ps -def | grep java
qfs      14830     1  0 Oct17 ?        00:00:59 /usr/local/java/bin/java -cp /home/qfs/spark/conf/:/home/qfs/spark/jars/* -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://:7077
qfs      17895 17888  6 14:48 pts/1    00:00:55 /usr/local/java/bin/java -cp /home/qfs/qfs/lib/hadoop-2.5.1-qfs-master.jar:/letv/qfs/lib/qfs-access-master.jar:/home/qfs/spark/conf/:/home/qfs/spark/jars/* -Dscala.usejavacp=true -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main --name Spark shell --executor-memory 30G spark-shell -i /home/qfs/init.qfs
qfs      18043 14830 60 14:48 ?        00:08:39 /usr/local/java/bin/java -cp /home/qfs/qfs/lib/hadoop-2.5.1-qfs-master.jar:/letv/qfs/lib/qfs-access-master.jar:/home/qfs/spark/conf/:/home/qfs/spark/jars/* -Xmx30720M -Dspark.driver.port=13742 -XX:MaxPermSize=256m org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@:13742 --executor-id 0 --hostname  --cores 24 --app-id app-20161018144823-0018 --worker-url spark://Worker@:52903

Neither the Spark worker's nor the spark-shell driver's own JVM memory setting changed; the 30G value of --executor-memory was passed to the new process 18043. Presumably spark-shell hands it to the worker via spark-submit, and the worker launches the executor process with that value as its -Xmx.
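
This can also be confirmed from inside the shell: the command-line flag ends up as the application's spark.executor.memory property, which is what the worker applies to the executor it launches. A quick check (standard Spark property, value as set in the alias above):

// --executor-memory becomes spark.executor.memory in the application's SparkConf.
println(sc.getConf.get("spark.executor.memory"))   // 30G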

7 Querying the Parquet files

With the data now in the more efficient Parquet format, let's query it and enjoy the speed-up.

val df2 = spark.read.parquet("/data/pfiles/voice1.parquet");

The results match the CSV data.
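
For example, a row-count comparison plus a sample aggregation; the column names _c0, _c1, ... are what Spark assigns when a CSV is read without a header, so they carry over into the Parquet file, and the SQL below is only an illustration:

// df2 read from Parquet should contain exactly the same rows as the original CSV read.
df2.count()   // compare with df.count()

// Hypothetical aggregation on the first column (_c0 is the default name
// Spark gives the first header-less CSV column).
df2.createOrReplaceTempView("voice1")
spark.sql("SELECT _c0, COUNT(*) AS cnt FROM voice1 GROUP BY _c0 ORDER BY cnt DESC LIMIT 10").show()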
