我使用了以下格式的输入数据:
0
1
2
3
4
5
…
14
Input Location: hdfs://localhost:9000/Input/datasource
我已经使用以下代码段使用多线程将RDD保存为文本文件:
package org.apache.spark.examples;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.avro.ipc.specific.Person;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;
class RunnableDemo implements Runnable
{
private Thread t;
private String threadName;
private String path;
private JavaRDD<String> javaRDD;
// private JavaSparkContext javaSparkContext;
RunnableDemo(String threadName,JavaRDD<String> javaRDD,String path)
{
this.threadName=threadName;
this.javaRDD=javaRDD;
this.path=path;
// this.javaSparkContext=javaSparkContext;
}
@Override
public void run() {
System.out.println("Running " + threadName );
try {
this.javaRDD.saveAsTextFile(path);
// System.out.println(this.javaRDD.count());
Thread.sleep(50);
} catch (InterruptedException e) {
System.out.println("Thread " + threadName + " interrupted.");
}
System.out.println("Thread " + threadName + " exiting.");
// this.javaSparkContext.stop();
}
public void start ()
{
System.out.println("Starting " + threadName );
if (t == null)
{
t = new Thread (this, threadName);
t.start ();
}
}
}
public class SparkJavaTest {
public static void main(String[] args) {
//Spark Configurations:
SparkConf sparkConf=new SparkConf().setAppName("SparkJavaTest");
JavaSparkContext ctx=new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(ctx);
JavaRDD<String> dataCollection=ctx.textFile("hdfs://yarncluster/Input/datasource");
List<StructField> fields= new ArrayList<StructField>();
fields.add(DataTypes.createStructField("Id", DataTypes.IntegerType,true));
JavaRDD<Row> rowRDD =dataCollection.map(
new Function<String, Row>() {
@Override
public Row call(String record) throws Exception {
String[] fields = record.split("\u0001");
return RowFactory.create(Integer.parseInt(fields[0].trim()));
}
});
StructType schema = DataTypes.createStructType(fields);
DataFrame dataFrame =sqlContext.createDataFrame(rowRDD, schema);
dataFrame.registerTempTable("data");
long recordsCount=dataFrame.count();
long splitRecordsCount=5;
long splitCount =recordsCount/splitRecordsCount;
List<JavaRDD<Row>> list1=new ArrayList<JavaRDD<Row>>();
for(int i=0;i<splitCount;i++)
{
long start = i*splitRecordsCount;
long end = (i+1)*splitRecordsCount;
DataFrame temp=sqlContext.sql("SELECT * FROM data WHERE Id >="+ start +" AND Id < " + end);
list1.add(temp.toJavaRDD());
}
long length =list1.size();
int split=0;
for (int i = 0; i < length; i++) {
JavaRDD rdd1 =list1.get(i);
JavaPairRDD rdd3=rdd1.cartesian(rdd1);
JavaPairRDD<Row,Row> rdd4=rdd3.filter(
new Function<Tuple2<Row,Row>,Boolean>()
{
public Boolean call(Tuple2<Row,Row> s)
{
Row line1=s._1;
Row line2=s._2;
long app1 = Integer.parseInt(line1.get(0).toString());
long app2 = Integer.parseInt(line2.get(0).toString());
if(app1<app2)
{
return true;
}
return false;
}
});
JavaRDD<String> test=rdd4.map(new Function<Tuple2<Row,Row>, String>() {
@Override
public String call(Tuple2<Row, Row> s)
throws Exception {
Row data1=s._1;
Row data2=s._2;
int x =Integer.parseInt(data1.get(0).toString());
int y =Integer.parseInt(data2.get(0).toString());
String result =x +","+ y+","+(x+y);
return result;
}
});
RunnableDemo R =new RunnableDemo("Thread-"+split,test,"hdfs://yarncluster/GettingStarted/Output/"+split);
R.start();
split++;
R.start();
int index =i;
while(index<length)
{
JavaRDD rdd2 =list1.get(index);
rdd3=rdd1.cartesian(rdd2);
rdd4=rdd3.filter(
new Function<Tuple2<Row,Row>,Boolean>()
{
public Boolean call(Tuple2<Row,Row> s)
{
Row line1=s._1;
Row line2=s._2;
long app1 = Integer.parseInt(line1.get(0).toString());
long app2 = Integer.parseInt(line2.get(0).toString());
if(app1<app2)
{
return true;
}
return false;
}
});
test=rdd4.map(new Function<Tuple2<Row,Row>, String>() {
@Override
public String call(Tuple2<Row, Row> s)
throws Exception {
Row data1=s._1;
Row data2=s._2;
int x =Integer.parseInt(data1.get(0).toString());
int y =Integer.parseInt(data2.get(0).toString());
String result =x +","+ y+","+(x+y);
return result;
}
});
R =new RunnableDemo("Thread-"+split,test,"hdfs://yarncluster/GettingStarted/Output/"+split);
R.start();
split++;
index++;
}
}
}
}
在本例中,我遇到了以下例外情况
首先,您试图使用多个线程在驱动节点上执行所有工作。这并不符合Spark的精神,因为在您的案例中,每个工作单元都是独立于其他工作单元的,并且可以在不同的机器上执行。这里有一个玩具示例,但对于大量的数据,这将变得非常重要。
更好的方法是使用mappartitions
这样的东西将键的范围发送给每个工作人员,让他们执行相应的SQL查询,然后保存结果,每个工作人员有一个线程。这将使代码更清晰,更容易推理(一旦您习惯了RDD的工作方式)。显然,您需要适当地为输入数据设置并行度级别和分区数(这里讨论过)。
您的代码的直接问题是主线程启动其他线程,但不等待它们完成。通常,这会导致生成的线程与父线程一起终止(参见javadoc)。请注意,在链接问题的答案中,main函数在返回之前如何对生成的futures执行get()
。
我正在亚马逊的EMR集群上同时运行3个Spark流进程。问题是这三个Spark流作业中的一个基于进行处理: 有没有办法在不更改代码的情况下解决这个问题?
我正在使用一个石英工作执行特定的任务。 如果另一个Main Job实例仍在运行,我想阻止调度器启动第二个Main Job实例...
问题内容: 它说,在Apache Spark文档中,“ 在每个Spark应用程序中,如果多个“作业”(Spark操作)是由不同的线程提交的,则它们可以同时运行 ”。有人可以为以下示例代码解释如何实现此并发吗? 这两个作业是独立的,必须同时运行。 谢谢。 问题答案: 尝试这样的事情:
我有一个系统,其中REST API(Flask)使用spark-sumbit向正在运行的PySpark发送作业。 出于各种原因,我需要spark同时运行所有任务(即我需要设置执行器的数量=运行时的任务数量)。 这可能通过一项工作来实现吗?
我实际上正在部署一个Spark/Kafka/Cassandra应用程序,而我正面临一个不同解决方案的问题,所以我在这里听取您的建议。 > 我在Spark streaming中运行了一个很长时间的应用程序,用于处理Kafka中的Avro消息。根据消息的性质,我可以做一些不同的案例,最后在Cassandra中保存一个记录,所以只是这些技术的一个基本用例。 所以我正在寻找执行批处理作业的最佳实践。由于s
一个spark有一个oracle查询。所以我必须并行运行多个作业,以便所有查询都将同时激发。 如何并行运行多个作业?