当前位置: 首页 > 知识库问答 >
问题:

如何从spark Executor中读取HDFS文件?

刁钧
2023-03-14

我有一个大的(>500M行)CSV文件。这个CSV文件中的每一行都包含一个位于HDFS上的二进制文件的路径。我想使用Spark读取这些文件中的每一个,处理它们,并将结果写到另一个CSV文件或表中。

在驱动程序中执行此操作非常简单,下面的代码完成了这项工作

val hdfsFilePathList = // read paths from CSV, collect into list

hdfsFilePathList.map( pathToHdfsFile => {
  sqlContext.sparkContext.binaryFiles(pathToHdfsFile).mapPartitions { 
    functionToProcessBinaryFiles(_)
  }
})
import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.conf.Configuration

class ConfigSerDeser(var conf: Configuration) extends Serializable {

  def this() {
    this(new Configuration())
  }

  def get(): Configuration = conf

  private def writeObject (out: java.io.ObjectOutputStream): Unit = {
    conf.write(out)
  }

  private def readObject (in: java.io.ObjectInputStream): Unit = {
    conf = new Configuration()
    conf.readFields(in)
  }

  private def readObjectNoData(): Unit = {
    conf = new Configuration()
  }
}

val serConf = new ConfigSerDeser(sc.hadoopConfiguration)

val mappedIn = inputDf.map( row => {
    serConf.get()
})

但是kryoException:java.util.concurrentModificationException失败

是否可以让执行者直接访问HDFS文件或HDFS文件系统?或者,是否有一种有效的方法来读取HDFS/S3上数百万个二进制文件并用Spark处理它们?

共有1个答案

苗学民
2023-03-14

有一个类似的用例,我试图做同样的事情,但意识到SparkSession或SparkContext是不可序列化的,因此不能从执行器访问。

 类似资料:
  • 所以我必须检索存储在HDFS中的文件的内容,并对其进行某些分析。 问题是,我甚至无法读取文件并将其内容写入本地文件系统中的另一个文本文件。(我是Flink的新手,这只是一个测试,以确保我正确读取了文件) HDFS中的文件是纯文本文件。这是我的密码: 在我运行/tmp之后,它没有输出。 这是一个非常简单的代码,我不确定它是否有问题,或者我只是做了一些别的错误。正如我所说,我对Flink完全是新手 此

  • 问题内容: 这是我的问题:我在HDFS中有一个文件,该文件可能很大(=不足以容纳所有内存) 我想做的是避免必须将此文件缓存在内存中,而仅像逐行处理常规文件一样逐行处理它: 我正在寻找是否有一种简单的方法可以在不使用外部库的情况下正确完成此操作。我可能可以使它与libpyhdfs或python- hdfs一起使用, 但我想尽可能避免在系统中引入新的依赖项和未经测试的库,尤其是因为这两个似乎都没有得到

  • 我只找到TextInputFormat和CsvInputFormat。那么,如何使用ApacheFlink读取HDFS中的拼花文件呢?

  • 问题内容: 我知道如何读取字节,但是如何在Python中读取位? 我只需要从二进制文件中读取5位(而不是8位[1字节]) 有什么想法或方法吗? 问题答案: Python一次只能读取一个字节。您需要读完整的字节,然后从该字节中提取所需的值,例如 或者,如果您想要5个最低有效位,而不是5个最高有效位: 一些其他有用的位操作信息可以在这里找到:http : //wiki.python.org/moin/

  • 问题内容: 下面的Mappers代码从HDFS读取文本文件正确吗?如果是这样的话: 如果不同节点中的两个映射器尝试几乎同时打开文件,会发生什么情况? 是否不需要关闭?如果是这样,如何在不关闭文件系统的情况下执行此操作? 我的代码是: 问题答案: 这将起作用,并进行一些修改-我假设您粘贴的代码被截断了: 您可以有多个映射器读取同一个文件,但是使用分布式缓存存在更多的局限性(不仅减少了承载文件块的数据

  • 我是Flink大学的一年级新生,我想知道如何从hdfs读取数据。有谁能给我一些建议或简单的例子吗?谢谢大家。