当前位置: 首页 > 知识库问答 >
问题:

火花流:无状态重叠窗口与保持状态

公孙高轩
2023-03-14

选择无状态滑动窗口操作的一些注意事项是什么(例如,通过updateStateByKey或新mapStateByKey)选择保持状态(例如通过updateStateByKey或新mapStateByKey)时,使用火花流处理连续的有限事件会话流?

例如,考虑以下场景:

一种可穿戴设备跟踪由穿戴者进行的体育锻炼。该装置自动检测何时开始锻炼,并发出信息;在锻炼过程中发出附加信息(如心率);最后,当练习完成时发出一条消息。

期望的结果是每个锻炼时段的聚集记录流。即,相同会话的所有事件应该聚集在一起(例如,使得每个会话可以保存在单个DB行中)。请注意,每个会话都有一个有限的长度,但是来自多个设备的整个流是连续的。为了方便起见,让我们假设设备为每个锻炼会话生成一个GUID。

我可以看到两种使用 Spark 流处理此用例的方法:

> < li>

使用非重叠窗口,并保持状态。每个GUID保存一个状态,所有事件都与之匹配。当新事件到达时,状态被更新(例如,使用mapWithState ),并且在事件是“锻炼会话结束”的情况下,将发出基于状态的聚集记录,并且密钥被移除。

使用重叠的滑动窗口,并只保留第一个会话。假设滑动窗口长度为2,间隔为1(见下图)。还假设窗口长度为2 X(最大可能的锻炼时间)。在每个窗口上,事件通过GUID聚合,例如使用减法。然后,在窗口的后半部分开始的所有会话都被转储,其余会话发出。这使得每个事件可以精确使用一次,并确保属于同一会话的所有事件将被聚合在一起。

方法2的示意图:

Only sessions starting in the areas marked with \\\ will be emitted. 
-----------
|window 1 |
|\\\\|    |
-----------
     ----------
     |window 2 |
     |\\\\|    |  
     -----------
          ----------
          |window 3 |
          |\\\\|    |
          -----------

我看到的利弊:

方法#1的计算成本较低,但需要保存和管理状态(例如,如果并发会话的数量增加,状态可能会大于内存)。然而,如果并发会话的最大数量是有界的,这可能不是问题。

方法#2是昂贵的两倍(每个事件处理两次),并且具有更高的延迟(2 X最大锻炼时间),但更简单且易于管理,因为没有保留状态。

处理这个用例的最佳方法是什么?这些方法中的任何一种都是“正确的”方法,还是有更好的方法?

还应考虑哪些其他优点/缺点?

共有2个答案

谢灵均
2023-03-14

我认为第三种方法的另一个缺点是RDD不是按时间顺序接收的…考虑在集群上运行它们…

ongoingEventsStream.foreachRDD { /*accumulate state in casssandra*/ }

还有检查点和驱动程序节点故障呢。.在这种情况下,您是否会再次读取整个数据?想知道您想如何处理这个问题吗?

我想也许映射状态是一个更好的方法,为什么你考虑所有这些场景…

廖君昊
2023-03-14

通常没有正确的方法,每个方法都有权衡。因此,我会在组合中添加额外的方法,并概述我对它们的利弊的看法。所以你可以决定哪一个更适合你。

您可以在外部存储中累积事件的状态。Cassandra经常用于此。您可以分别处理最终事件和正在进行的事件,如下所示:

val stream = ...

val ongoingEventsStream = stream.filter(!isFinalEvent)
val finalEventsStream = stream.filter(isFinalEvent)

ongoingEventsStream.foreachRDD { /*accumulate state in casssandra*/ }
finalEventsStream.foreachRDD { /*finalize state in casssandra, move to final destination if needed*/ }

它可能是您潜在的最佳解决方案,因为它消除了updateStateByKey的缺点,但考虑到它刚刚作为Spark 1.6版本的一部分发布,它也可能存在风险(因为由于某种原因它不是很宣传)。如果您想了解更多信息,可以使用该链接作为起点

    < li >易于理解或解释(对团队其他成员、新人等)。)(主观) < li >存储:更好地利用内存,仅存储最新的运动状态 < li >存储:将只保留正在进行的练习,并在完成后立即丢弃它们 < li >延迟仅受每个微批处理的性能限制
  • 存储:如果键的数量(并发练习)很大,它可能无法放入集群的内存
  • 处理:它将为状态图中的每个键运行updateState函数,因此如果并发练习的数量很大,性能将受到影响

