当前位置: 首页 > 知识库问答 >
问题:

Spark中的并发作业执行

姜振濂
2023-03-14

我使用了以下格式的输入数据:

0
1
2
3
4
5
…
14

Input Location: hdfs://localhost:9000/Input/datasource

我已经使用以下代码段使用多线程将RDD保存为文本文件:

package org.apache.spark.examples;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.avro.ipc.specific.Person;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;

class RunnableDemo implements Runnable
{

    private Thread t;
    private String threadName;
    private String path;
    private JavaRDD<String> javaRDD;
//  private JavaSparkContext javaSparkContext;

    RunnableDemo(String threadName,JavaRDD<String> javaRDD,String path)
    {
        this.threadName=threadName;
        this.javaRDD=javaRDD;
        this.path=path;
//      this.javaSparkContext=javaSparkContext;
    }


    @Override
    public void run() {
        System.out.println("Running " +  threadName );      
        try {
            this.javaRDD.saveAsTextFile(path);
//          System.out.println(this.javaRDD.count());
            Thread.sleep(50);
            } catch (InterruptedException e) {
                System.out.println("Thread " +  threadName + " interrupted.");
                }
         System.out.println("Thread " +  threadName + " exiting.");
//       this.javaSparkContext.stop();
    }

    public void start ()
       {
          System.out.println("Starting " +  threadName );
          if (t == null)
          {
             t = new Thread (this, threadName);
             t.start ();
          }
       }

}

public class SparkJavaTest {



    public static void main(String[] args) {

        //Spark Configurations:

        SparkConf sparkConf=new SparkConf().setAppName("SparkJavaTest");

        JavaSparkContext ctx=new JavaSparkContext(sparkConf);

        SQLContext sqlContext = new SQLContext(ctx);        

        JavaRDD<String> dataCollection=ctx.textFile("hdfs://yarncluster/Input/datasource");




        List<StructField> fields= new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("Id", DataTypes.IntegerType,true));

        JavaRDD<Row> rowRDD =dataCollection.map(
                new Function<String, Row>() {
                    @Override
                    public Row call(String record) throws Exception {
                        String[] fields = record.split("\u0001");                       
                        return RowFactory.create(Integer.parseInt(fields[0].trim()));
                    }                   
                });     

        StructType schema = DataTypes.createStructType(fields);

        DataFrame dataFrame =sqlContext.createDataFrame(rowRDD, schema);        
        dataFrame.registerTempTable("data");

        long recordsCount=dataFrame.count();        
        long splitRecordsCount=5;
        long splitCount =recordsCount/splitRecordsCount;
        List<JavaRDD<Row>> list1=new ArrayList<JavaRDD<Row>>();

        for(int i=0;i<splitCount;i++)
        {
            long start = i*splitRecordsCount;
            long end = (i+1)*splitRecordsCount;         
            DataFrame temp=sqlContext.sql("SELECT * FROM data WHERE Id >="+ start +" AND Id < " + end);         
            list1.add(temp.toJavaRDD());
        }       

        long length =list1.size();

        int split=0;

