val ds : DataStream = ...
val laggedDS : DataStream = ds.map(lag _)
def lag(ds : DataStream, k : Time) : DataStream = {
}
如果每个事件间隔为1秒,并且有2秒的滞后,那么我希望示例输入和输出如下所示。
输入:1,2,3,4,5,6,7...
输出:NA,NA,1,2,3,4,5...
如果您的需求正确,我将把它实现为具有FIFO队列的FlatMapFunction
。队列缓冲k
事件,并在新事件到达时发出头。如果您需要一个容错流应用程序,队列必须注册为状态。Flink将负责检查点状态(即队列),并在发生故障时恢复状态。
FlatMapFunction
可能如下所示:
class Lagger(val k: Int)
extends FlatMapFunction[X, X]
with Checkpointed[mutable.Queue[X]]
{
var fifo: mutable.Queue[X] = new mutable.Queue[X]()
override def flatMap(value: X, out: Collector[X]): Unit = {
// add new element to queue
fifo.enqueue(value)
if (fifo.size == k + 1) {
// remove head element and emit
out.collect(fifo.dequeue())
}
}
// restore state
override def restoreState(state: mutable.Queue[X]) = { fifo = state }
// get state to checkpoint
override def snapshotState(cId: Long, cTS: Long): mutable.Queue[X] = fifo
}
返回带有时滞的元素则涉及更多。这将需要用于发射的计时器线程,因为只有在新元素到达时才调用该函数。
问题内容: 例 是否有创建系列的有效方法。例如,在每行中包含滞后值(在此示例中,直到滞后2) 这对应于 s = pd.Series([[3,4,5],[2,3,4],[1,2,3]],index = [3,4,5]) 对于时间序列很多且时间很长的数据帧,如何以有效的方式完成呢? 谢谢 看到答案后编辑 好的,最后我实现了这个功能: 它产生期望的输出,并管理结果DataFrame中列的命名。 对于系列
我想“创建或替换”postgres表的触发器。但是,没有这样的sql表达式。 我看到我可以先执行“
问题内容: 我有一个时序DataFrame,我想复制我的200个功能/列中的每一个作为其他滞后功能。因此,目前我在时间t处具有要素,并希望在时间步t-1,t-2等处创建要素。 我知道最好用df.shift()来完成,但是我很难将其完全合并。我还想将列重命名为“功能(t-1)”,“功能(t-2)”。 我的伪代码尝试将是这样的: 最后,如果我有200列和4个滞后时间步长,那么我将拥有一个具有1000个
问题内容: 这是我学习AngularJS的第二天。我有一个我无法解决的问题。一切顺利,直到我创建了工厂。我了解工厂/服务用于重构代码。因此,我从控制器中获取了一些代码,并将其发送到工厂,以便可以加载一些数据,但是事实并非如此。这些是我的代码: index.html app.js customerController.js orders.html customerFactory.js 在Chrome
我在Scala中查看幻灯片函数中的Spark。
创建数据流主要包括如下两个部分: 1. 获取相关信息 主要为获取FDS Bucket相关的信息 2. 创建/迁移Topic 数据最终需要收集到Talos 的Topic中,因此需要首先创建Topic,并迁移到生态云账号体系下;关于Talos相关,可以参见Talos-流式消息队列 3. 配置数据流 配置数据流需要的信息