虽然使用windows可以实现您所需的功能,但在您的场景中,它看起来明显不那么自然。

    < li >某些情况下的处理(取决于数据)可能比updateStateByKey更有效,因为updateStateByKey倾向于对每个键运行更新,即使没有实际更新也是如此
    < li >“最大可能锻炼时间”-这听起来像是一个巨大的风险-它可能是基于人类行为的任意持续时间。有些人可能会忘记“完成锻炼”。也取决于锻炼的种类,但可能从几秒钟到几小时不等,如果您希望快速锻炼的延迟时间更短,同时必须将延迟时间保持在可能存在的最长锻炼时间内 < li >感觉更难向他人解释其工作原理(主观) < li >存储:必须将所有数据保存在窗口框架内,而不仅仅是最新的数据。也将释放内存,只有当窗口将滑动远离这个时隙,而不是当练习实际上已经完成。虽然如果你只保留最后两个时隙,差别可能不大,但是如果你试图通过更频繁地滑动窗口来实现更大的灵活性,差别就会增加。
  • 易于解释等(主观)
  • 纯流处理方法,这意味着火花负责对每个单独的事件采取行动,但不尝试存储状态等。(主观)
  • 存储:不受集群内存的限制来存储状态-可以处理大量的并发练习
  • 处理:状态仅在有实际更新时更新(与updateStateByKey不同)
  • 延迟类似于updateStateByKey,仅受处理每个微批次所需时间的限制
  • 架构中的额外组件(除非您已经使用Cassandra作为最终输出)
  • 处理:默认情况下比仅在火花中处理慢,因为不在内存中,您需要通过网络传输数据
  • 您必须精确实现一次语义才能将数据输出到cassandra(对于在ForeachRDD期间工作人员失败的情况)

我会尝试以下方法:

  • 测试更新对数据和集群的 StateByKey 方法
  • 查看即使有大量
  • 并发练习(预计在高峰时段),内存消耗和处理是否可接受
  • 回退到与卡桑德拉接近,以防万一
 类似资料:
  • 我有一个流系统,在这里我可以获得点击流数据。 数据格式: 我怎样才能做到这一点呢?基本上,我必须维护窗口中所有事件的状态,然后,一旦我获得事件,我必须从该状态获取价格。我并不要求任何工作解决方案,只是要求如何维护窗口中所有事件的状态。我也有一些自定义的Reduce操作。 在:我将2个事件数据加入到列表中。

  • HTTP协议是无状态的:每次请求都是一次新的请求,不会记得之前通信的状态 客户端与服务器端的一次通信,就是一次会话 实现状态保持的方式:在客户端或服务器端存储与会话有关的数据 存储方式包括cookie、session,会话一般指session对象 使用cookie,所有数据存储在客户端,注意不要存储敏感信息 推荐使用sesison方式,所有数据存储在服务器端,在客户端cookie中存储sessio

  • 我尝试使用Spark Streaming并希望有一个全局状态对象,可以在每个批处理后更新。据我所知,至少有两种选择适合我:1。使用,其中Spark将在处理每个批处理后自动更新状态2。使用函数,在这里我必须自己调用更新 类型javapairdStream 中的方法updateStateByKey(Function2 ,optional ,optional >)不适用于参数(new function2

  • 问题内容: 在我公司,我们正在将Web应用程序的前端迁移到ReactJS。我们正在使用create-react- app(更新为v16),而没有Redux。现在,我停留在一个页面上,该页面可以通过以下图像进行简化: 在MainContainer方法中,使用相同的后端请求检索由三个组件(SearchableList,SelectableList和Map)显示的数据。然后,此请求的结果存储在MainC

  • 我有一个,它位于中,但导航栏是隐藏的。当我在iOS 7上运行应用程序时,状态栏显示在我的视图顶部。有没有办法避免这种情况? 我不想写任何特定于操作系统的代码。 我尝试设置为,但它没有解决问题。

  • 在阅读了Flink的文档并四处搜索后,我无法完全理解Flink的句柄在其窗口中的状态。假设我有一个每小时滚动的窗口,其中包含一个聚合函数,该函数将消息累积到某个java pojo或scala case类中。该窗口的大小将与一小时内进入该窗口的事件数量相关联,还是仅仅与POJO/Case类相关联,因为我将事件累加到该对象中。(例如,如果将10000个味精数成一个整数,大小会接近10000*味精大小还