当前位置: 首页 > 知识库问答 >
问题:

在Flink数据集中保存批量迭代的部分输出的可能性?

令狐运珧
2023-03-14
val start = env.fromElements((0, BitSet.empty))
val end = start.iterateWithTermination(size) { inp =>
    val result = ObjData.mapPartition(new MyMapPartition).withBroadcastSet(inp, "concepts").groupBy(0).reduceGroup(new MyReduceGroup)
    (result,result)
}
end.count()
org.apache.flink.api.common.InvalidProgramException: A data set that is part of an iteration was used as a sink or action. Did you forget to close the iteration?
var start = env.fromElements((0, BitSet.empty))
var count = 1L
var all = count
while (count > 0){
    start = ObjData.mapPartition(new MyMapPartition).withBroadcastSet(start, "concepts").groupBy(0).reduceGroup(new MyReduceGroup)
    count = start.count()
    all = all + count
}
println("total nodes: " + all)

但是这种方法在最小的输入数据上特别慢,迭代版本需要<30秒,循环版本需要>3分钟。
我想flink无法创建执行循环的最佳计划。

有什么我应该尝试的变通方法吗?是否可以对flink进行一些修改,以便在hadoop等上保存部分结果?

共有1个答案

杨阳飇
2023-03-14

不幸的是,目前不可能从批量迭代输出中间结果。您只能在迭代结束时输出最终结果。

而且,正如您正确地注意到的,Flink不能有效地展开while-loop或for-loop,所以这也不会起作用。

如果您的中间结果不是那么大,您可以尝试将中间结果添加到部分解决方案中,然后在迭代结束时输出所有结果。在TransitiveClosureNaive示例中实现了类似的方法,其中在迭代中发现的路径在下一个部分解决方案中累积。

 类似资料:
  • 我们希望将迭代与Async IO运算符结合使用,为同一事件执行顺序API调用。但是,在回答我提出的另一个问题时,有人提到使用Datastreams唱迭代是个坏主意。 管理使用大量内存的状态-从存储中查询 有人能进一步解释一下吗?

  • Flink是否支持数据集中的侧输出功能(批处理Api)?如果没有,从文件加载时如何处理有效和无效记录?

  • 我想在Spring Boot 4应用程序中添加缓存。作为一个键,我想使用我的实体的id(type Long),value是我的实体。我的存储库扩展了Crudepository。我缓存findOne方法的数据: 所以我想为实体的方法保存迭代无效缓存(要一致): 但当我把这种方法称为: 我有个例外: -我如何修复这个EL表达式,使缓存中的所有项目无效,其中id在集合中?-如果这是不可能的,如何使这个方

  • 下面是我的流处理的伪代码。 上面的代码流程正在创建多个文件,我猜每个文件都有不同窗口的记录。例如,每个文件中的记录都有时间戳,范围在30-40秒之间,而窗口时间只有10秒。我预期的输出模式是将每个窗口数据写入单独的文件。对此的任何引用或输入都会有很大帮助。

  • 我正在尝试使用< code > DataSet . writeastext(" file:///path/to/my/file ")将数据集API程序的结果写入一个文件。 但是,该程序不产生任何输出。也不会创建输出文件。这可能是什么原因?

  • 我的问题是关于Apache Flink中多个流上的迭代。 我是Flink的初学者,目前正在尝试对Flink执行递归查询(例如,数据日志)。 例如,查询每5分钟计算一次传递闭包(滚动窗口)。如果我有一个输入流inputStream(由初始边缘信息组成),另一个由inputStream初始化的输出流(传递闭包)。我想通过加入inputStream来迭代地丰富outputStream。对于每个迭代,反馈