当前位置: 首页 > 知识库问答 >
问题:

按顺序执行火花流动作

甄鹏云
2023-03-14

我在火花流应用程序中从kafka读取数据并执行两个操作

  1. 将dstream插入hbase表A
  2. 更新另一个hbase表B

我想确保对于dstream中的每个rdd,插入hbase表A将在对hbase表B进行更新操作之前发生(每个rdd依次发生上述两个动作)

如何在火花流应用中实现这一点

共有2个答案

包沈义
2023-03-14

在单个rdd中顺序更新两个表。foreach()。如果您正确处理了异常,它将按顺序执行。

这种行为背后的事实是,它的DAG将在同一阶段顺序执行。

水品
2023-03-14

据我所知,你可以用以下方式执行上述任务

这将按顺序进行

 recordStream.foreachRDD{rdd => { //this will be Dstream RDD Records from kafka
 val record = rdd.map(line => line.split("\\|")).collect 
 record.foreach {recordRDD => { //Write the code for Insert in hbase}
 record.foreach {recordRDD => { //Write the code for Update in hbase}

希望这有帮助

 类似资料:
  • 我是Spark的初学者,我正在运行我的应用程序,从文本文件中读取14KB的数据,执行一些转换和操作(收集、收集AsMap),并将数据保存到数据库 我在我的macbook上本地运行它,内存为16G,有8个逻辑核。 Java最大堆设置为12G。 这是我用来运行应用程序的命令。 bin/spark-submit-class com . myapp . application-master local[*

  • 因为在过滤2之后,我们还得再找到一个元素来分层极限(2),操作,那么为什么输出不像我解释的那样呢?

  • 问题内容: 我正在使用rub redis宝石。想知道我是否例如: 这样的执行顺序得到保证吗? 问题答案: 当然可以保证顺序,否则流水线将毫无用处。您可以随时查看代码。例如,此测试明确假定命令是按顺序执行的:https : //github.com/redis/redis- rb/blob/master/test/pipelining_commands_test.rb#L32

  • 我有一个Spark集群运行在hdfs之上的纱线模式。我启动了一个带有2个内核和2G内存的worker。然后我提交了一个具有3个核心的1个执行器动态配置的作业。不过,我的工作还能运转。有人能解释启动worker的内核数量和为执行者请求的内核数量之间的差异吗。我的理解是,由于执行者在工人内部运行,他们无法获得比工人可用的资源更多的资源。

  • 如前所述,更改Spark集群冗长性的理想方法是更改相应的log4j.properties。然而,在dataproc上,Spark在Yarn上运行,因此我们必须调整全局配置,而不是/usr/lib/Spark/conf 几点建议: 在dataproc上,我们有几个gcloud命令和属性可以在集群创建过程中传递。请参阅留档是否可以通过指定更改 /etc/hadoop/conf下的log4j.prope

  • 我试图从聚合原理的角度来理解火花流。Spark DF 基于迷你批次,计算在特定时间窗口内出现的迷你批次上完成。 假设我们有数据作为- 然后首先对Window_period_1进行计算,然后对Window_period_2进行计算。如果我需要将新的传入数据与历史数据一起使用,比如说Window_priod_new与Window_pperid_1和Window_perid_2的数据之间的分组函数,我该