当前位置: 首页 > 知识库问答 >
问题:

有没有办法用Apache Beam FileIO为每条记录编写一个文件?

吉泰宁
2023-03-14

我正在学习ApacheBeam,并试图实现类似于distcp的东西。我使用FileIO。read()。filepattern()获取输入文件,但在使用FileIO写入时。写下,文件有时会合并。

在作业执行之前不可能知道分区计数。

PCollection<MatchResult.Metadata> pCollection = pipeline.apply(this.name(), FileIO.match().filepattern(path()))
  .apply(FileIO.readMatches())
  .apply(name(), FileIO.<FileIO.ReadableFile>write()
        .via(FileSink.create())
        .to(path()));

水槽代码

@AutoValue
public abstract static class FileSink implements FileIO.Sink<FileIO.ReadableFile> {

    private OutputStream outputStream;

    public static FileSink create() {
      return new AutoValue_FileIOOperator_FileSink();
    }

    @Override
    public void open(WritableByteChannel channel) throws IOException {
      outputStream = Channels.newOutputStream(channel);
    }

    @Override
    public void write(FileIO.ReadableFile element) throws IOException {
      try (final InputStream inputStream = Channels.newInputStream(element.open())) {
        IOUtils.copy(inputStream, outputStream);
      }
    }

    @Override
    public void flush() throws IOException {
      outputStream.flush();
    }
  }

共有1个答案

龙霖
2023-03-14

你可以使用FileIO。writeDynamic并在中指定。通过你想如何编写它们。例如,如果你有唯一的密钥,你可以使用。通过(KV::getKey),每个关键元素将被写入一个单独的文件。否则,标准可以是行的散列,等等。您也可以调整。随意命名。作为演示:

p.apply("Create Data", Create.of(KV.of("one", "this is row 1"), KV.of("two", "this is row 2"), KV.of("three", "this is row 3"), KV.of("four", "this is row 4")))
 .apply(FileIO.<String, KV<String, String>>writeDynamic()
    .by(KV::getKey)
    .withDestinationCoder(StringUtf8Coder.of())
    .via(Contextful.fn(KV::getValue), TextIO.sink())
    .to(output)
    .withNaming(key -> FileIO.Write.defaultNaming("file-" + key, ".txt")));

这将把四个元素写入四个文件:

$ mvn compile -e exec:java \
 -Dexec.mainClass=com.dataflow.samples.OneRowOneFile \
      -Dexec.args="--project=$PROJECT \
      --output="output/" \
      --runner=DirectRunner"

$ ls output/
file-four-00001-of-00003.txt  file-one-00002-of-00003.txt  file-three-00002-of-00003.txt  file-two-00002-of-00003.txt
$ cat output/file-four-00001-of-00003.txt 
this is row 4

完整代码

package com.dataflow.samples;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;


public abstract class OneRowOneFile {

    public interface Options extends PipelineOptions {
        @Validation.Required
        @Description("Output Path i.e. gs://BUCKET/path/to/output/folder")
        String getOutput();
        void setOutput(String s);
    }

    public static void main(String[] args) {

        OneRowOneFile.Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(OneRowOneFile.Options.class);

        Pipeline p = Pipeline.create(options);

        String output = options.getOutput();

        p.apply("Create Data", Create.of(KV.of("one", "this is row 1"), KV.of("two", "this is row 2"), KV.of("three", "this is row 3"), KV.of("four", "this is row 4")))
         .apply(FileIO.<String, KV<String, String>>writeDynamic()
            .by(KV::getKey)
            .withDestinationCoder(StringUtf8Coder.of())
            .via(Contextful.fn(KV::getValue), TextIO.sink())
            .to(output)
            .withNaming(key -> FileIO.Write.defaultNaming("file-" + key, ".txt")));

        p.run().waitUntilFinish();
    }
}

让我知道这是否也适用于您的定制水槽。

 类似资料:
  • 问题内容: 有没有办法编译一个node.js应用程序? 问题答案: 我可能已经很晚了,但是您可以使用“ nexe”模块在一个可执行文件中编译nodejs +您的脚本:https : //github.com/crcn/nexe

  • 我解释我这篇文章的目标。 实际上,我想重写如何Android工作的基本上,与应用程序中的字符串。此时此刻,我可以从重写android资源中动态更改它 实际上,我有这个功能,只有当我使用时才能正常工作: 有一种简单的方法,不需要太多代码,就可以覆盖xml中的行为?

  • 我想知道是否有一种通用方法可以在记录时屏蔽特定的类字段值?我正在使用龙目山记录器。 例如: 在记录toString()时,我希望能够以通用方式屏蔽该值(我知道我可以重写toString方法,但这需要针对每个类专门完成)

  • 问题内容: 在我的项目中,我们使用了许多批注,这些批注在javadoc API文档中非常有用。 有谁知道在生成的javadocs中包含注释的简单方法?我不想编写自己的javadoc插件。有什么解决办法吗? 问题答案: 参见java.lang.annotation.Documented 表示默认情况下,javadoc和类似工具将记录带有类型的注释。此类型应用于注释其注释会影响其客户端对已注释元素的使

  • 问题内容: 根据MDN 文档: 该 方法冻结对象:即,防止向其添加新属性;防止现有属性被删除;并防止更改现有属性或其可枚举性,可配置性或可写性。本质上,对象实际上是不可变的。该方法返回被冻结的对象。 我期望在某个日期调用冻结会阻止对该日期进行更改,但是它似乎没有用。这是我正在做的(运行Node.js v5.3.0): 我本以为调用会失败或什么都不做。任何想法如何冻结日期? 问题答案: 有没有办法O

  • 我使用new File()在内存中创建一个文件,然后我想在上面写,但不想在磁盘中创建文件。 我希望它不要在磁盘上创建文件。