当前位置: 首页 > 知识库问答 >
问题:

Spark Kafka Direct DStream-如果设置了num executors,那么纱线集群模式中有多少个执行器和RDD分区?

何华灿
2023-03-14

< br >我正在尝试使用Spark Kafka直接流方法。如本文所述,它通过创建与kafka主题分区一样多的RDD分区来简化并行性。根据我的理解,spark将为每个RDD分区创建一个执行器来进行计算。

因此,当我以纱簇模式提交应用程序,并将选项num-执行器指定为与分区数不同的值时,会有多少个执行器?

例如,有一个具有2个分区的kafka主题,我将执行器数指定为4个:

export YARN_CONF_DIR=$HADOOP_HOME/client_conf

./bin/spark-submit \
--class playground.MainClass \
--master yarn-cluster \
--num-executors 4 \
../spark_applications/uber-spark-streaming-0.0.1-SNAPSHOT.jar \
127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095 topic_1

我尝试了一下,发现执行器的数量是4,每个执行器都会读取和处理来自kafka的数据。为什么?kafka主题中只有2个分区,4个执行器如何从只有2个分割的kafka话题中读取?

以下是 spark 应用程序和日志的详细信息。

我的火花应用程序,它在每个执行器中打印从kafka接收到的消息(在平面地图方法中):

    ...
    String brokers = args[0];
    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(args[1].split(",")));
    kafkaParams.put("metadata.broker.list", brokers);

    JavaPairInputDStream<String, String> messages =
        KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
            kafkaParams, topicsSet);

    JavaPairDStream<String, Integer> wordCounts =
        messages.flatMap(new FlatMapFunction<Tuple2<String, String>, String>()
        {
            public Iterable<String> call(Tuple2<String, String> tuple) throws Exception
            {
                System.out.println(String.format("[received from kafka] tuple_1 is %s, tuple_2 is %s", tuple._1(),
                    tuple._2())); // print the kafka message received  in executor
                return Arrays.asList(SPACE.split(tuple._2()));
            }

        }).mapToPair(new PairFunction<String, String, Integer>()
        {
            public Tuple2<String, Integer> call(String word) throws Exception
            {
                System.out.println(String.format("[word]: %s", word));
                return new Tuple2<String, Integer>(word, 1);
            }

        }).reduceByKey(new Function2<Integer, Integer, Integer>()
        {
            public Integer call(Integer v1, Integer v2) throws Exception
            {
                return v1 + v2;
            }

        });

    wordCounts.print();

    Runtime.getRuntime().addShutdownHook(new Thread(){
        @Override
        public void run(){
            System.out.println("gracefully shutdown Spark!");
            jssc.stop(true, true);
        }
    });
    jssc.start();
    jssc.awaitTermination();

我的Kafka主题,有2个分区。串“你好你好字1”、“你好你好字2”、“你好你好字3”,...被发送到主题。

Topic: topic_2  PartitionCount:2    ReplicationFactor:2 Configs:
Topic: topic_2  Partition: 0    Leader: 3   Replicas: 3,1   Isr: 3,1
Topic: topic_2  Partition: 1    Leader: 1   Replicas: 1,2   Isr: 1,2

执行器1的控制台输出:

...
[received from kafka] tuple_1 is null, tuple_2 is hello hello world 12
[word]: hello
[word]: hello
[word]: world
[word]: 12
...

执行器2的控制台输出:

...
[received from kafka] tuple_1 is null, tuple_2 is hello hello world 2
[word]: hello
[word]: hello
[word]: world
[word]: 2
...

执行器3的控制台输出:

...
[received from kafka] tuple_1 is null, tuple_2 is hello hello world 3
[word]: hello
[word]: hello
[word]: world
[word]: 3
...

共有1个答案

范承望
2023-03-14

每个分区一次由一个执行器操作(假设您没有启用推测执行)。

如果你的执行程序比分区多,不是所有的执行程序都会在给定的RDD上工作。但是正如您所注意到的,由于数据流是一个rdd序列,随着时间的推移,每个执行器都会做一些工作。

 类似资料:
  • 在调试和故障处理的时候,我们通常有必要知道 RDD 有多少个分区。这里有几个方法可以找到这些信息: 使用 UI 查看在分区上执行的任务数 当 stage 执行的时候,你可以在 Spark UI 上看到这个 stage 上的分区数。 下面的例子中的简单任务在 4 个分区上创建了共 100 个元素的 RDD ,然后在这些元素被收集到 driver 之前分发一个 map 任务: scala> val s

  • 我正在尝试在我的Spark应用程序中使用多个(通过include)类型安全配置文件,我正在集群模式下提交给一个YARN队列。我基本上有两个配置文件,下面提供了文件布局: null 上面的两个文件都是我的application.jar的外部文件,所以我使用“--files”(可以在下面看到)将它们传递给yarn 我正在使用Typesafe配置库来解析我的“application-main.conf”

  • 在调试和故障处理的时候,我们通常有必要知道 RDD 有多少个分区。这里有几个方法可以找到这些信息: 使用 UI 查看在分区上执行的任务数 当 stage 执行的时候,你可以在 Spark UI 上看到这个 stage 上的分区数。 下面的例子中的简单任务在 4 个分区上创建了共 100 个元素的 RDD ,然后在这些元素被收集到 driver 之前分发一个 map 任务: scala> val s

  • 我正在4节点群集上运行Spark over纱线。节点中每台机器的配置为128GB内存,每个节点24核CPU。我使用此命令运行Spark on 但Spark最多只能启动16个执行者。我将纱线中的最大vcore分配设置为80(在我拥有的94芯中)。因此,我的印象是,这将启动19名执行人,但最多只能启动16名执行人。此外,我认为即使这些执行者也没有完全使用分配的vCore。 这些是我的问题 spark为

  • 有两种部署模式可用于在YARN上启动Spark应用程序。在yarn-cluster模式下,Spark驱动程序在集群上由YARN管理的应用程序主进程中运行,客户端可以在启动应用程序后离开。在yarn-client模式下,驱动程序在客户端进程中运行,而应用程序主进程仅用于向YARN请求资源。 在此,我只能理解的区别是哪个地方的驱动程序在运行,但我无法理解哪个运行得更快。莫尔沃弗: 在运行Spark-s