当前位置: 首页 > 知识库问答 >
问题:

Apache Flink-timerService,onTimer获取整个事件

郦翰学
2023-03-14

我需要一些关于Apache Flink中timerService的帮助。我的用例非常简单,但我没有找到任何明确的答案。

我的程序从源(在我的例子中是rabbitMQ)接收json格式的事件(映射到MyEvent,这里简化了MyEvent)。事件可以立即处理(1),也可以存储以供以后处理(2)。(2) 我认为TimerService是适当的解决方案。在onTimer方法中,我需要整个对象(MyEvent),而不仅仅是键。因此,首先我认为使用整个json作为键,这很好,但不知怎么的,这感觉是错误的,因为在任何示例中,键都不是这样使用的。第二种方法如下:;使用ValueState。但我的密钥不是唯一的,ValueState是每个密钥。同样,我可以用一个对象作为键。。。在onTimer中,我只能访问ctx中事件的密钥。currentKey,而不是事件本身。。。

因此,我的问题是:无论对象是什么样子,我如何存储整个事件以供后续处理?

这是代码(kotlin)

data class MyEvent(val event: String, val secs: Int)

class CountWithTimeoutFunction : KeyedProcessFunction<String, MyEvent, MyEvent>() {

    private lateinit var state: ValueState<MyEvent>

    override fun open(parameters: Configuration?) {
        state = runtimeContext.getState(ValueStateDescriptor("myState", MyEvent::class.java))
    }

    override fun processElement(myEvent: MyEvent, context: Context, collector: Collector<MyEvent>) {
        println("" + Date() + "-processElement-" + myEvent)
        state.update(myEvent)
        context.timerService().registerProcessingTimeTimer(context.timerService().currentProcessingTime() + myEvent.secs * 1000)
    }

    override fun onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector<MyEvent>) {
        val myEvent = state.value()
        println("" + Date() + "-onTimer-" + ctx.currentKey + " state.value()_" + myEvent)
    }

}

谢谢你的建议。

共有1个答案

步衡
2023-03-14

对于这样的情况,一种常见的、简单的技术是通过向您使用随机数填充的事件添加一个字段来为每个事件提供一个唯一的键。(请注意,执行keyBy(random.nextLong())是行不通的,因为Flink依赖于键的确定性。)

有时使用的另一种技术是使用MapState,其中键是计时器时间戳,值是等待该计时器的事件列表。当事件到达时,将它们附加到列表中作为时间戳。当计时器启动时,处理列表中的所有内容,然后丢弃它。

第二种方法将使用较少的计时器,但由于处理这些列表的开销,其效率可能较低(至少对于RocksDB state后端而言)。

 类似资料:
  • 我有一个用例:stepA- 但是我还能做什么呢?谢谢

  • 所以我必须检索存储在HDFS中的文件的内容,并对其进行某些分析。 问题是,我甚至无法读取文件并将其内容写入本地文件系统中的另一个文本文件。(我是Flink的新手,这只是一个测试,以确保我正确读取了文件) HDFS中的文件是纯文本文件。这是我的密码: 在我运行/tmp之后,它没有输出。 这是一个非常简单的代码,我不确定它是否有问题,或者我只是做了一些别的错误。正如我所说,我对Flink完全是新手 此

  • 我的工作是做以下事情: 根据事件时间使用Kafka主题中的事件 计算7天的窗口大小,以1天的幻灯片显示 将结果放入Redis 我有几个问题: 如果它从最近的记录中消耗Kafka事件,在作业存活1天后,作业关闭窗口并计算7天窗口。问题是作业只有1天的数据,因此结果是错误的。 如果我尝试让它从7天前的时间戳中消耗Kafka事件,当作业开始时,它从第一天开始计算整个窗口,并且花费了很多时间。另外,我只想

  • 我正在从事一个小型Drools项目,因为我想了解更多关于使用规则引擎的知识。我有一个名为Event的类,它有以下字段: <代码>字符串标记 可以是任何字符串的标记 我在我的知识库中插入了数百个事件实例,现在我想得到3个最近的事件,它们都标记为“OK”(确定)。我想出了以下代码,它可以工作: 但是我有一种感觉,应该有更好的方法来做到这一点。这很冗长,不容易重复使用:如果我想获取具有

  • 我的项目中有:https://www.primefaces.org/primereact/#/fullcalendar 我一天有2个事件,我通过以下代码找到了第一个事件: 我需要找到第二个或更多的事件。 UPD如何在第二天工作//div[@class='fc-entent-bone eton'][.//td[@data-date='2019-09-03']]//tbody/tr/td[计数(//t

  • 我有一个带有TimerService的无状态会话Bean。在超时时,它开始使用JMS队列。在处理消息时,它需要访问可能暂时不可用的外部资源。timeout方法在循环中调用,直到: 没有更多的消息要处理:它注册一个新的定时器=现在+10分钟。和结尾。 处理过程中发生错误:它回滚消息并注册一个新的计时器:现在+30分钟。和结尾。 通过这种方式,我可以控制何时重新启动,而且由于TimerService回