当前位置: 首页 > 知识库问答 >
问题:

用Scala编写HDFS输出文件

斜高翰
2023-03-14

我正在尝试使用Scala编写一个HDFS输出文件,收到以下错误:

线程“main”org.apache.spark.sparkException中的异常:任务不可序列化,位于org.apache.spark.util.closurecleaner$.ensureclealizable(closurecleaner.scala:315)位于org.apache.spark.util.closurecleaner$.org$apache.spark.util.closurecleaner$$clean(Closurecleaner.scala:305)位于org.apache.spark.util.closurecleaner$.clean(Closurecleaner.scala:132),位于

所有第23行我需要在输出文件中写一行。

代码源:

package com.mycode.logs;

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.SparkContext._
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext
import scala.io._
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.PrintWriter;

/**
 * @author RondenaR
 * 
 */
object NormalizeMSLogs{

  def main(args: Array[String]){
    processMsLogs("/user/temporary/*file*")
  }

  def processMsLogs(path: String){
    System.out.println("INFO: ****************** started ******************")

    // **** SetMaster is Local only to test *****
    // Set context
    val sparkConf = new SparkConf().setAppName("tmp-logs").setMaster("local")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    val hiveContext = new HiveContext(sc)

    // Set HDFS
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    val hdfsconf = SparkHadoopUtil.get.newConfiguration(sc.getConf)
    hdfsconf.set("fs.defaultFS", "hdfs://192.168.248.130:8020")
    val hdfs = FileSystem.get(hdfsconf)

    val output = hdfs.create(new Path("hdfs://192.168.248.130:8020/tmp/mySample.txt"))
    val writer = new PrintWriter(output)

    val sourcePath = new Path(path)
    var count :Int = 0
    var lineF :String = ""

    hdfs.globStatus( sourcePath ).foreach{ fileStatus =>
      val filePathName = fileStatus.getPath().toString()
      val fileName = fileStatus.getPath().getName()

      val hdfsfileIn = sc.textFile(filePathName)
      val msNode = fileName.substring(1, fileName.indexOf("es"))

      System.out.println("filePathName: " + filePathName)
      System.out.println("fileName: " + fileName)
      System.out.println("hdfsfileIn: " + filePathName)
      System.out.println("msNode: " + msNode)

      for(line <- hdfsfileIn){
        //System.out.println("line = " + line)
        count += 1

        if(count != 23){
          lineF = lineF + line + ", "
        }

        if(count == 23){
          lineF = lineF + line + ", " + msNode
          System.out.println(lineF)
          writer.write(lineF) 
          writer.write("\n")
          count = 0
          lineF = ""
        }
      } // end for loop in file
    } // end foreach loop
    writer.close()
    System.out.println("INFO: ******************ended ******************")
    sc.stop()
  }
}

共有1个答案

李招
2023-03-14

不仅printwriter对象writer不可序列化:而且您不能将sparkcontext(sc)放入foreach中:它是一个仅用于驱动程序的构造,通过线发送给工作者没有意义。

您应该花一些时间来考虑什么类型的对象可以通过有线发送。任何指针/流/句柄都没有意义。结构、字符串、原语:在闭包(或广播)中包含这些是有意义的。

 类似资料:
  • 问题内容: 是否有人尝试 将 log4j 日志文件_直接 _写入 Hadoop分布式文件系统 ? 如果是,请回答如何实现。我想我必须为此创建一个Appender。 是这样吗 我需要以特定的时间间隔将日志写入文件,并在以后的阶段查询该数据。 问题答案: 我建议将Apache Flume 用于此任务。Log4j有Flume附加程序。这样,您将日志发送到Flume,并写入HDFS。这种方法的好处是Flu

  • 我试图用C编写一个接口,用libhdfs在hdfs中编写文件。所以我的目标hadoop版本是2.0.2-alpha 以下代码 运行时出现以下错误: 相应地更新了CLASSPATH,但仍然收到此错误。知道我这里缺少的步骤是什么吗?所有. so文件都正确链接。

  • 是否有一种已知的方法使用Hadoop api/spark scala在Hdfs上将文件从一个目录复制到另一个目录? 我尝试使用copyFromLocalFile,但没有帮助

  • 本文向大家介绍如何使用R编写文本并将其输出为文本文件?,包括了如何使用R编写文本并将其输出为文本文件?的使用技巧和注意事项,需要的朋友参考一下 我们可以使用writeLines和fileConn函数来做到这一点。 示例 我们可以这样做,并按照以下方式在R中查看这些文件- 您可以在系统的文件文件夹中找到这些文件。

  • 问题内容: 我想在HDFS中创建文件并在其中写入数据。我使用以下代码: 它创建文件,但不写入任何内容。我搜索了很多,但没有找到任何东西。我怎么了 我是否需要任何权限才能在HDFS中写入? 问题答案: 的替代方法,你可以在获取文件系统时传递URI

  • 我正在使用Flume从我的本地文件系统写一些CSV文件到HDFS。 我想知道HDFS水槽的最佳配置是什么,这样本地系统上的每个文件都会在HDFS以CSV格式准确复制。我希望Flume处理的每个CSV文件都是单个事件,作为单个文件刷新和写入。尽可能多地,我希望文件是完全一样的,没有标题的东西等。 我需要在这些值上加什么来模拟我想要的行为? 如果还有其他Flume代理配置变量需要更改,请提供。 如果这