那么我需要按照以下方式处理这些消息:
(a,b)->x
上应用一个(couple of)函数,输出(a,b)->y
ab->seq[y]
上应用一个函数,其中ab={(a,b)for all b in b}
(稍后有一个传递,需要“转置”消息以跨所有a处理,但这不是这里的问题)
我如何实现这样的消息合并,从步骤1到步骤2?
它看起来像是子键a
上的groupby,但据我所知,方法groupby将在每个微批处理中应用。我需要的是,对于每个a
,等待接收到所有B
(假设一个简单的计数系统可以工作)。再一次抛开输入数据中的丢失b和错误。
在没有知识的情况下,我会尝试看看这样的合并是否可以通过添加到hdfs文件来实现,每个文件对应一个。并尝试在这些文件满后触发第二个流进程。即。当它包含所有b时,将该文件移动到一个输入目录进行步骤2。但是:
您可以创建主RDD,并使用RDD.union
将流生成的微RDD合并到主RDD。类似于:
var masterRDD: RDD[(Long,Long), String] = sc.emptyRDD // guessing on RDD type
myStream.foreachRDD(rdd => {
if (! rdd.isEmpty) {
masterRDD.union(rdd)
masterRDD.groupBy(...).....
}
})
您也应该花点时间阅读检查点,特别是:
数据检查点-将生成的RDD保存到可靠的存储区。在一些跨多批组合数据的有状态转换中,这是必需的。在这样的转换中,生成的RDD依赖于先前批次的RDD,这导致依赖链的长度随着时间不断增加。为了避免恢复时间的这种无限增长(与依赖链成比例),有状态转换的中间RDD被周期性地检查指向可靠存储(例如HDF),以切断依赖链。
我刚开始使用Spark streaming并尝试运行本教程中的一个示例,我正在跟踪制作并运行我们自己的NetworkWordCount。我已经完成了第8步,并从SBT制作了一个罐子。 现在我正在尝试使用第9步中的命令运行deploy my jar,如下所示: 我创建的jar包含“NetworkWordCount”类,该类具有来自spark示例的以下代码 我无法确定我做错了什么。
:) 我已经在一个(奇怪的)情况中结束了自己,简单地说,我不想使用来自Kafka的任何新记录,因此暂停主题中所有分区的sparkStreaming消费(InputStream[ConsumerRecord]),执行一些操作,最后,恢复消费记录。 首先这可能吗? 我一直在尝试这样的事情: 但是我得到了这个: 任何帮助我理解我遗漏了什么,以及为什么当消费者明确分配了分区时我会得到空结果的帮助都将受到欢
但是Flink医生说: 在启用Flink检查点的情况下,Flink Kafka使用者将使用来自主题的记录,并以一致的方式定期检查其所有的Kafka偏移量以及其他操作的状态。在作业失败的情况下,Flink会将流程序恢复到最新检查点的状态,并从检查点中存储的偏移量开始重新使用来自Kafka的记录。 阅读其他来源,我猜Flink检查点将保存程序的状态以及消耗的偏移量,但Spark检查点只是保存消耗的偏移
我们正在接收来自Kafka的星火流数据。一旦在Spark Streaming中开始执行,它只执行一个批处理,其余的批处理开始在Kafka中排队。 我们的数据是独立的,可以并行处理。 我们尝试了多个配置,有多个执行器,核心,背压和其他配置,但到目前为止没有任何工作。排队的消息很多,每次只处理一个微批处理,其余的都留在队列中。 我们从差异实验中得到的统计数据: 实验1 100个文件处理时间48分钟 1
总而言之,我们有: 产品类别: 产品ID、产品名称 订单类别: 订单ID、产品ID、用户ID、订单日期 这些方法中的任何一种可以被认为是最佳实践吗?还是有不同的解决方案?
首先诚挚的道歉,如果我的问题是重复的,我尝试搜索,但没有找到我的问题的相关答案 首先真诚的道歉,如果我问一些很基本的东西,因为我是Storm的初学者。如果我的问题是重复的,因为我试着搜索但是找不到相关的答案 请就我下面的用例提出建议。 > 因此以25秒为频率的所有元组将汇集在一起,并由Bolt on每25秒发射一次(如果在25秒内收到重复的元组,则只考虑最新的一个元组)。 类似地,所有以10分钟为