我正在尝试使用Scala编写一个HDFS输出文件,收到以下错误:
线程“main”org.apache.spark.sparkException中的异常:任务不可序列化,位于org.apache.spark.util.closurecleaner$.ensureclealizable(closurecleaner.scala:315)位于org.apache.spark.util.closurecleaner$.org$apache.spark.util.closurecleaner$$clean(Closurecleaner.scala:305)位于org.apache.spark.util.closurecleaner$.clean(Closurecleaner.scala:132),位于
所有第23行我需要在输出文件中写一行。
代码源:
package com.mycode.logs;
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.SparkContext._
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext
import scala.io._
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.PrintWriter;
/**
* @author RondenaR
*
*/
object NormalizeMSLogs{
def main(args: Array[String]){
processMsLogs("/user/temporary/*file*")
}
def processMsLogs(path: String){
System.out.println("INFO: ****************** started ******************")
// **** SetMaster is Local only to test *****
// Set context
val sparkConf = new SparkConf().setAppName("tmp-logs").setMaster("local")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
val hiveContext = new HiveContext(sc)
// Set HDFS
System.setProperty("HADOOP_USER_NAME", "hdfs")
val hdfsconf = SparkHadoopUtil.get.newConfiguration(sc.getConf)
hdfsconf.set("fs.defaultFS", "hdfs://192.168.248.130:8020")
val hdfs = FileSystem.get(hdfsconf)
val output = hdfs.create(new Path("hdfs://192.168.248.130:8020/tmp/mySample.txt"))
val writer = new PrintWriter(output)
val sourcePath = new Path(path)
var count :Int = 0
var lineF :String = ""
hdfs.globStatus( sourcePath ).foreach{ fileStatus =>
val filePathName = fileStatus.getPath().toString()
val fileName = fileStatus.getPath().getName()
val hdfsfileIn = sc.textFile(filePathName)
val msNode = fileName.substring(1, fileName.indexOf("es"))
System.out.println("filePathName: " + filePathName)
System.out.println("fileName: " + fileName)
System.out.println("hdfsfileIn: " + filePathName)
System.out.println("msNode: " + msNode)
for(line <- hdfsfileIn){
//System.out.println("line = " + line)
count += 1
if(count != 23){
lineF = lineF + line + ", "
}
if(count == 23){
lineF = lineF + line + ", " + msNode
System.out.println(lineF)
writer.write(lineF)
writer.write("\n")
count = 0
lineF = ""
}
} // end for loop in file
} // end foreach loop
writer.close()
System.out.println("INFO: ******************ended ******************")
sc.stop()
}
}
不仅printwriter
对象writer
不可序列化:而且您不能将sparkcontext
(sc
)放入foreach中:它是一个仅用于驱动程序的构造,通过线发送给工作者没有意义。
您应该花一些时间来考虑什么类型的对象可以通过有线发送。任何指针/流/句柄都没有意义。结构、字符串、原语:在闭包(或广播)中包含这些是有意义的。
问题内容: 是否有人尝试 将 log4j 日志文件_直接 _写入 Hadoop分布式文件系统 ? 如果是,请回答如何实现。我想我必须为此创建一个Appender。 是这样吗 我需要以特定的时间间隔将日志写入文件,并在以后的阶段查询该数据。 问题答案: 我建议将Apache Flume 用于此任务。Log4j有Flume附加程序。这样,您将日志发送到Flume,并写入HDFS。这种方法的好处是Flu
我试图用C编写一个接口,用libhdfs在hdfs中编写文件。所以我的目标hadoop版本是2.0.2-alpha 以下代码 运行时出现以下错误: 相应地更新了CLASSPATH,但仍然收到此错误。知道我这里缺少的步骤是什么吗?所有. so文件都正确链接。
是否有一种已知的方法使用Hadoop api/spark scala在Hdfs上将文件从一个目录复制到另一个目录? 我尝试使用copyFromLocalFile,但没有帮助
本文向大家介绍如何使用R编写文本并将其输出为文本文件?,包括了如何使用R编写文本并将其输出为文本文件?的使用技巧和注意事项,需要的朋友参考一下 我们可以使用writeLines和fileConn函数来做到这一点。 示例 我们可以这样做,并按照以下方式在R中查看这些文件- 您可以在系统的文件文件夹中找到这些文件。
问题内容: 我想在HDFS中创建文件并在其中写入数据。我使用以下代码: 它创建文件,但不写入任何内容。我搜索了很多,但没有找到任何东西。我怎么了 我是否需要任何权限才能在HDFS中写入? 问题答案: 的替代方法,你可以在获取文件系统时传递URI
我正在使用Flume从我的本地文件系统写一些CSV文件到HDFS。 我想知道HDFS水槽的最佳配置是什么,这样本地系统上的每个文件都会在HDFS以CSV格式准确复制。我希望Flume处理的每个CSV文件都是单个事件,作为单个文件刷新和写入。尽可能多地,我希望文件是完全一样的,没有标题的东西等。 我需要在这些值上加什么来模拟我想要的行为? 如果还有其他Flume代理配置变量需要更改,请提供。 如果这