我在这个Github中使用了一个教程,使用java maven项目在cassandra上运行spark:https://Github.com/datastax/spark-cassandra-connector。
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import java.io.Serializable;
import static com.datastax.spark.connector.CassandraJavaUtil.*;
public class App implements Serializable
{
// firstly, we define a bean class
public static class Person implements Serializable {
private Integer id;
private String fname;
private String lname;
private String role;
// Remember to declare no-args constructor
public Person() { }
public Integer getId() { return id; }
public void setId(Integer id) { this.id = id; }
public String getfname() { return fname; }
public void setfname(String fname) { this.fname = fname; }
public String getlname() { return lname; }
public void setlname(String lname) { this.lname = lname; }
public String getrole() { return role; }
public void setrole(String role) { this.role = role; }
// other methods, constructors, etc.
}
private transient SparkConf conf;
private App(SparkConf conf) {
this.conf = conf;
}
private void run() {
JavaSparkContext sc = new JavaSparkContext(conf);
createSchema(sc);
sc.stop();
}
private void createSchema(JavaSparkContext sc) {
JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("tester", "empbyrole", Person.class)
.where("role=?", "IT Engineer").map(new Function<Person, String>() {
@Override
public String call(Person person) throws Exception {
return person.toString();
}
});
System.out.println("Data as Person beans: \n" + StringUtils.join("\n", rdd.toArray()));
}
public static void main( String[] args )
{
if (args.length != 2) {
System.err.println("Syntax: com.datastax.spark.demo.JavaDemo <Spark Master URL> <Cassandra contact point>");
System.exit(1);
}
SparkConf conf = new SparkConf();
conf.setAppName("Java API demo");
conf.setMaster(args[0]);
conf.set("spark.cassandra.connection.host", args[1]);
App app = new App(conf);
app.run();
}
}
14/09/23 13:46:53 ERROR executor.Executor: Exception in task ID 0
java.io.IOException: Exception during preparation of SELECT "role", "id", "fname", "lname" FROM "tester"."empbyrole" WHERE token("role") > -5709068081826432029 AND token("role") <= -5491279024053142424 AND role=? ALLOW FILTERING: role cannot be restricted by more than one relation if it includes an Equal
at com.datastax.spark.connector.rdd.CassandraRDD.createStatement(CassandraRDD.scala:310)
at com.datastax.spark.connector.rdd.CassandraRDD.com$datastax$spark$connector$rdd$CassandraRDD$$fetchTokenRange(CassandraRDD.scala:317)
at com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$13.apply(CassandraRDD.scala:338)
at com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$13.apply(CassandraRDD.scala:338)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:10)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:608)
at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:608)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:205)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: role cannot be restricted by more than one relation if it includes an Equal
at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:35)
at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:256)
at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:91)
at com.datastax.spark.connector.cql.PreparedStatementCache$.prepareStatement(PreparedStatementCache.scala:45)
at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:28)
at com.sun.proxy.$Proxy8.prepare(Unknown Source)
at com.datastax.spark.connector.rdd.CassandraRDD.createStatement(CassandraRDD.scala:293)
... 27 more
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: role cannot be restricted by more than one relation if it includes an Equal
at com.datastax.driver.core.Responses$Error.asException(Responses.java:97)
at com.datastax.driver.core.SessionManager$1.apply(SessionManager.java:156)
at com.datastax.driver.core.SessionManager$1.apply(SessionManager.java:131)
at com.google.common.util.concurrent.Futures$1.apply(Futures.java:711)
at com.google.common.util.concurrent.Futures$ChainingListenableFuture.run(Futures.java:849)
... 3 more
14/09/23 13:46:53 WARN scheduler.TaskSetManager: Lost TID 0 (task 0.0:0)
14/09/23 13:46:53 WARN scheduler.TaskSetManager: Loss was due to java.io.IOException
java.io.IOException: Exception during preparation of SELECT "role", "id", "fname", "lname" FROM "tester"."empbyrole" WHERE token("role") > -5709068081826432029 AND token("role") <= -5491279024053142424 AND role=? ALLOW FILTERING: role cannot be restricted by more than one relation if it includes an Equal
at com.datastax.spark.connector.rdd.CassandraRDD.createStatement(CassandraRDD.scala:310)
at com.datastax.spark.connector.rdd.CassandraRDD.com$datastax$spark$connector$rdd$CassandraRDD$$fetchTokenRange(CassandraRDD.scala:317)
at com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$13.apply(CassandraRDD.scala:338)
at com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$13.apply(CassandraRDD.scala:338)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:10)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:608)
at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:608)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:205)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
14/09/23 13:46:53 ERROR scheduler.TaskSetManager: Task 0.0:0 failed 1 times; aborting job
14/09/23 13:46:53 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/09/23 13:46:53 INFO scheduler.DAGScheduler: Failed to run toArray at App.java:65
Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task 0.0:0 failed 1 times (most recent failure: Exception failure: java.io.IOException: Exception during preparation of SELECT "role", "id", "fname", "lname" FROM "tester"."empbyrole" WHERE token("role") > -5709068081826432029 AND token("role") <= -5491279024053142424 AND role=? ALLOW FILTERING: role cannot be restricted by more than one relation if it includes an Equal)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/09/23 13:46:53 INFO cql.CassandraConnector: Disconnected from Cassandra cluster: Test Cluster
JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("tester", "empbyrole", Person.class)
.where("role=?", "IT Engineer").map(new Function<Person, String>() {
@Override
public String call(Person person) throws Exception {
return person.toString();
}
});
JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("tester", "empbyrole", Person.class)
.where("id=?", "1").map(new Function<Person, String>() {
@Override
public String call(Person person) throws Exception {
return person.toString();
}
});
session.execute("DROP KEYSPACE IF EXISTS tester");
session.execute("CREATE KEYSPACE tester WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}");
session.execute("CREATE TABLE tester.emp (id INT PRIMARY KEY, fname TEXT, lname TEXT, role TEXT)");
session.execute("CREATE TABLE tester.empByRole (id INT, fname TEXT, lname TEXT, role TEXT, PRIMARY KEY (role,id))");
session.execute("CREATE TABLE tester.dept (id INT PRIMARY KEY, dname TEXT)");
session.execute(
"INSERT INTO tester.emp (id, fname, lname, role) " +
"VALUES (" +
"0001," +
"'Angel'," +
"'Pay'," +
"'IT Engineer'" +
");");
session.execute(
"INSERT INTO tester.emp (id, fname, lname, role) " +
"VALUES (" +
"0002," +
"'John'," +
"'Doe'," +
"'IT Engineer'" +
");");
session.execute(
"INSERT INTO tester.emp (id, fname, lname, role) " +
"VALUES (" +
"0003," +
"'Jane'," +
"'Doe'," +
"'IT Analyst'" +
");");
session.execute(
"INSERT INTO tester.empByRole (id, fname, lname, role) " +
"VALUES (" +
"0001," +
"'Angel'," +
"'Pay'," +
"'IT Engineer'" +
");");
session.execute(
"INSERT INTO tester.empByRole (id, fname, lname, role) " +
"VALUES (" +
"0002," +
"'John'," +
"'Doe'," +
"'IT Engineer'" +
");");
session.execute(
"INSERT INTO tester.empByRole (id, fname, lname, role) " +
"VALUES (" +
"0003," +
"'Jane'," +
"'Doe'," +
"'IT Analyst'" +
");");
session.execute(
"INSERT INTO tester.dept (id, dname) " +
"VALUES (" +
"1553," +
"'Commerce'" +
");");
where
方法将Allow Filtering
添加到查询中。这不是灵丹妙药,因为它仍然不支持任意字段作为查询谓词。通常,该字段必须是索引列或群集列。如果这对您的数据模型不实用,您可以简单地在RDD上使用filter
方法。缺点是过滤器发生在Spark中,而不是在Cassandra中。
因此,id
字段可以工作,因为CQLWHERE
子句支持它,而我假设role只是一个常规字段。请注意,我并不是建议您索引您的字段或将其更改为集群列,因为我不知道您的数据模型。
喜欢程序员这份工作的独立的程序员可以为任务选择最好的语言。大多数职业程序员控制不了他们将要使用的语言。通常,这个话题会由执行行政决议而非技术决议的boss说出,他们缺少勇气去提升新型工具,即使他们知道,大多数时候使用最新的知识,最少被接受的工具是最好的。另一些情况下,这种团体中真实的好处,以及拓展一个更大的社区的好处,排除了个人的选择。通常管理者由能够雇用在给定的语言有些经验的程序员驱动。毫无疑问
问题内容: 我在C#项目中将SQLServer用于故障排除程序,并且我有一个包含ID,Question,QuestionId,Solution和Rank的表。我希望有一个问题的多个解决方案,并且程序将选择排名最高的解决方案,该解决方案仅由最高的数字来选择,该数字在每次正确时都会递增。为此,我有以下SQL语句: 当我只有一个可用的解决方案时,它可以很好地工作,但是当我有多个解决方案时,它却不能。 问
问题内容: 我正在浏览tour.golang.org中的示例,并且遇到了我不太了解的这段代码: 我了解通道工作的基础知识,但是我没有得到的是上述select语句的工作方式。教程中的说明说: “ select语句使goroutine等待多个通信操作。一个select阻塞直到它的一种情况可以运行,然后它执行该情况。如果有多个就绪,它将随机选择一个。” 但是案件如何执行?据我所知,他们在说: 案例:发送
问题内容: 刚尝试去最近。我想知道如果您有一条select语句等待在几个通道上进行通信,并且如果消息同时在两个或多个通道上出现,将会发生什么情况。如果所有消息都同时发出,那么select将如何确定接受哪个通道? 谢谢! 问题答案: 从规格: 如果可能发生多种情况,则将做出统一的伪随机选择,以决定执行哪个单一通信。 因此,选择是不确定的。
Why do developers choose one programming language over another for a given task? 为什么对一项给定的任务,开发者会偏向于选择某一门语言? Sometimes I choose raw C when I need blazing speed. 有时候当我需要绝对程序运行效率的时候,我会选择原始C语言。 When I wa
我需要添加到我的关系字符串,像这样的格式:(帖子)和我需要从另一个表中获取的帖子(在这个关系中,我只有post_id)。我怎么能采取使用文件?