当前位置: 首页 > 知识库问答 >
问题:

在spark Streaming中使用Java对有序spark stream进行迭代编程?

万高轩
2023-03-14

迭代是什么意思?

我首先使用时间戳对dstream进行排序,假设数据是以单调递增的时间戳到达的(没有乱序)。

我需要一个全局HashMap X,我希望使用时间戳为“T1”的值更新它,然后使用“T1+1”的值更新它。由于X本身的状态会影响计算,所以它需要是一个线性运算。因此,在“t1+1”处的操作取决于HashMap X,而HashMap X取决于在“t1”处和之前的数据。

当一个人试图更新一个模型或比较两组RDD时,或者保持某些事件的全局历史记录时,这种情况尤其如此,这些事件会影响未来迭代中的操作?

我想保留一些累积的历史记录来进行计算…不是整个数据集,而是保存某些事件,这些事件可以在将来的DStream RDD中使用?

共有1个答案

司寇书
2023-03-14

updateStateByKey正是这样做的:它使您能够定义一些状态,以及一个函数来根据流中的每个RDD更新状态。这是随时间累积历史计算的典型方法。

从文档中:

updateStateByKey操作允许您维护任意状态,同时用新信息不断更新它。要使用它,您将必须执行两个步骤。

  1. 定义状态-状态可以是任意数据类型。
  2. 定义状态更新函数-使用函数指定如何使用以前的状态和输入流中的新值更新状态。

更多信息请访问:https://spark.apache.org/docs/1.4.0/streaming-programming-guide.html#updateStateByKey-operation

如果这并不能减少它,或者您需要更多的灵活性,那么您可以始终像Cassandra(cf Cassandra Connector:https://github.com/datastax/spark-cassandra-connector)一样显式地存储到键值存储中,尽管该选项通常较慢,因为它在每次查找时系统地涉及网络传输。

 类似资料:
  • 问题内容: 我正在尝试查找给定字符串的排列,但是我想使用迭代。我在网上找到了递归解决方案,但我确实理解它,但是将其转换为迭代解决方案实际上是行不通的。下面附上我的代码。我非常感谢您的帮助: 问题答案: 在我的相关问题评论之后,这是一个Java实现,可以使用Counting QuickPerm Algorithm 来完成您想要的事情:

  • 问题内容: 我正在尝试使用迭代器遍历我的日志列表中的列表。目标是搜索包含与新日志相同的电话号码,类型和日期的日志 但是,我在条件语句中得到了java.util.NoSuchElementException。有谁知道可能导致问题的原因? 我的密码 问题答案: 您在一次迭代中调用了很多次,迫使移至一个不存在的元素。 代替 用 每次调用时,它都会向前移动基础光标。

  • 问题内容: 有人可以给我看一个简单的示例,如何使用新的lambda语法在Java 8中按字母顺序排序。 问题答案: 对于字符串,这将工作

  • 问题内容: 我有兴趣对流中的列表进行排序。这是我正在使用的代码: 我想念什么吗?列表未排序。 它应该根据具有最低值的项目对列表进行排序。 以及打印方法: 问题答案: 这与对参数引用进行排序的地方不同。在这种情况下,您将得到一个排序后的流,最终需要将其收集并分配给另一个变量: 您只是错过了分配结果

  • 问题内容: 我收到一个迭代器作为参数,并且想对值进行两次迭代。 可能吗 ?怎么样 ?签名是由我使用的框架(即Hadoop)强加的。 -编辑- 最后,该方法的真正签名是一个。我被这个Wiki页面所迷住了(实际上这是我发现的唯一不被弃用(但错误的)单词计数示例)。 问题答案: 如果要再次迭代,我们必须缓存来自迭代器的值。至少我们可以将第一次迭代和缓存结合起来: (只需要添加代码答案,就知道您在自己的注

  • 问题内容: 在过去的两年中,我一直在编写Java,现在,我开始用python(另外)进行编写。 问题是,当我查看我的Python代码时,似乎有人试图将Java代码转换为python格式,但结果却很糟糕,因为- python不是Java。 关于如何摆脱“用Python编写Java”模式的任何技巧? 谢谢! 问题答案: 您可能会考虑将自己沉浸在Python范例中。最好的方法是首先了解他们的知识,然后通