当前位置: 首页 > 知识库问答 >
问题:

Flink可以将结果写入多个文件(如Hadoop的MultipleOutputFormat)吗?

郑旭
2023-03-14

我正在使用ApacheFlink的数据集API。我想实现一个将多个结果写入不同文件的作业。

我该怎么做?

共有2个答案

通典
2023-03-14

您可以在Flink中使用HadoopOutputFormatAPI,如下所示:

class IteblogMultipleTextOutputFormat[K, V] extends MultipleTextOutputFormat[K, V] {
override def generateActualKey(key: K, value: V): K =
  NullWritable.get().asInstanceOf[K]

override def generateFileNameForKeyValue(key: K, value: V, name: String): String =
  key.asInstanceOf[String]
}

我们可以使用IteblogMultipleTextOutputFormat如下所示:

val multipleTextOutputFormat = new IteblogMultipleTextOutputFormat[String, String]()
val jc = new JobConf()
FileOutputFormat.setOutputPath(jc, new Path("hdfs:///user/iteblog/"))
val format = new HadoopOutputFormat[String, String](multipleTextOutputFormat,   jc)
val batch = env.fromCollection(List(("A", "1"), ("A", "2"), ("A", "3"),
  ("B", "1"), ("B", "2"), ("C", "1"), ("D", "2")))
batch.output(format)

有关更多信息,请参见:http://www.iteblog.com/archives/1667

拓拔泓
2023-03-14

您可以根据需要向数据集程序添加任意数量的数据接收器。

例如,在这样的程序中:

java prettyprint-override">ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple3<String, Long, Long>> data = env.readFromCsv(...);
// apply MapFunction and emit
data.map(new YourMapper()).writeToText("/foo/bar");
// apply FilterFunction and emit
data.filter(new YourFilter()).writeToCsv("/foo/bar2");

从CSV文件中读取数据集数据。此数据将用于两个后续转换:

  1. 到一个MapFunction,其结果被写入一个文本文件。
  2. FilterFunction,未过滤的元组被写入CSV文件。

您还可以拥有多个数据源以及分支和合并数据集(使用unionjoincoGroupcross或广播集),如您所愿。

 类似资料:
  • 我是一个有点新的地图缩小,所以如果任何人可以指导我与下面的问题,这将是伟大的 > 我在map Reduce中使用了多输出格式来写入分离输出文件。让我们假设我的输入文件有水果和蔬菜,因此把它分成两个文件。水果和蔬菜如下。 水果-R-00000,蔬菜-R-00000,部分-R-00000 我搞不清有多少减速器会运转?我知道,默认情况下,减速器的数量被设置为1,由于文件名的数字部分是相同的,我相信只有一

  • 问题内容: 我是Hadoop的新手。我正在尝试Wordcount程序。 现在尝试使用多个输出文件。这个链接帮助我做到了。http://hadoop.apache.org/common/docs/r0.19.0/api/org/apache/hadoop/mapred/lib/MultipleOutputs.html 在我的司机课上 而我的降低班级变成了 一切正常,但是我得到了很多文件(对于每个ma

  • 问题内容: 如果您曾经使用过p2p下载软件,则他们可以使用多线程下载文件,并且只能创建一个文件,所以我想知道线程如何将数据写入该文件。顺序还是并行? 想象一下,您想将大数据库表转储到文件中,以及如何使这项工作更快? 问题答案: 您可以使用多个线程将a写入文件(例如日志文件)。但是您必须协调@Thilo指出的线程。您需要同步文件访问并仅写入整个记录/行,或者需要制定一种策略来将文件的区域分配给不同的

  • 我正在为工作中的一个项目尝试闪烁。我已经到了通过应用计数窗口等来处理流的地步。然而,我注意到一个特殊的行为,我无法解释。 看起来一个流是由两个线程处理的,输出也是分成两部分的。 首先,我注意到使用将流打印到标准控制台时的行为。 然后,我打印到一个文件,它实际上正在输出文件夹中的两个名为1和2的文件中打印。 有人能解释一下为什么Flink会有这种行为吗?如何配置它?为什么有必要对结果流进行拆分? 并

  • 问题内容: 我有一个问题-如何将结果/数据库选择写入JTextArea。我的JButton的方法是: 如果使用 -在控制台中看到输出,但是如何将Text设置为JTextArea? 问题答案: 请参阅文档。

  • 我有一些读物: ATCAAGTCCGTAGTACGGGAATGCAAAAAAAA GGGCTAGTAGGATTGGCCTAGTCACTAGGGGGG TAGCTAGGTAGGATGCCTAGTCAGCGG ... ... 现在,我想将每个读取和写入文件左侧的12个碱基剪切: 我想知道如何编写xlsx文件