当前位置: 首页 > 知识库问答 >
问题:

闪烁流保持窗口中特定事件的状态

慕嘉茂
2023-03-14

我有一个flink流系统,在这里我可以获得点击流数据。

数据格式:

{"uid":"123", "event_type":"view","payload":{"p1":{"price":23}}}
{"uid":"123", "event_type":"view","payload":{"p2":{"price":25}}}
{"uid":"123", "event_type":"a2c","payload":{"p2"}}
{"uid":"123", "event_type":"a2c","payload":{"p1":{}}}
{
    "uid":"123",
    "all":[
        {"event_type":"view", "payload":{"p1":{"price":23}}},
        {"event_type":"view","payload":{"p2":{"price":25}}},
        {"event_type":"a2c","payload":{"p2":{"price":25}}},
        {"event_type":"a2c","payload":{"p1":{"price":23}}}
    ],
   "total":4
}

我怎样才能做到这一点呢?基本上,我必须维护窗口中所有view事件的状态,然后,一旦我获得A2C事件,我必须从该状态获取价格。我并不要求任何工作解决方案,只是要求如何维护窗口中所有view事件的状态。我也有一些自定义的Reduce操作。

events.keyBy("uid").window(..).reduce(new ReduceCustomFun(..)).uid("..").name("..");

reduceCustomFun:我将2个事件数据加入到列表中。

共有1个答案

东门佐
2023-03-14

最简单的方法是使用ProcessWindowFunction在每个10分钟窗口结束时执行所有窗口处理。那么您就会有一个iterable,其中包含所有事件(对于给定的uid),您就可以根据这些事件创建合并报表,并且您不必关心维护任何状态。

 类似资料:
  • 我正在运行一个简单的示例来测试基于EventTime的Windows。我能够生成带有处理时间的输出,但当我使用EventTime时,没有输出。请帮助我明白我做错了什么。

  • 我是Flink的新手,需要方法的帮助。我有时间颗粒度为5分钟的事件流。我想通过调用rest API来获取事件的元数据,其中包含过去1小时数据点的历史事件,即过去12点(5分钟时间颗粒度)。 e、 g事件的时间戳为10:00、10:05、10:10、10:15等,因此如果我想获取时间戳为11:00的事件元数据,我将调用send发送所有时间戳为10:00、10:05、10:10、10:15的事件。。1

  • 2)我研究了循环分区的重新平衡。假设我建立了一个集群,如果我的源的并行度为1,如果我进行了重新平衡,我的数据是否会在机器之间进行重排以提高性能?如果是这样,是否有一个特定的端口将数据传送到集群中的其他节点? 3)状态维护有什么限制吗?我计划维护一些用户id相关的数据,这些数据可能会变得很大。我读到flink使用rocks db来维护状态。只是想检查一下是否有限制可以维护多少数据? 4)同样,如果数

  • 作业并行性(4,8,16):[自动生成源]-->[Map1]-->[滚动窗口(10s)]-->[Map2]-->[接收器] Flink窗口性能eps 4p、8p、16p 作业以上的性能最高达到了每秒50k+-左右,不管我如何将集群缩放成4-16的并行度。 闪烁性能无窗口4p、8p 我已经删除了窗口的逻辑,以消除瓶颈性能的应用程序逻辑,但似乎窗口仍然导致我的整个流性能下降,即使该窗口只是一个通过函数

  • 有人能帮我理解一下在flink中的窗口(会话)是什么时候和如何发生的吗?或者样品是如何加工的? 例如:假设定义的时间窗口是30秒,如果一个事件在t时间到达,另一个事件在t+30,那么这两个事件都将被处理,但是在t+31到达的事件将被忽略。 如果我说的不对,请纠正。 上面的问题是:如果一个事件在t时间到达,而另一个事件在t+3时间到达,是否还会等待整个30秒来汇总并最终确定结果? DTO: ====

  • 我需要根据一个键连接两个事件源。事件之间的间隔最长可达1年(即具有id1的event1可能在今天到达,而来自第二个事件源的具有id1的相应event2可能在一年后到达)。假设我只想输出连接的事件输出。 我正在探索在RocksDB后端使用Flink的选项(我遇到了表API,它们似乎适合我的用例)。我找不到做这种长窗口连接的引用体系结构。我希望系统一天能处理大约2亿个事件。 关于处理这种长窗口连接的任