当前位置: 首页 > 知识库问答 >
问题:

如何保存历史RDD以在给定代码中进一步使用

翁凯定
2023-03-14
{
var history: RDD[(String, List[String]) = sc.emptyRDD()

val dstream1 = ...
val dstream2 = ...

val historyDStream = dstream1.transform(rdd => rdd.union(history))
val joined = historyDStream.join(dstream2)

... do stuff with joined as above, obtain dstreamFiltered ...

dstreamFiltered.foreachRDD{rdd =>
val formatted = rdd.map{case (k,(v1,v2)) => (k,v1) }
history.unpersist(false) // unpersist the 'old' history RDD
history = formatted // assign the new history
history.persist(StorageLevel.MEMORY_AND_DISK) // cache the computation
history.count() //action to materialize this transformation
}

这个代码逻辑可以很好地保存所有以前的RDD,这些RDD没有成功地加入并保存到未来的批处理中,因此每当我们获得一个带有该RDD相应加入键的记录时,我们都会执行加入,但我不知道这段历史是如何建立的。

共有1个答案

刘和正
2023-03-14

我们可以通过观察RDD谱系是如何随时间演化的,来理解在这种情况下历史是如何建立的。

我们需要两条先前的知识:

  • RDD是不可变结构
  • RDD上的操作可以通过要应用的函数和对输入RDD的引用以功能术语表示。

让我们看一个使用经典字数的快速示例:

val txt = sparkContext.textFile(someFile)
val words = txt.flatMap(_.split(" "))

简而言之,txt是一个HadoopRDD(someFile)words(代码>单词)是一个映射分区(txt,flatMapFunction)。我们把单词的谱系称为DAG(直接无环图),它是由操作链构成的<代码>HadoopRDD

我们可以将相同的原则应用于流媒体操作:

在迭代0中,我们有

var history: RDD[(String, List[String]) = sc.emptyRDD()
// -> history: EmptyRDD
...
val historyDStream = dstream1.transform(rdd => rdd.union(history))
// -> underlying RDD: rdd.union(EmptyRDD)
join, filter
// underlying RDD: ((rdd.union(EmptyRDD).join(otherRDD)).filter(pred)
map
// -> underlying RDD: ((rdd.union(EmptyRDD).join(otherRDD)).filter(pred).map(f) 
history.unpersist(false)
//  EmptyRDD.unpersist (does nothing, it was never persisted)
history = formatted
// history =  ((rdd.union(EmptyRDD).join(otherRDD)).filter(pred).map(f)
history.persist(...)
// history marked for persistence (at the next action)
history.count()
// ((rdd.union(EmptyRDD).join(otherRDD)).filter(pred).map(f).count()
// cache result of:  ((rdd.union(EmptyRDD).join(otherRDD)).filter(pred).map(f)

在迭代1中,我们有(添加rdd0、rdd1作为迭代索引):

val historyDStream = dstream1.transform(rdd => rdd.union(history))
// -> underlying RDD: rdd1.union(((rdd0.union(EmptyRDD).join(otherRDD0)).filter(pred).map(f))
join, filter
// underlying RDD: ((rdd1.union(((rdd0.union(EmptyRDD).join(otherRDD0)).filter(pred).map(f)).join(otherRDD1)).filter(pred)
map
// -> underlying RDD: ((rdd1.union(((rdd0.union(EmptyRDD).join(otherRDD0)).filter(pred).map(f)).join(otherRDD1)).filter(pred).map(f) 
history.unpersist(false)
//  history0.unpersist (marks the previous result for removal, we used it already for our computation above)
history = formatted
// history1 =  ((rdd1.union(((rdd0.union(EmptyRDD).join(otherRDD0)).filter(pred).map(f)).join(otherRDD1)).filter(pred).map(f)
history.persist(...)
// new history marked for persistence (at the next action)
history.count()
// ((rdd1.union(((rdd0.union(EmptyRDD).join(otherRDD0)).filter(pred).map(f)).join(otherRDD1)).filter(pred).map(f).count()
// cache result sothat we don't need to compute it next time  

这个迭代过程会随着每次迭代而继续。

正如我们所看到的,表示RDD计算的图在不断增长<代码>缓存减少了每次进行所有计算的成本<代码>检查点(code>checkpoint)经常需要为这个不断增长的图编写一个具体的计算值,以便我们可以使用它作为基线,而不必评估整个链。

查看此过程的一种有趣方式是在foreachRDD中添加一行来检查当前沿袭:

...
history.unpersist(false) // unpersist the 'old' history RDD
history = formatted // assign the new history
println(history.toDebugString())
...
 类似资料:
  • 在gdb中,缺省是不保存历史命令的。你可以通过如下命令来设置成保存历史命令: (gdb) set history save on 但是,历史命令是缺省保存在了当前目录下的.gdb_history文件中。可以通过如下命令来设置要保存的文件名和路径: (gdb) set history filename fname 现在,我们把这两个命令放到$HOME/.gdbinit文件中: set histo

  • 每次我在我的Android Studio编辑器中打开一个java文件时,我会折叠一些我不想看到的东西(比如注释和一些函数),但是在关闭和打开项目之后,所有的东西都会再次展开!如何让Android Studio记住代码上的折叠区域,并将它们与文件一起保存?

  • 我的案例是批准、拒绝或返回我的请求,并基于该工作流更改其状态。candence如何帮助保存/检索每个工作流的所有操作历史。

  • 由于作者在对教程不断地更新,一些比较旧的内容就从教程中删除或者修改了,但是这部分内容仍然对在更新以前就开始学习的读者可能还是会有点用处的。所以,我会将被大段删除或修改的内容留到这里以供大家参考。 注意的是,一些比较小的改动将不会出现在这里。 01-01 OpenGL Important 基元类型(Primitive Type) 使用OpenGL时,建议使用OpenGL定义的基元类型。比如使用flo

  • 在我的Corda项目中,项目状态可能会随着时间的推移而发展。我已经制作了线性状态类型的状态。现在我想检索Corda状态的历史,这意味着,它是如何随着时间的推移而演变的。如何查看科尔达特定州的演变历史? 特别是,我想访问一个州的完整交易链。

  • 本文向大家介绍如何给Python代码进行加密,包括了如何给Python代码进行加密的使用技巧和注意事项,需要的朋友参考一下 这篇文章主要介绍了如何给Python代码进行加密,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 去年11月在PyCon China 2018 杭州站分享了 Python 源码加密,讲述了如何通过修改 Python 解释器达到