当前位置: 首页 > 知识库问答 >
问题:

批之间的流数据共享火花

越开畅
2023-03-14

Spark streaming以微批量处理数据。

使用RDD并行处理每个间隔数据,每个间隔之间没有任何数据共享。

但我的用例需要在间隔之间共享数据。

>

  • 单词“hadoop”和“spark”与前一个间隔计数的相对计数

    所有其他单词的正常字数。

    注意:UpdateStateByKey执行有状态处理,但这将对每个记录而不是特定记录应用函数。

    间隔-1

    输入:

    Sample Input with Hadoop and Spark on Hadoop
    

    输出:

    hadoop  2
    sample  1
    input   1
    with    1
    and 1
    spark   1
    on  1
    
    Another Sample Input with Hadoop and Spark on Hadoop and another hadoop another spark spark
    
    another 3
    hadoop  1
    spark   2
    and 2
    sample  1
    input   1
    with    1
    on  1
    

    火花发生3次,但输出应为2(3-1)

    对于所有其他单词,它应该给出正常的字数。

    因此,在处理第2个区间数据时,它应该具有第1个区间的字数hadoop和spark

    Apache Storm中的类似用法:

    storm中的分布式缓存

    Storm TransactionalWords

  • 共有1个答案

    锺离嘉容
    2023-03-14

    这可以通过“记住”接收到的最后一个RDD并使用左联接将该数据与下一个流批合并来实现。我们使用streamingcontext.remember来使流式处理产生的RDD能够在我们需要的时候保留。

    我们利用了以下事实:dstream.transform是在驱动程序上执行的操作,因此我们可以访问所有本地对象定义。特别是,我们希望在每个批处理上用所需的值更新到最后一个RDD的可变引用。

    也许一段代码让这个想法更加清晰:

    // configure the streaming context to remember the RDDs produced
    // choose at least 2x the time of the streaming interval
    ssc.remember(xx Seconds)  
    
    // Initialize the "currentData" with an empty RDD of the expected type
    var currentData: RDD[(String, Int)] = sparkContext.emptyRDD
    
    // classic word count
    val w1dstream = dstream.map(elem => (elem,1))    
    val count = w1dstream.reduceByKey(_ + _)    
    
    // Here's the key to make this work. Look how we update the value of the last RDD after using it. 
    val diffCount = count.transform{ rdd => 
                    val interestingKeys = Set("hadoop", "spark")               
                    val interesting = rdd.filter{case (k,v) => interestingKeys(k)}                                
                    val countDiff = rdd.leftOuterJoin(currentData).map{case (k,(v1,v2)) => (k,v1-v2.getOrElse(0))}
                    currentData = interesting
                    countDiff                
                   }
    
    diffCount.print()
    
     类似资料:
    • 当前体系结构: 问题: 我们在前端和后端层之间有一个两步流程。 null 微服务2(MS2)需要验证I1的完整性,因为它来自前端。如何避免对MS1进行新的查询?最好的办法是什么? 我试图优化的流删除了步骤1.3和2.3 流程1: null 流程2: 2.1用户X已在本地/会话存储中存储了数据(MS2_Data) 2.2用户X在MS1上保留数据(MS2_Data+MS1_Data) 2.3 MS1使

    • 问题内容: 我有一些称为的数据,该数据位于三个孩子的父对象的范围内: 在这三个指令之间共享的最佳方法是什么?选项包括: 使用隔离的范围传递三遍,从而跨四个范围复制它 让子指示继承父范围,并找到,或在 把上并注入到这一点的子指示 还是有另一种更好的方法? 问题答案: 您可以创建一个工厂,该工厂可以传递给每个指令或控制器。这样可以确保在任何给定时间只有一个数组实例。编辑:这里唯一的陷阱是确保您在指令作

    • 问题内容: 我有一个下载器功能,可以并行下载多个文件。我使用以便下载同一文件的不同块。我想显示下载的状态栏。为此,我需要知道已经下载的总字节数()。 是否有一种方法可以设置将在所有这些进程和主进程之间共享的变量,以便每个进程都可以追加刚刚下载的字节数? 问题答案: 解决方案是利用新进程并传递共享的ctypes值:

    • 问题内容: 我想将一些数据从一个HTML页面发送到另一HTML页面。我通过类似的查询参数发送数据 。这种方法的问题在于数据保留在URL中。是否有其他方法可以使用JavaScript或jquery在HTML页面之间发送数据。 问题答案: 为什么不将值存储在HTML5存储对象(例如或)中,请访问HTML5存储文档以获取更多详细信息。使用此功能,您可以在本地临时/永久存储中间值,然后在以后访问您的值。

    • 我正在做一个phonegap应用程序,我不能在index.html和inappbrowser窗口之间共享数据。我尝试了这段代码,但它对我不起作用。 我还尝试使用localStorage,但只将inappbrowser的数据共享到index.js,而不是将index.js的数据共享到inappbrowser。

    • 问题内容: 有没有办法在AngularJS中的服务之间共享数据? 用例:来自不同服务的数据聚合 例如,我想要一个service1从REST服务加载一些数据。然后,另一个service2将来自另一个REST API的其他数据添加到service1数据中,以创建数据聚合服务。 我基本上是想根据它们使用的API来分离服务,但是仍然有一个服务来最终保存所有数据。 问题答案: 创建使用延迟库的第三项服务,以