当前位置: 首页 > 知识库问答 >
问题:

执行三个并行Spark流作业

邵繁
2023-03-14

我正在亚马逊的EMR集群上同时运行3个Spark流进程。问题是这三个Spark流作业中的一个基于toLocaliterator进行处理:

dstreamdata.foreachRDD(entry => {
      entry.toLocalIterator.foreach
spark-submit --deploy-mode cluster --executor-cores 6 --executor-memory 10g --num-executors 2 --conf spark.yarn.submit.waitAppCompletion=false --queue queue_name_of_spark_job

有没有办法在不更改代码的情况下解决这个问题?

共有1个答案

云俊美
2023-03-14

1.1)如果您使用Kinesis作为您的队列,请确保您的执行器内核是Kinesis碎片的两倍。这可能适用于Kafka,我忘记了Kafka连接器是如何工作的。这是因为连接器每碎片消耗一个内核,所以您必须确保您有可用的执行器内核来实际处理数据。

在过去,我使用了每个kinesis碎片一个执行器,每个执行器有2个或更多的内核,这在我的用例中很好地工作。

1.2)目前,您的代码正在将所有数据作为迭代器取回驱动程序。如果您有大量的数据,您可能需要为驱动程序分配更多的资源,以便它能够处理RDD中的所有数据。这感觉有点不对:-如果您可以在一个实例中容纳所有数据,您就不需要Spark的复杂性了!

Spark2.0.x配置为您提供了可用的配置细节。

我建议首先查看driver.cores和/或driver.memory。我怀疑你需要更多的核心,但你需要实验。

下面是一些示例代码,其中包含了更多信息的链接:

dstream.foreachRDD { rdd =>
  // code here is executed by the driver
  rdd.foreachPartition { partitionOfRecords =>
    // code here is executed by the workers per partition
  }
}

http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

 类似资料:
  • 我使用了以下格式的输入数据: 我已经使用以下代码段使用多线程将RDD保存为文本文件: 在本例中,我遇到了以下例外情况

  • 一个spark有一个oracle查询。所以我必须并行运行多个作业,以便所有查询都将同时激发。 如何并行运行多个作业?

  • 我们正在接收来自Kafka的星火流数据。一旦在Spark Streaming中开始执行,它只执行一个批处理,其余的批处理开始在Kafka中排队。 我们的数据是独立的,可以并行处理。 我们尝试了多个配置,有多个执行器,核心,背压和其他配置,但到目前为止没有任何工作。排队的消息很多,每次只处理一个微批处理,其余的都留在队列中。 我们从差异实验中得到的统计数据: 实验1 100个文件处理时间48分钟 1

  • 请考虑以下代码: 任务是应该并行执行的运行表的列表。当我们启动这个线程,并且它开始执行时,根据一些计算,我们需要中断(取消)所有这些任务。 中断线程只会停止执行中的一个。我们怎么对付别人?或者流不应该这样使用?或者你知道更好的解决办法?

  • 问题内容: 我有以下方法: 在这里,我依次调用三种方法,这依次命中数据库并获取我的结果,然后对从数据库命中获得的结果进行后处理。我知道如何通过使用并发调用这三种方法。但是我想用Java 8 来实现。有人可以指导我如何通过并行流实现相同目标吗? 编辑 我只想通过Stream并行调用方法。 问题答案: 您可以利用这种方式:

  • 我是spark streaming的新手,我有一个关于其用法的一般性问题。我目前正在实现一个应用程序,它从一个Kafka主题流式传输数据。 使用应用程序只运行一次批处理是一种常见的场景吗,例如,一天结束,收集主题中的所有数据,做一些聚合和转换等等? 这意味着在用spark-submit启动应用程序后,所有这些东西将在一批中执行,然后应用程序将被关闭。或者spark stream build是为了在