当前位置: 首页 > 知识库问答 >
问题:

持久化Apache Flink窗口

公孙高畅
2023-03-14

我正试图使用Flink以流媒体的方式使用消息队列中的有界数据。数据格式如下:

{"id":-1,"name":"Start"}
{"id":1,"name":"Foo 1"}
{"id":2,"name":"Foo 2"}
{"id":3,"name":"Foo 3"}
{"id":4,"name":"Foo 4"}
{"id":5,"name":"Foo 5"}
...
{"id":-2,"name":"End"}

可以使用事件ID确定消息的开始和结束。我想接收此类批次并将最新的(通过覆盖)批次存储在磁盘或内存中。我可以编写自定义窗口触发器来使用开始和结束标志提取事件,如下所示:

DataStream<Foo> fooDataStream = ...
AllWindowedStream<Foo, GlobalWindow> fooWindow = fooDataStream.windowAll(GlobalWindows.create())
.trigger(new CustomTrigger<>())
.evictor(new Evictor<Foo, GlobalWindow>() {
    @Override
    public void evictBefore(Iterable<TimestampedValue<Foo>> elements, int size, GlobalWindow window, EvictorContext evictorContext) {
        for (Iterator<TimestampedValue<Foo>> iterator = elements.iterator();
             iterator.hasNext(); ) {
            TimestampedValue<Foo> foo = iterator.next();
            if (foo.getValue().getId() < 0) {
                iterator.remove();
            }
        }
    }

    @Override
    public void evictAfter(Iterable<TimestampedValue<Foo>> elements, int size, GlobalWindow window, EvictorContext evictorContext) {

    }
});

但是如何保持最新窗口的输出。一种方法是使用ProcessAllWindowFunction接收所有事件并手动将其写入磁盘,但这感觉像是一种黑客行为。我还研究了带有Flink CEP模式的表API(如此问题),但找不到在每个批处理之后清除表以丢弃前一批中的事件的方法。

共有1个答案

穆鸿卓
2023-03-14

有几件事阻碍了你的愿望:

(1) Flink的窗口操作符生成附加流,而不是更新流。它们不是为更新以前发出的结果而设计的。CEP也不会生成更新流。

(2) Flink的文件系统抽象不支持覆盖文件。这是因为对象存储(如S3)不太支持此操作。

我认为你的选择是:

(1)重做您的作业,使其产生更新(变更日志)流。您可以使用toChangelogStream或使用创建更新流的Table/SQL操作来执行此操作,例如GROUP BY(当它在没有时间窗口的情况下使用时)。最重要的是,您需要选择支持撤回/更新的接收器,例如数据库。

(2) 坚持生成附加流,并使用类似于FileSink的东西将结果写入一系列滚动文件。然后在Flink之外编写一些脚本,以获得您想要的结果。

 类似资料:
  • Akka持久化使有状态的actor能留存其内部状态,以便在因JVM崩溃、监管者引起,或在集群中迁移导致的actor启动、重启时恢复它。Akka持久化背后的关键概念是持久化的只是一个actor的内部状态的的变化,而不是直接持久化其当前状态 (除了可选的快照)。这些更改永远只能被附加到存储,没什么是可变的,这使得高事务处理率和高效复制成为可能。有状态actor通过重放保存的变化来恢复,从而使它们可以重

  • Spark通过在操作中将其持久保存在内存中,提供了一种处理数据集的便捷方式。在持久化RDD的同时,每个节点都存储它在内存中计算的任何分区。也可以在该数据集的其他任务中重用它们。 我们可以使用或方法来标记要保留的RDD。Spark的缓存是容错的。在任何情况下,如果RDD的分区丢失,它将使用最初创建它的转换自动重新计算。 存在可用于存储持久RDD的不同存储级别。通过将对象(Scala,Java,Pyt

  • Redis 支持持久化,即把数据存储到硬盘中。 Redis 提供了两种持久化方式: RDB 快照(snapshot) - 将存在于某一时刻的所有数据都写入到硬盘中。 只追加文件(append-only file,AOF) - 它会在执行写命令时,将被执行的写命令复制到硬盘中。 这两种持久化方式既可以同时使用,也可以单独使用。 将内存中的数据存储到硬盘的一个主要原因是为了在之后重用数据,或者是为了防

  • 不要害怕文件系统! Kafka 对消息的存储和缓存严重依赖于文件系统。人们对于“磁盘速度慢”的普遍印象,使得人们对于持久化的架构能够提供强有力的性能产生怀疑。事实上,磁盘的速度比人们预期的要慢的多,也快得多,这取决于人们使用磁盘的方式。而且设计合理的磁盘结构通常可以和网络一样快。 关于磁盘性能的关键事实是,磁盘的吞吐量和过去十年里磁盘的寻址延迟不同。因此,使用6个7200rpm、SATA接口、RA

  • Spark 有一个最重要的功能是在内存中_持久化_ (或 缓存)一个数据集。

  • Spark最重要的一个功能是它可以通过各种操作(operations)持久化(或者缓存)一个集合到内存中。当你持久化一个RDD的时候,每一个节点都将参与计算的所有分区数据存储到内存中,并且这些 数据可以被这个集合(以及这个集合衍生的其他集合)的动作(action)重复利用。这个能力使后续的动作速度更快(通常快10倍以上)。对应迭代算法和快速的交互使用来说,缓存是一个关键的工具。 你能通过persi