当前位置: 首页 > 知识库问答 >
问题:

Apache Flink连续拆分奇怪的行为

蒋弘致
2023-03-14

我相信,当连续完成两个拆分时,Flink的行为很奇怪。我可能在我的实现逻辑中有一些错误,这就是为什么我在这里发帖征求您的意见。

最小示例:我有一个包含单词Apple、Banana和Orange的文本文件。我将其作为源在流执行环境中传递。我进行了第一次拆分,其中选择条件是参数是否为单词“Apple”。如果是,我将其放在“主题”Apples中,否则放在“主题”NotApples中。然后我在此拆分流中选择“主题”NotApples并再次拆分它,但这次条件检查参数是否为单词“Orange”。如果是,则将其放置在“主题”橙子中,否则放置在“主题”NotOrang中。

最后,当我打印最后一条分割流的主题NotOranges时,我所期望的是只打印“香蕉”一词。然而,我实际上打印的是“苹果”和“香蕉”两个词。我注意到,在进行第二次拆分时,处理的流不是仅包含我从中选择的主题元素(即NotApples)的流,而是包含所有元素的流。我错过什么了吗?

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> datastream = env.readTextFile("input.txt");
SplitStream<String> splitStream1 = datastream.split(new OutputSelector<String>() {

    @Override
    public Iterable<String> select(String arg0) {
        List<String> output = new ArrayList<String>();
        if (arg0.equals("Apple")) {
            output.add("Apples");
        } else {
            output.add("NotApples");
        }
        return output;
    }
});


DataStream<String> notApplesStream = splitStream1.select("NotApples");
SplitStream<String> splitStream2 = notApplesStream.split(new OutputSelector<String>() {

    @Override
    public Iterable<String> select(String arg0) {
        List<String> output = new ArrayList<String>();
        if (arg0.equals("Orange")) {
            output.add("Oranges");
        } else {
            output.add("NotOranges");
        }
        return output;
    }
});

DataStream<String> notApplesAndNotOrangesStream = splitStream2.select("NotOranges");
notApplesAndNotOrangesStream.print();
env.execute("SplitTest");

输出:

1> Apple
1> Apple
1> Banana
2> Apple
2> Apple
2> Apple
4> Apple
4> Apple
4> Banana
3> Apple
3> Banana
3> Apple

注意:我知道我可以用一个拆分来实现相同的逻辑(在这个逻辑中,我检查参数是“Apple”还是“Organge”)。然而,这不是我问题的重点。我最初在我编写的一个更复杂的程序中注意到这种行为,其中需要两个结果拆分,所以我决定尝试在一个最小的示例中重新创建它,以检查我是否可以重现它。

共有2个答案

陈翰林
2023-03-14

考虑到我对拆分/选择是如何实现的所知,如果这不起作用,我也不会感到惊讶(尽管我还不太清楚)。此外,拆分/选择最近已被弃用(尽管尚不清楚它是否真的会消失)。

进行拆分/选择的更好方法是通过侧面输出。这是一种更强大的机制,具有更清晰的实现。

夏侯朝斑
2023-03-14

最近在邮件列表上讨论了这种不正确的行为,主题是“关于为DataStream API弃用拆分/选择”。我认为关键评论是:

首先,我们必须承认当前拆分/选择的实现是有缺陷的。我粗略地浏览了源代码,问题可能是对于连续的选择/拆分,前一个将在StreamGraph生成阶段被后一个覆盖。这就是为什么我们在FLINK-11084中禁止这种连续逻辑。

在查看了FLINK-11084和由此产生的补丁之后,我相信如果您连续进行两次拆分/选择,最近发布的Flink将引发异常。

 类似资料:
  • 问题内容: 我有一个小文件,其中包含一些我想用“ |”分割的内容 字符。 当我尝试使用其他任何字符(例如“>”)时,它都可以正常工作,但是使用“ |” 性格,有一些意想不到的结果。 行本身(此处带有 >字符) addere> to add>(1) 分割“ >”结果 [加法,加法(1)] 分割“ |” 结果 [,a,d,d,e,r,e,|,t,o,,a,d,d,|,(,1,)] 为什么要拆分所有内容

  • 我有以下代码来解析一个JSON文件: 要处理以下JSON文件: 如果我执行此代码,我将收到以下错误: 所以我开始一步一步地调试应用程序,看看part processing()中的哪个代码部分抛出了这个异常。令人惊讶的是,那里的所有代码都正常执行:没有抛出异常,也没有返回结果I except。 更让我惊讶的是,当我稍微改变第一种方法的代码时,它可以在不产生异常的情况下工作。 我不知道println方

  • 问题内容: 我在GregorianCalendar类中遇到一个奇怪的行为,我想知道我是否真的做得不好。 仅当初始化日期的月份的实际Maximum大于我将日历设置为的月份时,才追加此值。 这是示例代码: 我知道问题是由于日历初始化日期是31天(可能是5月),与设置为2月(28天)的月份混淆了。修复很容易(只需在设置年和月之前将day_of_month设置为1),但是我想知道这确实是想要的行为。有什么

  • 问题内容: 我正在为一个问题而苦苦挣扎,我不明白为什么它不起作用。如何通过将变量传递并转换为? 为什么在顶部代码段中不起作用,但在行下方的底部代码段中起作用? 唯一的区别似乎是添加了一个额外的变量,该变量也被键入为? 问题答案: 该是一种原始类型,同时是一个普通的Java类。您不能在原始类型上调用方法。但是该方法在上可用,如javadoc中所示 有关这些原始类型的更多信息,请参见此处

  • 问题内容: 为什么的到哪里去了? 问题答案: 删除任何字符,并从字符串的开头和结尾。

  • 问题内容: 我认为这是一个正常程序,但这是我得到的输出: 有人可以向我解释一下吗? 问题答案: 这是有据可查的PHP行为,请参阅php.net的foreach页面上的警告。 警告 即使在 foreach 循环之后,仍保留 $ value的 引用和最后一个数组元素。建议通过unset()销毁它。 __ 编辑 尝试逐步了解此处实际发生的情况