        for (int i = 0; i < length; i++) {

            JavaRDD rdd1 =list1.get(i);

            JavaPairRDD rdd3=rdd1.cartesian(rdd1);

            JavaPairRDD<Row,Row> rdd4=rdd3.filter(
                    new Function<Tuple2<Row,Row>,Boolean>()
                    {
                        public Boolean call(Tuple2<Row,Row> s)
                        {
                            Row line1=s._1;
                            Row line2=s._2;

                            long app1 = Integer.parseInt(line1.get(0).toString());

                            long app2 = Integer.parseInt(line2.get(0).toString());


                            if(app1<app2)
                            {
                                return true;
                            }
                            return false;
                        }
                    });

            JavaRDD<String> test=rdd4.map(new Function<Tuple2<Row,Row>, String>() {
                @Override
                public String call(Tuple2<Row, Row> s)
                        throws Exception {

                    Row data1=s._1;
                    Row data2=s._2;

                    int x =Integer.parseInt(data1.get(0).toString());
                    int y =Integer.parseInt(data2.get(0).toString());

                    String result =x +","+ y+","+(x+y);
                    return result;
                }
            });

            RunnableDemo R =new RunnableDemo("Thread-"+split,test,"hdfs://yarncluster/GettingStarted/Output/"+split);

            R.start();
            split++;            
            R.start();

            int index =i;

            while(index<length)
            {
                JavaRDD rdd2 =list1.get(index);
                 rdd3=rdd1.cartesian(rdd2);

                 rdd4=rdd3.filter(
                        new Function<Tuple2<Row,Row>,Boolean>()
                        {
                            public Boolean call(Tuple2<Row,Row> s)
                            {
                                Row line1=s._1;
                                Row line2=s._2;

                                long app1 = Integer.parseInt(line1.get(0).toString());

                                long app2 = Integer.parseInt(line2.get(0).toString());


                                if(app1<app2)
                                {
                                    return true;
                                }
                                return false;
                            }
                        });         

                test=rdd4.map(new Function<Tuple2<Row,Row>, String>() {
                    @Override
                    public String call(Tuple2<Row, Row> s)
                            throws Exception {

                        Row data1=s._1;
                        Row data2=s._2;

                        int x =Integer.parseInt(data1.get(0).toString());
                        int y =Integer.parseInt(data2.get(0).toString());

                        String result =x +","+ y+","+(x+y);
                        return result;
                    }
                });         

                R =new RunnableDemo("Thread-"+split,test,"hdfs://yarncluster/GettingStarted/Output/"+split);

                R.start();
                split++;            
                index++;                
            }
        }
    }

}

在本例中,我遇到了以下例外情况

共有1个答案

曾沛
2023-03-14

首先,您试图使用多个线程在驱动节点上执行所有工作。这并不符合Spark的精神,因为在您的案例中,每个工作单元都是独立于其他工作单元的,并且可以在不同的机器上执行。这里有一个玩具示例,但对于大量的数据,这将变得非常重要。

更好的方法是使用mappartitions这样的东西将键的范围发送给每个工作人员,让他们执行相应的SQL查询,然后保存结果,每个工作人员有一个线程。这将使代码更清晰,更容易推理(一旦您习惯了RDD的工作方式)。显然,您需要适当地为输入数据设置并行度级别和分区数(这里讨论过)。

您的代码的直接问题是主线程启动其他线程,但不等待它们完成。通常,这会导致生成的线程与父线程一起终止(参见javadoc)。请注意,在链接问题的答案中,main函数在返回之前如何对生成的futures执行get()

 类似资料:
  • 我正在亚马逊的EMR集群上同时运行3个Spark流进程。问题是这三个Spark流作业中的一个基于进行处理: 有没有办法在不更改代码的情况下解决这个问题?

  • 我正在使用一个石英工作执行特定的任务。 如果另一个Main Job实例仍在运行,我想阻止调度器启动第二个Main Job实例...

  • 问题内容: 它说,在Apache Spark文档中,“ 在每个Spark应用程序中,如果多个“作业”(Spark操作)是由不同的线程提交的,则它们可以同时运行 ”。有人可以为以下示例代码解释如何实现此并发吗? 这两个作业是独立的,必须同时运行。 谢谢。 问题答案: 尝试这样的事情:

  • 我有一个系统,其中REST API(Flask)使用spark-sumbit向正在运行的PySpark发送作业。 出于各种原因,我需要spark同时运行所有任务(即我需要设置执行器的数量=运行时的任务数量)。 这可能通过一项工作来实现吗?

  • 我实际上正在部署一个Spark/Kafka/Cassandra应用程序,而我正面临一个不同解决方案的问题,所以我在这里听取您的建议。 > 我在Spark streaming中运行了一个很长时间的应用程序,用于处理Kafka中的Avro消息。根据消息的性质,我可以做一些不同的案例,最后在Cassandra中保存一个记录,所以只是这些技术的一个基本用例。 所以我正在寻找执行批处理作业的最佳实践。由于s

  • 一个spark有一个oracle查询。所以我必须并行运行多个作业,以便所有查询都将同时激发。 如何并行运行多个作业?