当前位置: 首页 > 知识库问答 >
问题:

Apache Flink中的多流迭代

陆才俊
2023-03-14

我的问题是关于Apache Flink中多个流上的迭代。

我是Flink的初学者,目前正在尝试对Flink执行递归查询(例如,数据日志)。

例如,查询每5分钟计算一次传递闭包(滚动窗口)。如果我有一个输入流inputStream(由初始边缘信息组成),另一个由inputStream初始化的输出流(传递闭包)。我想通过加入inputStream来迭代地丰富outputStream。对于每个迭代,反馈应该是outputStream,并且迭代将持续到outputStream上不再追加边为止。传递闭包的计算应该每隔5分钟定期触发一次。在迭代过程中,inputStream应该是“hold”,并为我的outputStream提供数据。

有可能在Flink中做到这一点吗?感谢您的任何帮助!

共有1个答案

曹季同
2023-03-14

这听起来像是一个侧面输入问题,您希望将“inputStream”视为一个与另一个“outputStream”连接的批处理数据集(带有刷新)。不幸的是,Flink目前没有提供一种简单的实现方法(请参见https://stackoverflow.com/a/48701829/231762)

如果这两个流都来自数据源,那么一种方法是创建一个包装器源来控制记录的顺序。它必须发出类似Tuple2的东西,其中一侧或另一侧为空,然后在下游(自定义)函数中,您基本上将这些拆分并进行连接。

如果这是可能的,那么这个源可以在发出“输入”元组的同时阻止“输出”元组,再加上听起来需要的其他逻辑(5分钟刷新等)。请参阅我对上面另一个SO问题的回应,以了解执行此操作的框架代码。

 类似资料:
  • 我正在尝试为ApacheFlink导入ScalaAPI流扩展,如中所述https://ci.apache.org/projects/flink/flink-docs-master/apis/scala_api_extensions.html 但是,我的ScalaIDE抱怨以下消息:对象扩展不是包的成员org.apache.flink.streaming.api.scala 我使用的是scala 2

  • 我有以下代码来计算socketTextStream中的单词。累积字数和时间窗字数都是必需的。该程序存在累积计数始终与窗口计数相同的问题。为什么会出现这个问题?根据加窗计数计算累积计数的正确方法是什么?

  • 我在迭代一个对象内的列表时遇到了一个问题,该对象内嵌在另一个映射中。我的目标是迭代这个列表并生成一个映射 ,我希望使用streams和lamdas来实现这一点。 我在上面看到了,我需要通过迭代FolderBo中的elementList从elementBo创建一个带有map 的映射。folderBo本身就在Modelbo的地图内。

  • 安装(下载 这是Flink的默认配置。 关于这里发生了什么事,有什么建议吗?

  • 我们希望将迭代与Async IO运算符结合使用,为同一事件执行顺序API调用。但是,在回答我提出的另一个问题时,有人提到使用Datastreams唱迭代是个坏主意。 管理使用大量内存的状态-从存储中查询 有人能进一步解释一下吗?

  • 为了利用Jdk 8的中包含的各种查询方法,我试图设计域模型,其中与多重性(具有零个或多个实例)关系的获取器返回