当前位置: 首页 > 知识库问答 >
问题:

是否可以在需要时调用Flink Map(在输入流上未激活)

翟棋
2023-03-14

我有一个闪烁的地图,一旦数据通过流就会被激活。

即使没有数据通过,我也要调用该地图。

我将map移动到一个函数中(无限函数调用),但是flink作业永远不会运行。如果我将其添加到map中,它只会在数据通过时被激活。

想法是,在一个infinte循环中有一个map,检查一些共享变量和另一个闪烁流监控kafka队列,如果数据进入它的过程,它会更改一个共享变量,以某种方式影响无限循环并继续。

如何调用无限循环映射并一起运行flink映射?

我尝试用随机数据创建CollectionMap来激活流和map以调用无限循环,但即使map中存在一个while(true)条件,也几乎立即退出

在IDE中,当我将其推到Flink时,它会工作。本地it几乎立即退出,不停留在循环中

流1

    val data_stream = env.addSource(myConsumer)
      .map(x => {process(x)})

流2

    val elements = List[String]("Start")
    var read = env.fromElements(elements).map(x => ProcessData.infinteLoop())

如何调用无限循环映射并一起运行flink映射?

共有1个答案

云远
2023-03-14

您可以创建一个窗口和一个触发器,并每隔x秒调用一次映射。

您可以在以下文档中找到:<代码>https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html

示例:

import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

 val data_stream = env.addSource(myConsumer)
  .map(x => {process(x)})

val window: DataStream[String] = data_stream
  .windowAll(GlobalWindows.create())
  .trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](5)))
  .apply((w: GlobalWindow, x: Iterable[(Integer, String)], y: Collector[String]) => {})
 类似资料:
  • 对于接受文件输入的APIendpoint,我们需要手动关闭还是框架为我们这样做? 我查了新泽西的文件,但找不到任何关于它的信息。 寻找可靠的来源或某种方式来验证它。

  • 问题内容: Java中的InputStreams和OutputStreams是否在销毁时关闭()?我完全理解这可能是错误的形式(尤其是在C和C ++世界中),但是我很好奇。 另外,假设我有以下代码: 无名的FileInputStream是否在p.load()之后超出范围,并因此被破坏,就像C ++范围规则一样?我尝试在Google上搜索Java的匿名变量范围,但这并没有达到我的预期。 谢谢。 问题

  • 问题内容: 我是否需要从HttpServletResponse“刷新”输出流? 我已经从看到了,应该关闭servlet输出流吗?我不需要关闭它,但是尚不清楚是否需要冲洗它。我也应该从容器中得到吗? 问题答案: 不用了 servletcontainer将为您刷新并关闭它。顺便说一句,关闭已经隐式调用了flush。 另请参阅Servlet 3.1规范的 5.6章: 关闭响应后,容器必须立即将响应缓冲区

  • 问题内容: 如果仅在输出流中调用,则可以保证输出,还是需要始终调用? 问题答案: Close()始终刷新,因此无需调用。 编辑:这个答案是基于常识和我遇到的所有输出流。谁将为缓冲流实现close()而不先刷新缓冲区?在close()之前立即调用flush没有什么害处。但是,如果过度调用flush()会导致后果。它可能会在缓冲机制下失败。

  • 我想知道是否有类似于对象实例的函数创建的东西,也许有类似的东西? 这段代码是写的,例如,它也不工作

  • 问题内容: 我正在从另一个来源接收ZipInputStream,并且需要将第一项的InputStream提供给另一个来源。 我希望能够在不将临时文件保存在设备上的情况下执行此操作,但是,我知道获取单个条目的InputStream的唯一方法是通过ZipFile.getInputStream(entry),并且因为我有一个ZipInputStream而不是ZipFile , 这是不可能的。 所以我最好