当前位置: 首页 > 知识库问答 >
问题:

Spark Datastax Java API选择语句

裴弘
2023-03-14

我在这个Github中使用了一个教程,使用java maven项目在cassandra上运行spark:https://Github.com/datastax/spark-cassandra-connector。

import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import java.io.Serializable;

import static com.datastax.spark.connector.CassandraJavaUtil.*;


public class App implements Serializable
{

    // firstly, we define a bean class
    public static class Person implements Serializable {
        private Integer id;
        private String fname;
        private String lname;
        private String role;

        // Remember to declare no-args constructor
        public Person() { }

        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }

        public String getfname() { return fname; }
        public void setfname(String fname) { this.fname = fname; }

        public String getlname() { return lname; }
        public void setlname(String lname) { this.lname = lname; }

        public String getrole() { return role; }
        public void setrole(String role) { this.role = role; }

        // other methods, constructors, etc.
    }

    private transient SparkConf conf;
    private App(SparkConf conf) {
        this.conf = conf;
    }


    private void run() {
        JavaSparkContext sc = new JavaSparkContext(conf);
        createSchema(sc);


        sc.stop();
    }

    private void createSchema(JavaSparkContext sc) {

        JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("tester", "empbyrole", Person.class)
                .where("role=?", "IT Engineer").map(new Function<Person, String>() {
                    @Override
                    public String call(Person person) throws Exception {
                        return person.toString();
                    }
                });
        System.out.println("Data as Person beans: \n" + StringUtils.join("\n", rdd.toArray()));
       }



    public static void main( String[] args )
    {
        if (args.length != 2) {
            System.err.println("Syntax: com.datastax.spark.demo.JavaDemo <Spark Master URL> <Cassandra contact point>");
            System.exit(1);
        }

        SparkConf conf = new SparkConf();
        conf.setAppName("Java API demo");
        conf.setMaster(args[0]);
        conf.set("spark.cassandra.connection.host", args[1]);

        App app = new App(conf);
        app.run();
    }
}
14/09/23 13:46:53 ERROR executor.Executor: Exception in task ID 0
java.io.IOException: Exception during preparation of SELECT "role", "id", "fname", "lname" FROM "tester"."empbyrole" WHERE token("role") > -5709068081826432029 AND token("role") <= -5491279024053142424 AND role=? ALLOW FILTERING: role cannot be restricted by more than one relation if it includes an Equal
    at com.datastax.spark.connector.rdd.CassandraRDD.createStatement(CassandraRDD.scala:310)
    at com.datastax.spark.connector.rdd.CassandraRDD.com$datastax$spark$connector$rdd$CassandraRDD$$fetchTokenRange(CassandraRDD.scala:317)
    at com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$13.apply(CassandraRDD.scala:338)
    at com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$13.apply(CassandraRDD.scala:338)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:10)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:608)
    at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:608)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:205)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: role cannot be restricted by more than one relation if it includes an Equal
    at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:35)
    at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:256)
    at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:91)
    at com.datastax.spark.connector.cql.PreparedStatementCache$.prepareStatement(PreparedStatementCache.scala:45)
    at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:28)
    at com.sun.proxy.$Proxy8.prepare(Unknown Source)
    at com.datastax.spark.connector.rdd.CassandraRDD.createStatement(CassandraRDD.scala:293)
    ... 27 more
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: role cannot be restricted by more than one relation if it includes an Equal
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:97)
    at com.datastax.driver.core.SessionManager$1.apply(SessionManager.java:156)
    at com.datastax.driver.core.SessionManager$1.apply(SessionManager.java:131)
    at com.google.common.util.concurrent.Futures$1.apply(Futures.java:711)
    at com.google.common.util.concurrent.Futures$ChainingListenableFuture.run(Futures.java:849)
    ... 3 more
14/09/23 13:46:53 WARN scheduler.TaskSetManager: Lost TID 0 (task 0.0:0)
14/09/23 13:46:53 WARN scheduler.TaskSetManager: Loss was due to java.io.IOException
java.io.IOException: Exception during preparation of SELECT "role", "id", "fname", "lname" FROM "tester"."empbyrole" WHERE token("role") > -5709068081826432029 AND token("role") <= -5491279024053142424 AND role=? ALLOW FILTERING: role cannot be restricted by more than one relation if it includes an Equal
    at com.datastax.spark.connector.rdd.CassandraRDD.createStatement(CassandraRDD.scala:310)
    at com.datastax.spark.connector.rdd.CassandraRDD.com$datastax$spark$connector$rdd$CassandraRDD$$fetchTokenRange(CassandraRDD.scala:317)
    at com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$13.apply(CassandraRDD.scala:338)
    at com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$13.apply(CassandraRDD.scala:338)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:10)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:608)
    at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:608)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:205)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
14/09/23 13:46:53 ERROR scheduler.TaskSetManager: Task 0.0:0 failed 1 times; aborting job
14/09/23 13:46:53 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
14/09/23 13:46:53 INFO scheduler.DAGScheduler: Failed to run toArray at App.java:65
Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task 0.0:0 failed 1 times (most recent failure: Exception failure: java.io.IOException: Exception during preparation of SELECT "role", "id", "fname", "lname" FROM "tester"."empbyrole" WHERE token("role") > -5709068081826432029 AND token("role") <= -5491279024053142424 AND role=? ALLOW FILTERING: role cannot be restricted by more than one relation if it includes an Equal)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/09/23 13:46:53 INFO cql.CassandraConnector: Disconnected from Cassandra cluster: Test Cluster
JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("tester", "empbyrole", Person.class)
                .where("role=?", "IT Engineer").map(new Function<Person, String>() {
                    @Override
                    public String call(Person person) throws Exception {
                        return person.toString();
                    }
                });
JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("tester", "empbyrole", Person.class)
                .where("id=?", "1").map(new Function<Person, String>() {
                    @Override
                    public String call(Person person) throws Exception {
                        return person.toString();
                    }
                });
    session.execute("DROP KEYSPACE IF EXISTS tester");
    session.execute("CREATE KEYSPACE tester WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}");
    session.execute("CREATE TABLE tester.emp (id INT PRIMARY KEY, fname TEXT, lname TEXT, role TEXT)");
    session.execute("CREATE TABLE tester.empByRole (id INT, fname TEXT, lname TEXT, role TEXT, PRIMARY KEY (role,id))");
    session.execute("CREATE TABLE tester.dept (id INT PRIMARY KEY, dname TEXT)");       

    session.execute(
              "INSERT INTO tester.emp (id, fname, lname, role) " +
              "VALUES (" +
                  "0001," +
                  "'Angel'," +
                  "'Pay'," +
                  "'IT Engineer'" +
                  ");");
    session.execute(
              "INSERT INTO tester.emp (id, fname, lname, role) " +
              "VALUES (" +
                  "0002," +
                  "'John'," +
                  "'Doe'," +
                  "'IT Engineer'" +
                  ");");
    session.execute(
              "INSERT INTO tester.emp (id, fname, lname, role) " +
              "VALUES (" +
                  "0003," +
                  "'Jane'," +
                  "'Doe'," +
                  "'IT Analyst'" +
                  ");");
    session.execute(
          "INSERT INTO tester.empByRole (id, fname, lname, role) " +
          "VALUES (" +
              "0001," +
              "'Angel'," +
              "'Pay'," +
              "'IT Engineer'" +
              ");");
    session.execute(
              "INSERT INTO tester.empByRole (id, fname, lname, role) " +
              "VALUES (" +
                  "0002," +
                  "'John'," +
                  "'Doe'," +
                  "'IT Engineer'" +
                  ");");
    session.execute(
              "INSERT INTO tester.empByRole (id, fname, lname, role) " +
              "VALUES (" +
                  "0003," +
                  "'Jane'," +
                  "'Doe'," +
                  "'IT Analyst'" +
                  ");");
        session.execute(
              "INSERT INTO tester.dept (id, dname) " +
              "VALUES (" +
                  "1553," +
                  "'Commerce'" +
                  ");");

共有1个答案

鲜于宜修
2023-03-14

where方法将Allow Filtering添加到查询中。这不是灵丹妙药,因为它仍然不支持任意字段作为查询谓词。通常,该字段必须是索引列或群集列。如果这对您的数据模型不实用,您可以简单地在RDD上使用filter方法。缺点是过滤器发生在Spark中,而不是在Cassandra中。

因此,id字段可以工作,因为CQLWHERE子句支持它,而我假设role只是一个常规字段。请注意,我并不是建议您索引您的字段或将其更改为集群列,因为我不知道您的数据模型。

 类似资料:
  • 喜欢程序员这份工作的独立的程序员可以为任务选择最好的语言。大多数职业程序员控制不了他们将要使用的语言。通常,这个话题会由执行行政决议而非技术决议的boss说出,他们缺少勇气去提升新型工具,即使他们知道,大多数时候使用最新的知识,最少被接受的工具是最好的。另一些情况下,这种团体中真实的好处,以及拓展一个更大的社区的好处,排除了个人的选择。通常管理者由能够雇用在给定的语言有些经验的程序员驱动。毫无疑问

  • 问题内容: 我在C#项目中将SQLServer用于故障排除程序,并且我有一个包含ID,Question,QuestionId,Solution和Rank的表。我希望有一个问题的多个解决方案,并且程序将选择排名最高的解决方案,该解决方案仅由最高的数字来选择,该数字在每次正确时都会递增。为此,我有以下SQL语句: 当我只有一个可用的解决方案时,它可以很好地工作,但是当我有多个解决方案时,它却不能。 问

  • 问题内容: 我正在浏览tour.golang.org中的示例,并且遇到了我不太了解的这段代码: 我了解通道工作的基础知识,但是我没有得到的是上述select语句的工作方式。教程中的说明说: “ select语句使goroutine等待多个通信操作。一个select阻塞直到它的一种情况可以运行,然后它执行该情况。如果有多个就绪,它将随机选择一个。” 但是案件如何执行?据我所知,他们在说: 案例:发送

  • 问题内容: 刚尝试去最近。我想知道如果您有一条select语句等待在几个通道上进行通信,并且如果消息同时在两个或多个通道上出现,将会发生什么情况。如果所有消息都同时发出,那么select将如何确定接受哪个通道? 谢谢! 问题答案: 从规格: 如果可能发生多种情况,则将做出统一的伪随机选择,以决定执行哪个单一通信。 因此,选择是不确定的。

  • Why do developers choose one programming language over another for a given task? 为什么对一项给定的任务,开发者会偏向于选择某一门语言? Sometimes I choose raw C when I need blazing speed. 有时候当我需要绝对程序运行效率的时候,我会选择原始C语言。 When I wa

  • 我需要添加到我的关系字符串,像这样的格式:(帖子)和我需要从另一个表中获取的帖子(在这个关系中,我只有post_id)。我怎么能采取使用文件?