当前位置: 首页 > 知识库问答 >
问题:

如何在Apache Flink中为每个输入生成输出文件

蒋畅
2023-03-14

我正在使用Flink来处理我的流数据。

流媒体来自其他一些中间件,如Kafka、Pravega等。

说Pravega正在发送一些文字流,<代码>你好,世界,我的名字是

我需要的是三个过程步骤:

  1. 将每个单词映射到我的自定义类对象MyJson
  2. 将对象MyJson映射到String。
  3. 将字符串写入文件:一个字符串写入一个文件。

例如,对于流hello world my name is,我应该得到五个文件。

这是我的代码:

// init Pravega connector
PravegaDeserializationSchema<String> adapter = new PravegaDeserializationSchema<>(String.class, new JavaSerializer<>());
        FlinkPravegaReader<String> source = FlinkPravegaReader.<String>builder()
                .withPravegaConfig(pravegaConfig)
                .forStream(stream)
                .withDeserializationSchema(adapter)
                .build();
// map stream to MyJson
DataStream<MyJson> jsonStream = env.addSource(source).name("Pravega Stream")
            .map(new MapFunction<String, MyJson>() {
                @Override
                public MyJson map(String s) throws Exception {
                    MyJson myJson = JSON.parseObject(s, MyJson.class);
                    return myJson;
                }
            });
// map MyJson to String
DataStream<String> valueInJson = jsonStream
            .map(new MapFunction<MyJson, String>() {
                @Override
                public String map(MyJson myJson) throws Exception {
                    return myJson.toString();
                }
            });
// output
valueInJson.print();

此代码将所有结果输出到Flink日志文件。

我的问题是如何将一个单词写入一个输出文件?

共有1个答案

景俊语
2023-03-14

我认为最简单的方法是使用定制水槽。

stream.addSink(new WordFileSink)
public static class WordFileSink implements SinkFunction<String> {

    @Override
    public void invoke(String value, Context context) {
        // generate a unique name for the new file and open it
        // write the word to the file
        // close the file
    }
}

请注意,此实现不一定只提供一次行为。您可能希望注意文件命名方案是唯一的和确定性的(而不是取决于处理时间),并为文件可能已经存在的情况做好准备。

 类似资料:
  • 问题内容: 我是Hadoop的新手,正在尝试弄清楚它是如何工作的。至于练习,我应该实现类似于WordCount- Example的东西。任务是读入多个文件,执行WordCount并为每个输入文件编写一个输出文件。Hadoop使用组合器,将map- part的输出改编为reducer的输入,然后写入一个输出文件(我猜每个正在运行的实例)。我想知道是否可以为每个输入文件写入一个输出文件(因此保留inp

  • 问题内容: 我正在使用命令来获取目录中的文件名,但输出在一行中。 像这样: 我需要一个内置的替代方法来获取文件名,每个文件名都换行,如下所示: 问题答案: 使用该选项(请注意,这是一个“一个”数字,而不是小写字母“ L”),如下所示: 不过,首先请确保您的支持。GNU coreutils(安装在标准Linux系统上)和Solaris一样;但如果有疑问,请使用或或查看文档。例如:

  • 文件 std::fs::File 本身实现了 Read 和 Write trait,所以文件的输入输出非常简单,只要得到一个 File 类型实例就可以调用读写接口进行文件输入与输出操作了。而要得到 File 就得让操作系统打开(open)或新建(create)一个文件。还是拿例子来说明 use std::io; use std::io::prelude::*; use std::fs::File;

  • 我目前正在用Java编写一个程序,其中我需要从用户那里获得输入,这是一段文本。然而,我需要用户能够在一次输入文本的多个段落。当程序提示输入时,只需粘贴整段文本就可以实现这一点。我对输入使用扫描器,当我将多个段落粘贴到输入中时,无论何时打印存储文本的变量,都不会抛出错误,只输出第一节(在第一个换行符之前)。如何使用多个换行符存储整段文本,而不提示用户为每个文本块输入单独的输入? 我已经有了一些代码,

  • 目录表 文件 使用文件 储存器 储存与取储存 概括 在很多时候,你会想要让你的程序与用户(可能是你自己)交互。你会从用户那里得到输入,然后打印一些结果。我们可以分别使用raw_input和print语句来完成这些功能。对于输出,你也可以使用多种多样的str(字符串)类。例如,你能够使用rjust方法来得到一个按一定宽度右对齐的字符串。利用help(str)获得更多详情。 另一个常用的输入/输出类型