当前位置: 首页 > 知识库问答 >
问题:

DStreams的分区(用于updateStateByKey())如何工作以及如何验证它?

孟胤
2023-03-14

我正在使用updateStateByKey()操作来维护我的火花流应用程序中的状态。输入数据来自Kafka主题。

  1. 我想了解DStreams是如何分区的?
  2. 分区如何使用map的状态()或updateStatebyKey()方法工作?
  3. 在updateStateByKey()中,针对给定键的旧状态和新值是否在同一节点上处理?
  4. updateStateByKey()方法的洗牌频率是多少?

我必须保持的状态包含约100000个键,我想避免每次更新状态时洗牌,有什么建议吗?

共有2个答案

端木宏盛
2023-03-14

updateStateByKey 采用分区程序作为第二个参数。http://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions

薄涵衍
2023-03-14

链接到Tathagat Das对同一问题的答案:

https://www . mail-archive . com/user @ spark . Apache . org/msg 43512 . html

以下为正文:

默认情况下,mapWithState() 和 updateStateByKey() 都使用 HashPartitioner,并在应用状态操作的键值 DStream 中对键值 DStream 中的键进行哈希处理。新数据和状态在完全相同的分区程序中进行分区,因此来自新数据(来自输入 DStream)的相同键被打乱并与已分区的状态 RDD 共置。因此,新数据在同一台机器中被带到相应的旧状态,然后应用状态映射/更新功能。

状态不是每次都被洗牌,每一批中只有一批新数据被洗牌

 类似资料:
  • 我正在通过一些在线示例学习区块链。我有一个高级代码,我用前面的散列创建一个新的块,然后向其中添加一个事务,然后生成一个困难的块散列(有8个前导零) 这很有效。但我有一个问题,在向区块链添加区块之前,如何验证该工作证明。 即。假设生成了一个包含8个前导零的散列

  • 问题内容: 经常出现在Python模块中。即使阅读了Python的文档,我也不明白它的用途以及使用时间/方式。 有人可以举例说明吗? 关于我收到的基本用法的一些答案似乎是正确的。 但是,我需要了解有关工作原理的另一件事: 对我来说,最令人困惑的概念是当前的python版本如何包含未来版本的功能,以及如何使用当前版本的Python成功地编译使用未来版本的功能的程序。 我猜想当前版本包含了将来的潜在功

  • 有人能给我一个逐步描述如何基于cookie的身份验证工作吗?我从来没有做过任何涉及身份验证或cookie的事情。浏览器需要做什么?服务器需要做什么?按什么顺序?我们怎么保证东西安全?

  • 我正在寻找关于ThreadLocal的以下使用的验证。 我有一个服务,比如说在一组进程上运行,比如系统中的。哪个将在必须以相同的方式识别中的所有进程。 为此,我应该将一个ID值(比如String类型)写入一个新线程的线程本地存储(TLS),然后在需要时读取该值。因此,调用的第一个线程的ID将是识别它们的ID。当第一个线程启动另一个线程时,它将进入这个新线程的TLS并写入这个ID。从那里开始,这个链

  • 问题内容: 这个问题已经在这里有了答案 : 9年前关闭。 可能重复: 什么是SQL注入? 我看到很多php代码在stackoverflow上四处飘荡,字符串的转义太少了。 谁能 说明什么是SQL注入; 说明它可以对您的服务器,数据和代码执行的操作; 举例说明如何执行SQL注入 给php示例代码如何防止SQL注入 问题答案: 我也无法抗拒。 SQL注入是“一种代码注入技术,可利用应用程序数据库层中发

  • 我需要一些帮助来了解spark如何决定分区的数量,以及它们在executors中是如何处理的,我很抱歉这个问题,因为我知道这是一个重复的问题,但即使在阅读了许多文章后,我仍然不能理解我正在放上一个我目前正在工作的真实生活用例,以及我的Spark提交配置和集群配置。 我的硬件配置: < code>3节点计算机,总Vcores=30,总内存=320 GB。 我正在使用spark dataframe J