当前位置: 首页 > 知识库问答 >
问题:

Flink DataStream排序程序不输出

苏宏峻
2023-03-14

我用Flink编写了一个小测试用例代码来对数据流进行排序。代码如下:

public enum StreamSortTest {
    ;
    public static class MyProcessWindowFunction extends ProcessWindowFunction<Long,Long,Integer, TimeWindow> {
        @Override
        public void process(Integer key, Context ctx, Iterable<Long> input, Collector<Long> out) {
            List<Long> sortedList = new ArrayList<>();
            for(Long i: input){
                sortedList.add(i);
            }
            Collections.sort(sortedList);
            sortedList.forEach(l -> out.collect(l));
        }
    }

    public static void main(final String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env.getConfig().setExecutionMode(ExecutionMode.PIPELINED);

        DataStream<Long> probeSource = env.fromSequence(1, 500).setParallelism(2);

        // range partition the stream into two parts based on data value
        DataStream<Long> sortOutput =
                probeSource
                        .keyBy(x->{
                            if(x<250){
                                return 1;
                            } else {
                                return 2;
                            }
                        })
                        .window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
                        .process(new MyProcessWindowFunction())
                        ;

        sortOutput.print();
        System.out.println(env.getExecutionPlan());
        env.executeAsync();
    }
}

然而,代码只输出执行计划和其他几行。但它不会输出实际排序的数字。我做错了什么?

共有1个答案

蓟安歌
2023-03-14

我能看到的主要问题是,您使用的是基于处理时间的窗口,输入数据非常短,处理时间肯定会少于20秒。而Flink能够检测到输入的结束(如果是来自文件或序列的流,就像您的情况一样),并生成Long。最大水印,它将关闭所有打开的基于事件时间的窗口,并触发所有基于事件时间的计时器。它对基于处理时间的计算没有同样的作用,因此在您的情况下,您需要断言Flink实际上会工作足够长的时间,以便关闭窗口或引用自定义触发器/不同的时间特征。

还有一件事我不确定,因为我从来没有用过它那么多,那就是您是否应该使用刽子手同步进行本地执行,因为这基本上是针对您不想等待作业结果的情况根据这里的文档。

 类似资料:
  • 我正在尝试使用Apache Camel运行一个简单的重新排序程序。该程序使用Java DSL对传入的Java消息重新排序。当我运行这个程序时,消息会被写入文件夹,但根据标题值或消息正文中单个单词的字母顺序,不会出现任何特定的顺序。Camel创建的文件仍然无序,就好像DSL函数什么都没做一样。 如何让这个程序像方法那样对消息进行实际排序?此外,如何让这个程序重新排序,然后以正确的排序顺序将消息聚合到

  • 问题内容: 我想通过JPA从数据库(MySQL)获取数据,我希望它按某些列值进行排序。 因此,最佳做法是: 从数据库中检索数据作为对象列表(JPA),然后使用一些Java API以编程方式对其进行排序。 要么 让数据库通过使用排序选择查询对其进行排序。 提前致谢 问题答案: 如果要检索所有数据库数据的子集,例如在屏幕上显示1000中的20行,则最好对数据库进行排序。这将更快,更轻松,并且允许您一次

  • 问题内容: 我在python中遇到JSON问题。 实际上,如果我尝试执行此代码,python会给我一个排序后的JSON字符串! 例如: 这是输出: 如您所见,我尝试使用“ sort_keys = False”,但未进行任何更改。 如何停止Python对JSON字符串排序? 问题答案: 您将值存储到Python中,而Python 根本没有内在的排序概念,它只是键值映射。因此,当您将项目放入变量时,它

  • 所以我的代码有问题。我必须编写一个程序来对一个外部文本文件进行排序(基本上是一个按随机顺序排列的随机数列表)。所以我试着按照教授的步骤去做,即使我做了,我也没有得到正确的输出。输出应该如下所示: 1 2 2 2 3 3 5 5 6 6 11 13 13 13 13 16 17 17 19 25 27 27 33 34 37 37 43 45 49 51 52 54 57 58 60 63 64 6

  • 本文向大家介绍PowerShell 排序:排序对象/排序,包括了PowerShell 排序:排序对象/排序的使用技巧和注意事项,需要的朋友参考一下 示例 按升序或降序对枚举进行排序 同义词: 假设: 升序排序是默认设置: 阿龙 阿龙 伯尼 查理· 丹尼 要请求降序排列: 丹尼· 查理 ·伯尼· 亚伦 ·亚伦 您可以使用表达式进行排序。 阿龙 阿龙 丹尼· 伯尼· 查理