当前位置: 首页 > 知识库问答 >
问题:

Apache Flink:创建滞后数据流

艾俊晖
2023-03-14
val ds : DataStream = ...

val laggedDS : DataStream = ds.map(lag _)

def lag(ds : DataStream, k : Time) : DataStream = {

}

如果每个事件间隔为1秒,并且有2秒的滞后,那么我希望示例输入和输出如下所示。

输入:1,2,3,4,5,6,7...
输出:NA,NA,1,2,3,4,5...

共有1个答案

钮承恩
2023-03-14

如果您的需求正确,我将把它实现为具有FIFO队列的FlatMapFunction。队列缓冲k事件,并在新事件到达时发出头。如果您需要一个容错流应用程序,队列必须注册为状态。Flink将负责检查点状态(即队列),并在发生故障时恢复状态。

FlatMapFunction可能如下所示:

class Lagger(val k: Int) 
    extends FlatMapFunction[X, X] 
    with Checkpointed[mutable.Queue[X]] 
{

  var fifo: mutable.Queue[X] = new mutable.Queue[X]()

  override def flatMap(value: X, out: Collector[X]): Unit = {
    // add new element to queue
    fifo.enqueue(value)
    if (fifo.size == k + 1) {
      // remove head element and emit
      out.collect(fifo.dequeue())
    }
  }

  // restore state
  override def restoreState(state: mutable.Queue[X]) = { fifo = state }

  // get state to checkpoint
  override def snapshotState(cId: Long, cTS: Long): mutable.Queue[X] = fifo

}

返回带有时滞的元素则涉及更多。这将需要用于发射的计时器线程,因为只有在新元素到达时才调用该函数。

 类似资料:
  • 问题内容: 例 是否有创建系列的有效方法。例如,在每行中包含滞后值(在此示例中,直到滞后2) 这对应于 s = pd.Series([[3,4,5],[2,3,4],[1,2,3]],index = [3,4,5]) 对于时间序列很多且时间很长的数据帧,如何以有效的方式完成呢? 谢谢 看到答案后编辑 好的,最后我实现了这个功能: 它产生期望的输出,并管理结果DataFrame中列的命名。 对于系列

  • 我想“创建或替换”postgres表的触发器。但是,没有这样的sql表达式。 我看到我可以先执行“

  • 问题内容: 我有一个时序DataFrame,我想复制我的200个功能/列中的每一个作为其他滞后功能。因此,目前我在时间t处具有要素,并希望在时间步t-1,t-2等处创建要素。 我知道最好用df.shift()来完成,但是我很难将其完全合并。我还想将列重命名为“功能(t-1)”,“功能(t-2)”。 我的伪代码尝试将是这样的: 最后,如果我有200列和4个滞后时间步长,那么我将拥有一个具有1000个

  • 问题内容: 这是我学习AngularJS的第二天。我有一个我无法解决的问题。一切顺利,直到我创建了工厂。我了解工厂/服务用于重构代码。因此,我从控制器中获取了一些代码,并将其发送到工厂,以便可以加载一些数据,但是事实并非如此。这些是我的代码: index.html app.js customerController.js orders.html customerFactory.js 在Chrome

  • 我在Scala中查看幻灯片函数中的Spark。

  • 创建数据流主要包括如下两个部分: 1. 获取相关信息 主要为获取FDS Bucket相关的信息 2. 创建/迁移Topic 数据最终需要收集到Talos 的Topic中,因此需要首先创建Topic,并迁移到生态云账号体系下;关于Talos相关,可以参见Talos-流式消息队列 3. 配置数据流 配置数据流需要的信息