我在火花流应用程序中从kafka读取数据并执行两个操作
我想确保对于dstream中的每个rdd,插入hbase表A将在对hbase表B进行更新操作之前发生(每个rdd依次发生上述两个动作)
如何在火花流应用中实现这一点
在单个rdd中顺序更新两个表。foreach()
。如果您正确处理了异常,它将按顺序执行。
这种行为背后的事实是,它的DAG将在同一阶段顺序执行。
据我所知,你可以用以下方式执行上述任务
这将按顺序进行
recordStream.foreachRDD{rdd => { //this will be Dstream RDD Records from kafka
val record = rdd.map(line => line.split("\\|")).collect
record.foreach {recordRDD => { //Write the code for Insert in hbase}
record.foreach {recordRDD => { //Write the code for Update in hbase}
希望这有帮助
我是Spark的初学者,我正在运行我的应用程序,从文本文件中读取14KB的数据,执行一些转换和操作(收集、收集AsMap),并将数据保存到数据库 我在我的macbook上本地运行它,内存为16G,有8个逻辑核。 Java最大堆设置为12G。 这是我用来运行应用程序的命令。 bin/spark-submit-class com . myapp . application-master local[*
因为在过滤2之后,我们还得再找到一个元素来分层极限(2),操作,那么为什么输出不像我解释的那样呢?
问题内容: 我正在使用rub redis宝石。想知道我是否例如: 这样的执行顺序得到保证吗? 问题答案: 当然可以保证顺序,否则流水线将毫无用处。您可以随时查看代码。例如,此测试明确假定命令是按顺序执行的:https : //github.com/redis/redis- rb/blob/master/test/pipelining_commands_test.rb#L32
我有一个Spark集群运行在hdfs之上的纱线模式。我启动了一个带有2个内核和2G内存的worker。然后我提交了一个具有3个核心的1个执行器动态配置的作业。不过,我的工作还能运转。有人能解释启动worker的内核数量和为执行者请求的内核数量之间的差异吗。我的理解是,由于执行者在工人内部运行,他们无法获得比工人可用的资源更多的资源。
如前所述,更改Spark集群冗长性的理想方法是更改相应的log4j.properties。然而,在dataproc上,Spark在Yarn上运行,因此我们必须调整全局配置,而不是/usr/lib/Spark/conf 几点建议: 在dataproc上,我们有几个gcloud命令和属性可以在集群创建过程中传递。请参阅留档是否可以通过指定更改 /etc/hadoop/conf下的log4j.prope
我试图从聚合原理的角度来理解火花流。Spark DF 基于迷你批次,计算在特定时间窗口内出现的迷你批次上完成。 假设我们有数据作为- 然后首先对Window_period_1进行计算,然后对Window_period_2进行计算。如果我需要将新的传入数据与历史数据一起使用,比如说Window_priod_new与Window_pperid_1和Window_perid_2的数据之间的分组函数,我该