当前位置: 首页 > 知识库问答 >
问题:

Flink中的全局变量

子车征
2023-03-14

我想使用大小为2的FIFO队列来存储数据流的元素。在任何情况下,我都需要流中的前一个元素,而不是当前元素。为此,我在流代码之外创建了一个队列,并将当前元素加入队列。当我的队列有两个元素时,我将其出列并使用第一个元素。

我面临的问题是,我不能加入队列,因为它是在我的流代码之外声明的。我想这是因为流使用多个JVM,我的队列将在一个JVM中声明。

下面是一个示例代码:

val queue = Queue[Array[Double]]() //Global Queue

val ws = dataStream.map(row => {
    queue.enqueue(row)
    println(queue.size) //Prints 0 always
    if(queue.size == 2){
        result = operate(queue(0))
        queue.dequeue
    }
    result
})

在这里,没有任何东西进入队列,队列大小始终为0。有没有一种方法可以在Flink中创建分布在所有JVM中的全局变量?如果没有,是否有其他方法来实现此逻辑

共有1个答案

海信鸥
2023-03-14

令人惊讶的是,当我用ScalaList替换Queue时,它起了作用。

 类似资料:
  • 本文向大家介绍Lua中的全局变量、非全局变量总结,包括了Lua中的全局变量、非全局变量总结的使用技巧和注意事项,需要的朋友参考一下 前言 Lua将其所有的全局变量保存在一个常规的table中,这个table称为“环境”。这种组织结构的优点在于,其一,不需要再为全局变量创造一种新的数据结构,因此简化了Lua的内部实现;另一个优点是,可以像其他table一样操作这个table。为了便于实施这种操作,L

  • 问题内容: 如何在Java中定义全局变量? 问题答案: 要定义全局变量,你可以使用静态关键字 现在你可以通过调用从任何地方访问a和b

  • 问题内容: 假设我想创建一个sql脚本并执行以下操作: 我不能,因为@SomeVariable与他所属的批次一起死亡,而myProcedure需要它自己的批次。显然,我可以创建一个#temp表并在其中填充所需的任何值,但随后我必须从中选择- 添加代码,尽管琐碎,但会损害可读性,并且当我需要的只是一个全局变量时似乎很愚蠢。有没有更好的办法? 要痛苦地清除。我知道SQL Server具有称为“表”的“

  • 问题内容: 我在控制器的作用域上初始化变量时遇到问题。然后,当用户登录时,它将在另一个控制器中进行更改。此变量用于控制诸如导航栏之类的操作,并根据用户的类型来限制对网站某些部分的访问,因此保持其值很重要。它的问题是初始化它的控制器,再次通过某种方式被调用,然后将变量重置为其初始值。 我认为这不是声明和初始化全局变量的正确方法,也不是真正的全局变量,所以我的问题是什么是正确的方法,并且在使用当前版本

  • 我尝试创建一个Dart单页应用程序。 我已经创建了第一个自定义元素(),它包含整个应用程序。它有一个容器,用于呈现视图。和一个侧导航,它将包含用户信息,并在用户登录时更新。 我想在视图之间共享信息。如何在