当前位置: 首页 > 知识库问答 >
问题:

Flink将SingleOutputStreamOP写入两个文件而不是一个

施俊哲
2023-03-14

我正在为工作中的一个项目尝试闪烁。我已经到了通过应用计数窗口等来处理流的地步。然而,我注意到一个特殊的行为,我无法解释。

看起来一个流是由两个线程处理的,输出也是分成两部分的。

首先,我注意到使用stream.print()将流打印到标准控制台时的行为。

然后,我打印到一个文件,它实际上正在输出文件夹中的两个名为1和2的文件中打印。

    SingleOutputStreamOperator<Tuple3<String, String,String>> c = stream_with_no_err.countWindow(4).apply(new CountPerWindowFunction());
    // c.print()   // this olso prints two streams in the standard console

    c.writeAsCsv("output");

有人能解释一下为什么Flink会有这种行为吗?如何配置它?为什么有必要对结果流进行拆分?

并行性我理解为对速度(多线程)有用,但为什么要拆分结果流?

通常,我希望将结果流(处理后)作为单个文件或tcp流等。手动组合两个文件并生成单个输出是正常的工作流吗?

谢谢

共有1个答案

尉迟宇定
2023-03-14

Flink是一种分布式并行流处理器。正如您所说的,并行化对于实现高吞吐量是必要的。应用程序的吞吐量受其最慢的操作符的限制。因此,在许多情况下,水槽也需要并联。

话虽如此,将接收器的并行度降低到1是非常简单的:c.writeAsCsv("输出"). setParallelism(1);

现在,接收器将作为单个线程运行,并且只生成单个文件。

 类似资料:
  • 我使用的是log4j2。在两个应用程序中使用xml,并且两者都可以登录相同的信息。日志应用程序A和B中提到的日志文件大小为100KB。当信息。日志信息超过100KB。日志创建1个备份文件。当应用程序B尝试登录时,日志会被写入信息中。日志1而不是信息。日志当我重新启动应用程序B时,日志将获得登录信息。日志为什么会观察到这种奇怪的日志记录方式?解决方法是什么。下面是log4j2。应用程序B的xml

  • 问题内容: 我有一个带有汉字文本的文件,我想将这些文本复制到另一个文件中。但是文件输出混乱了中文字符。注意,在我的代码中,我已经在使用“ UTF8”作为编码了: 问题答案: 在这种情况下,请勿使用FileReader,因为它不允许您指定输入编码。在FileInputStream上构造一个InputStreamReader。 像这样:

  • 我有一个,其类型为和 值实际上以格式保存数据,而分组键的格式为 示例:如果我的分组rdd中有以下键 那么在我的HDFS中我应该有三个文件 为此,我尝试了以下方法:

  • 我有一个长长的列表,列表的形式如下--- i、 列表中的值有不同的类型——float、int、string。如何将其写入csv文件,使输出的csv文件看起来像

  • 我已经设置了这组重写(Zend Framework默认重写规则): 我的文件夹只包含和,没有其他内容。 重写规则应通过index.php将所有请求发送到不存在的文件。这适用于(无论是否存在控制器),但不适用于或以开头的任何其他url。 所有URL都被路由到apache的404页面,所有其他URL都可以工作。 我该如何解决这个问题?

  • 问题内容: 我正在尝试将2个WAV文件混合到一个WAV文件中。文件将始终具有完全相同的持续时间,并具有相同的格式(16位,44.1 kHz,带符号,小字节序,单声道)。使用AudioSystem.getAudioInputSream的ByteArrayOutputStream将两个WAV放入字节数组,以确保我只是获取PCM数据而没有标题。 在其他几个线程的帮助下,我已经能够成功地组合阵列,但又不会