当前位置: 首页 > 知识库问答 >
问题:

Flink如何处理迭代循环中的时间戳?

胡曾笑
2023-03-14

如何在Flink中的迭代数据流循环中处理时间戳?

DataStream<MyInput> inputStream = env.addSource(new MyInputSourceFunction());
IterativeStream.ConnectedIterativeStreams<MyInput, MyFeedback> iterativeStream = inputStream.iterate().withFeedbackType(MyFeedback.class);
// define an output tag so we can emit feedback objects via a side output
final OutputTag<MyFeedback> outputTag = new OutputTag<MyFeedback>("feedback-output"){};
// now do some processing
SingleOutputStreamOperator<MyOutput> combinedStreams = iterativeStream.process(new CoProcessFunction<MyInput, MyFeedback, MyOutput>() {
    @Override
    public void processElement1(MyInput value, Context ctx, Collector<MyOutput> out) throws Exception {
        // do some processing of the stream of MyInput values
        // emit MyOutput values downstream by calling out.collect()
        out.collect(someInstanceOfMyOutput);
    }

    @Override
    public void processElement2(MyFeedback value, Context ctx, Collector<MyOutput> out) throws Exception {
        // do some more processing on the feedback classes
        // emit feedback items
        ctx.output(outputTag, someInstanceOfMyFeedback);
    }
});

iterativeStream.closeWith(combinedStreams.getSideOutput(outputTag));
    null

共有1个答案

吕天逸
2023-03-14

Flink不提供任何输入对象排序的保证。我在尝试使用Flink中的聚类算法的迭代时遇到了这种情况,在Flink中质心更新没有得到及时的处理。我发现的唯一解决方案是创建一个传入事件和质心更新的单一(统一的)流,而不是使用一个协同流。

这里有一个解决迭代的一些缺点的建议。

 类似资料:
  • 下面是我的父组件,它包含一个循环的多个输入。如何选择一个来聚焦?在这种情况下,我必须创建动态吗?

  • 本节我们探索的主题是迭代与循环。循环通常在计算机编程用用于自动执行重复性任务。 在Python中最常用的迭代形式就是for循环了。for循环允许你迭代出列表中所有的项,迭代出来后你可以做任何你想做的事情。 比如,我们创建了一个列表,并打印出其中所有元素的平方。 >>> for value in [0, 1, 2, 3, 4, 5]: ... print(value * value) ... 0 1

  • 迭代器无效是如何处理的,而不是循环? 例如,这段代码不起作用,因为迭代器在插入后无效: 但是,如果我用这个for循环替换while循环,它会正确编译和运行: 为什么for循环有效而while循环无效?

  • 如何在迭代中更改python迭代器? 例如: 此打印: 我想打印: 编辑:抱歉,这个问题让人们感到困惑。我对为什么当我在for循环中尝试更改迭代器时,for循环在下一次迭代中忽略它很感兴趣。 其他人已经提交了一个答案来清除我的困惑。范围创建一个列表,然后for循环将迭代器分配给列表中的下一个变量。

  • 2.3 循环与遍历 程序的比人肉强大的另一个特性就是可以任劳任怨地重复地做些单调无聊(或有聊)的 工作。本节介绍在 VimL 语言中,如何控制程序,命令其循环地按规则干活。 遍历集合变量 首先介绍的是如何依次访问列表如字典内的所有元素,毕竟在 2.1 节介绍的索引方法只 适于偶尔访问查看某个具体的元素。这里要用到的是for ... in 语法。 例如遍历列表: : let list = [0, 1

  • 如果我试图在java中查找遍历foreach循环的次数,还有比这更好的方法吗? 我知道我可以只使用一个普通的for循环并拥有一个迭代器I,但我只是想知道是否有一种更优雅的方法来使用foreach.tanks in prevair:) 编辑:正如Jon Skeet指出的,上面的代码将新的按钮实例分配给迭代变量。哎呀。基于其他评论,我相信最实用的使用是一个正常循环。这是我要用的版本。谢谢大家的帮助!