当前位置: 首页 > 知识库问答 >
问题:

如何在Hadoop 3.0中进行CopyMerge?

杨甫
2023-03-14

我知道hadoop 2.7版的FileUtil具有copyMerge功能,可以将多个文件合并到一个新文件中。

但是,3.0版的API不再支持copyMerge功能

在hadoop的3.0版本中,如何将目录中的所有文件合并到一个新的单个文件中,有什么想法吗?

共有3个答案

隗昀
2023-03-14

我也有同样的问题,必须重新实现copyMerge(虽然是在PySpark中,但使用与原始copyMerge相同的API调用)。

不知道为什么Hadoop 3中没有等效的功能。我们必须经常将文件从HDFS目录合并到HDFS文件。

这是我在上面提到的pySpark中的实现https://github.com/Tagar/stuff/blob/master/copyMerge.py

柳英豪
2023-03-14

FileUtil#copyMerge方法已删除。主要变更详见:

https://issues.apache.org/jira/browse/HADOOP-12967

https://issues.apache.org/jira/browse/HADOOP-11392

可以使用getmerge

用法:hadoop fs-getmerge[-nl]

将源目录和目标文件作为输入,并将src中的文件连接到目标本地文件中。(可选)-nl可以设置为允许在每个文件的末尾添加换行符(LF)-“跳过空文件”可用于避免出现空文件时出现不需要的换行符。

示例:

hadoop fs -getmerge -nl /src /opt/output.txt
hadoop fs -getmerge -nl /src/file1.txt /src/file2.txt /output.txt

退出代码:成功时返回0,错误时返回非零。

https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/FileSystemShell.html#getmerge

翟凯
2023-03-14

自FileUtil。copyMerge()已被弃用并从版本3开始的API中删除,我们始终可以自己重新实现它。

这是以前版本的原始Java实现。

以下是Scala翻译:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.IOUtils
import java.io.IOException

def copyMerge(
    srcFS: FileSystem, srcDir: Path,
    dstFS: FileSystem, dstFile: Path,
    deleteSource: Boolean, conf: Configuration
): Boolean = {

  if (dstFS.exists(dstFile)) {
    throw new IOException(s"Target $dstFile already exists")
  }

  // Source path is expected to be a directory:
  if (srcFS.getFileStatus(srcDir).isDirectory) {

    val outputFile = dstFS.create(dstFile)
    try {
      srcFS
        .listStatus(srcDir)
        .sortBy(_.getPath.getName)
        .collect {
          case status if status.isFile =>
            val inputFile = srcFS.open(status.getPath)
            try { IOUtils.copyBytes(inputFile, outputFile, conf, false) }
            finally { inputFile.close() }
        }
    } finally { outputFile.close() }

    if (deleteSource) srcFS.delete(srcDir, true) else true
  }
  else false
}
 类似资料:
  • 问题内容: 在Python中scp文件的最pythonic方式是什么?我知道的唯一路线是 这是一种骇客,并且在类似Linux的系统之外不起作用,并且需要Pexpect模块的帮助来避免出现密码提示,除非你已经为远程主机设置了无密码的SSH。 我知道Twisted的,但是我希望避免通过低级ssh模块自己实现scp。 我知道,一个支持SSH和SFTP的Python模块;但它不支持SCP。 背景:我正在连

  • 问题内容: 我有一长行代码,我想在多行中分解。我使用什么,语法是什么? 例如,添加一串字符串, 并分成两行,如下所示: 问题答案: 线路是什么?你可以在下一行中使用参数而不出现任何问题: 否则,你可以执行以下操作: 查看样式指南以获取更多信息。 从示例行中: 要么: 请注意,样式指南指出,最好使用带括号的隐式连续符,但是在这种特殊情况下,仅在表达式周围加上括号可能是错误的方法。

  • 问题内容: 如何在jDBI中执行类似的操作? 表: foo(id int,name varchar) 与myBatis中的@SelectProvider相似。 问题答案: 这应该工作: 不要忘记用以下方法注释包含此方法的类: 注解(因为JDBI底层使用Apache StringTemplate进行此类替换)。还要注意,使用此注释,您不能在SQL查询中使用’<’字符而不进行转义(因为它是String

  • 问题内容: 我试图找到一种解决方案,如何从具有相同值的中获取所有字段。 例如 所以,我要的是做这样的事情。预期结果将是和。如果有人可以指出我如何使用本机命令或使用该命令,那将非常棒。 谢谢。 问题答案: 你可以这样做 在名为script.lua的lua脚本中 lua通过序列key0,val0,key1,val1等获取哈希值… 然后您可以这样称呼它: 您将拥有 : 有关eval函数的更多信息,请参见

  • 我正在尝试用Java做一些事情,我需要在while循环中等待/延迟几秒钟。 我想构建一个step sequencer,我是Java新手。有什么建议吗?

  • 问题内容: 我有一个内存中大约有1000个项目的数据集,正在尝试为此数据集创建一个传呼机,但是我不确定如何执行此操作。 我使用的是自定义过滤器功能来过滤结果,效果很好,但是以某种方式我需要获取页面数。 有什么线索吗? 问题答案: 查看UI Bootstrap的分页指令。我最终使用了它,而不是使用此处发布的内容,因为它具有当前使用的足够功能,并且具有详尽的测试规范。 视图 控制者 我做了一个工作的小