当前位置: 首页 > 知识库问答 >
问题:

在翻转窗口后执行闪烁下沉

家弘业
2023-03-14

来源:Kinesis数据流

水槽:弹性搜索

对于使用AWS服务的两者。

此外,在AWS Kinesis数据分析应用程序上运行我的Flink作业

我面临着flink窗口功能的问题。我的工作是这样的

DataStream<TrackingData> input = ...; // input from kinesis stream
input.keyBy(e -> e.getArea())
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .reduce(new MyReduceFunction(), new MyProcessWindowFunction())
                .addSink(<elasticsearch sink>);
 private static class MyReduceFunction implements ReduceFunction<TrackingData> {
        @Override
        public TrackingData reduce(TrackingData trackingData, TrackingData t1) throws Exception {
            trackingData.setVideoDuration(trackingData.getVideoDuration() + t1.getVideoDuration());
            return trackingData;
        }
    }
private static class MyProcessWindowFunction extends ProcessWindowFunction<TrackingData, TrackingData, String, TimeWindow> {
        public void process(String key,
                            Context context,
                            Iterable<TrackingData> in,
                            Collector<TrackingData> out) {

            TrackingData trackingIn = in.iterator().next();

            Long videoDuration =0l;
            for (TrackingData t: in) {
                videoDuration += t.getVideoDuration();
            }
            trackingIn.setVideoDuration(videoDuration);
            out.collect(trackingIn);
        }
    }

示例事件:

{"area":"sessions","userId":4450,"date":"2021-12-03T11:00:00","videoDuration":5} 

我在这里所做的是从动觉流中,我得到了大量的这些事件,我想对每10秒窗口的视频持续时间求和,然后我想将单个事件存储到elasticsearch中。

在动觉中,每秒可以有10000个事件。我不想在elasticsearch中存储所有10000个事件,我只想每10秒存储一个事件。

问题是,当我向此作业发送事件时,它会快速处理此事件并直接进入elasticsearch,但我希望实现:直到每10秒,我希望事件视频持续时间时间递增,10秒后,elasticsearch中只存储一个事件。

我如何才能实现这一点?

共有1个答案

公孙宏畅
2023-03-14

我想你把这个问题误诊了。

您编写的代码将从每10秒长的窗口中为窗口中包含事件的每个不同键生成一个事件<代码>MyProcessWindowFunction没有任何效果:由于窗口结果已预先聚合,每个Iterable将只包含一个事件。

我相信你想这样做:

input.keyBy(e -> e.getArea())
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .reduce(new MyReduceFunction())
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .reduce(new MyReduceFunction())
                .addSink(<elasticsearch sink>);

你也可以只做

input.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .reduce(new MyReduceFunction())
                .addSink(<elasticsearch sink>);

但是第一个版本会更快,因为它将能够在计算windowAll中的全局总和之前并行计算每个键窗口的结果。

FWIW,表/SQL API通常更适合这种类型的应用程序,并且应该生成比这两种应用程序都更优化的管道。

 类似资料:
  • 有人能帮我理解一下在flink中的窗口(会话)是什么时候和如何发生的吗?或者样品是如何加工的? 例如:假设定义的时间窗口是30秒,如果一个事件在t时间到达,另一个事件在t+30,那么这两个事件都将被处理,但是在t+31到达的事件将被忽略。 如果我说的不对,请纠正。 上面的问题是:如果一个事件在t时间到达,而另一个事件在t+3时间到达,是否还会等待整个30秒来汇总并最终确定结果? DTO: ====

  • 作业并行性(4,8,16):[自动生成源]-->[Map1]-->[滚动窗口(10s)]-->[Map2]-->[接收器] Flink窗口性能eps 4p、8p、16p 作业以上的性能最高达到了每秒50k+-左右,不管我如何将集群缩放成4-16的并行度。 闪烁性能无窗口4p、8p 我已经删除了窗口的逻辑,以消除瓶颈性能的应用程序逻辑,但似乎窗口仍然导致我的整个流性能下降,即使该窗口只是一个通过函数

  • 2)我研究了循环分区的重新平衡。假设我建立了一个集群,如果我的源的并行度为1,如果我进行了重新平衡,我的数据是否会在机器之间进行重排以提高性能?如果是这样,是否有一个特定的端口将数据传送到集群中的其他节点? 3)状态维护有什么限制吗?我计划维护一些用户id相关的数据,这些数据可能会变得很大。我读到flink使用rocks db来维护状态。只是想检查一下是否有限制可以维护多少数据? 4)同样,如果数

  • 我正在运行一个简单的示例来测试基于EventTime的Windows。我能够生成带有处理时间的输出,但当我使用EventTime时,没有输出。请帮助我明白我做错了什么。

  • 我有一个RecyclerView,它从API加载一些数据,包括一个图像url和一些数据,我使用networkImageView延迟加载图像。 下面是适配器的实现: 问题是,当我们在recyclerView中刷新时,它会在开始时短暂闪烁,这看起来很奇怪。 我只是使用了GridView/ListView,它就像我预期的那样工作。没有闪电战。 在我的片段创建的视图中配置RecycleView: 有人面临

  • 问题内容: 如何使用JavaScript在任务栏中使用户的浏览器闪烁/闪烁/突出显示?例如,如果我每10秒发出一次AJAX请求,以查看用户在服务器上是否有任何新消息,我希望用户立即知道它,即使他当时正在使用其他应用程序。 编辑:这些用户确实希望在收到新消息时分心。 问题答案: 这不会使任务栏按钮以改变颜色的方式闪烁,但是标题将一直闪烁,直到他们移动鼠标为止。这应该可以跨平台使用,即使他们只是在其他