当前位置: 首页 > 知识库问答 >
问题:

Flink: ProcessWindow函数

仰雅昶
2023-03-14

我最近在Flink的新版本中学习ProcessWindowFunction。它表示ProcessWindowFunction支持全局状态和窗口状态。我使用Scala API进行了尝试。到目前为止,我可以让全球状态运转起来,但我没有任何运气让它成为窗口状态。我要做的是处理系统日志,并统计由主机名和严重性级别键入的日志数。我想计算两个相邻窗口之间日志计数的差异。下面是我实现ProcessWindowFunction的代码。

class LogProcWindowFunction extends ProcessWindowFunction[LogEvent, LogEvent, Tuple, TimeWindow] {
  // Create a descriptor for ValueState
  private final val valueStateWindowDesc = new ValueStateDescriptor[Long](
    "windowCounters",
    createTypeInformation[Long])

  private final val reducingStateGlobalDesc = new ReducingStateDescriptor[Long](
    "globalCounters",
    new SumReduceFunction(),
    createTypeInformation[Long])

  override def process(key: Tuple, context: Context, elements: Iterable[LogEvent], out: Collector[LogEvent]): Unit = {
    // Initialize the per-key and per-window ValueState
    val valueWindowState = context.windowState.getState(valueStateWindowDesc)
    val reducingGlobalState = context.globalState.getReducingState(reducingStateGlobalDesc)
    val latestWindowCount = valueWindowState.value()
    println(s"lastWindowCount: $latestWindowCount ......")
    val latestGlobalCount = if (reducingGlobalState.get() == null) 0L else reducingGlobalState.get()
    // Compute the necessary statistics and determine if we should launch an alarm
    val eventCount = elements.size
    // Update the related state
    valueWindowState.update(eventCount.toLong)
    reducingGlobalState.add(eventCount.toLong)
    for (elem <- elements) {
      out.collect(elem)
    }
  }
}

我总是从窗口状态中得到0值,而不是以前应该更新的计数。我已经为这个问题挣扎了几天。有人能帮我弄清楚吗?谢谢。

共有1个答案

姜钊
2023-03-14

每窗口状态的范围是一个窗口实例。对于上面的过程方法,每次调用它时,一个新窗口都在范围内,因此latestWindowCount始终为零。

对于一个普通的、普通的、只会触发一次的窗口,每个窗口的状态都是无用的。只有当一个窗口有多次触发(例如延迟触发)时,才能充分利用每个窗口的状态。如果您试图从一个窗口记住下一个窗口中的内容,那么可以使用全局窗口状态来执行此操作。

有关使用每个窗口状态来记住在后期点火中使用的数据的示例,请参阅Flink高级窗口培训中的幻灯片13-19。

 类似资料:
  • 一种以标量变量的非线性函数为变量的函数称为“函数的函数”,即以函数名为自变量的函数。这类函数包括: 求零点 最优化 求积分 常微分方程 MATLAB通过M文件的函数表示该非线性函数。例如,下面是一个简化的humps函数,来源于matlab/demos路径。 function y = humps(x) y = 1./((x-.3).^2 + .01) + 1./((x-.9).^2 + .04) -

  • 仿函数、仿函数类、函数等 无论喜欢或不喜欢,函数和类似函数的对象——仿函数——遍布STL。关联容器使用它们来使元素保持有序;find_if这样的算法使用它们来控制它们的行为;如果缺少它们,那么比如for_each和transform这样的组件就没有意义了;比如not1和bind2nd这样的适配器会积极地产生它们。 是的,在你看到的STL中的每个地方,你都可以看见仿函数和仿函数类。包括你的源代码中。

  • Rust 提供了高阶函数(Higher Order Function, HOF)。执行一个或多个函数来产生一个用处更大的函数。HOF 和惰性迭代器(lazy iterator)给 Rust 带来了函数式的风格(英文原文:HOFs and lazy iterators give Rust its functional flavor.)。 fn is_odd(n: u32) -> bool {

  • 在 Python 中,定义函数使用 def 语句。一个函数主要由三部分构成: 函数名 函数参数 函数返回值 让我们看一个简单的例子: def hello(name): return name >>> r = hello('ethan') >>> r 'ethan' 在上面,我们定义了一个函数。函数名是 hello;函数有一个参数,参数名是 name;函数有一个返回值,name。 我们也可以

  • 函数(我们Java中的方法)可以使用fun关键字就可以定义: fun onCreate(savedInstanceState: Bundle?) { } 如果你没有指定它的返回值,它就会返回Unit,与Java中的void类似,但是Unit是一个真正的对象。你当然也可以指定任何其它的返回类型: fun add(x: Int, y: Int) : Int { return x + y } 小

  • 函数取得的参数是你提供给函数的值,这样函数就可以利用这些值 做 一些事情。这些参数就像变量一样,只不过它们的值是在我们调用函数的时候定义的,而非在函数本身内赋值。 参数在函数定义的圆括号对内指定,用逗号分割。当我们调用函数的时候,我们以同样的方式提供值。注意我们使用过的术语——函数中的参数名称为 形参 而你提供给函数调用的值称为 实参 。 使用函数形参 例7.2 使用函数形参 #!/usr/bin

  • 高阶函数与普通函数的不同在于,它可以使用一个或多个函数作为参数,可以将函数作为返回值。rust的函数是first class type,所以支持高阶函数。而,由于rust是一个强类型的语言,如果要将函数作为参数或返回值,首先需要搞明白函数的类型。下面先说函数的类型,再说函数作为参数和返回值。 函数类型 前面说过,关键字fn可以用来定义函数。除此以外,它还用来构造函数类型。与函数定义主要的不同是,构

  • 参数声明 rust的函数参数声明和一般的变量声明相仿,也是参数名后加冒号,冒号后跟参数类型,不过不需要let关键字。需要注意的是,普通变量声明(let语句)是可以省略变量类型的,而函数参数的声明则不能省略参数类型。 来看一个简单例子: fn main() { say_hi("ruster"); } fn say_hi(name: &str) { println!("Hi, {}", nam