当前位置: 首页 > 知识库问答 >
问题:

在java中使用Apache Spark Connector从Cassandra检索数据时出错

司马昕
2023-03-14

10凯瑟琳

我在本地运行start-all.sh启动了Spark

然后我创建了这个类“SparkCassandraconnector”,它有一个连接spark和Cassandra的命令。

import org.apache.commons.lang.StringUtils;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import java.io.Serializable;

import static com.datastax.spark.connector.CassandraJavaUtil.javaFunctions;

public class SparkCassandraConnector  implements Serializable {
public static void main(String[] args) {

    SparkConf conf = new SparkConf().setAppName("Simple Application");

    conf.setMaster("spark://127.0.0.1:7077");
    conf.set("spark.cassandra.connection.host", "127.0.0.1");
    String[] jars = new String[10];
    jars[0] = "~/.m2/repository/com/datastax/spark/spark-cassandra-connector-java_2.10/1.1.0-alpha4/spark-cassandra-connector-java_2.10-1.1.0-alpha4.jar";
    jars[1] = "~/.m2/repository/com/datastax/cassandra/cassandra-driver-core/2.1.0/cassandra-driver-core-2.1.0.jar";
    jars[3] = "~/.m2/repository/com/datastax/spark/spark-cassandra-connector_2.10/1.1.0-alpha4/spark-cassandra-connector_2.10-1.1.0-alpha4.jar";
    jars[4] = "~/.m2/repository/com/datastax/cassandra/cassandra-driver-core/2.1.0/cassandra-driver-core-2.1.0.jar";
    jars[5] = "~/.m2/repository/org/apache/cassandra/cassandra-thrift/2.1.0/cassandra-thrift-2.1.0.jar";
    jars[6] = "~/.m2/repository/org/apache/cassandra/cassandra-clientutil/2.1.0/cassandra-clientutil-2.1.0.jar";
    conf = conf.setJars(jars);
    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("ks", "student", Student.class)
            .map(new org.apache.spark.api.java.function.Function<Student, String>() {
                @Override
                public String call(Student person) throws Exception {
                    return person.toString();
                }
            });
    System.out.println("Data as Person beans: \n" + StringUtils.join(rdd.collect(), "\n"));
}
public static class Student implements  Serializable{

    private Integer id;
    private String name;

    public Student(){

    }
    public Student(Integer id, String name) {
        this.id = id;
        this.name = name;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
<dependencies>


    <!--Spark-->

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.1.0</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-core</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.10</artifactId>
        <version>1.1.0-alpha4</version>
    </dependency>

    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector-java_2.10</artifactId>
        <version>1.1.0-alpha4</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-catalyst_2.10</artifactId>
        <version>1.0.0</version>
    </dependency>
</dependencies>

共有1个答案

袁博
2023-03-14

在提供的jar中,缺少包含作业的jar,因此缺少student.class。一个快速修复它的方法是在项目的./target文件夹中添加jar。

另一种选择是将您的工作和所有依赖项打包到一个“uber jar”中,并将该uber jar作为仅声明的jar使用。查看maven shade插件。

还可以通过命令行使用spark-submit--jars选项提供JAR。

 类似资料:
  • 问题内容: 我正在制作一个程序,可以从我创建的数据库中检索字段的输入数据/值。但是,当我运行它时,输出始终为null。不知道怎么了 这是我的代码: 在我的主班我有以下代码: 我不知道为什么我运行它时总是说null。 问题答案: 您实际上并没有在设置员中“设置”任何东西。 最重要的是:我对getter和setter的理解使您可以在getter中进行查询。

  • 我是新来的。我已经成功地使用python将消息发送到IOT中心(D2C)。我们使用的协议是mqtt。我们正在尝试使用java从云(IOT中心)检索数据,但无法找到从云接收消息的正确方法。我的疑问是我们是否可以直接从IOT中心读取消息,或者我们需要将传入消息重定向到事件中心来检索消息。 我还尝试在向云发送数据的同时从java中的iothub读取消息,但得到的错误如下。。(与服务器的连接中断。重新连接

  • Im在JSON数据的第1行第1列获得未捕获的SyntaxError:JSON.parse:意外字符 下面是JSON文件中的数据示例。 我如何在JS中从这个JSON文件中检索数据? 这样写一个JSON是不是不可能的? 当我在浏览器中打开文件时,它看起来像一个整洁的JSON数组?

  • 想要从Firebase数据库中检索用户名及其联系电话。我试着这样做,但在logcat它抛出一个空指针 Logcat:-进程:com。实例巴维亚。epark,PID:27351 java。lang.NullPointerException:尝试调用虚拟方法“java”。String.com。实例巴维亚。埃帕克。用户注册。com上的空对象引用上的getName()。实例巴维亚。埃帕克。地图选择$1$1

  • 我目前正在做一个优惠券项目。在处理我的facade类时,我遇到一个问题,通过使用外键从表中输入客户的ID来获取客户已购买的优惠券列表。 我使用的方法是: 优惠券类别: 调用方法: 我的SQL表: Coupon_vs_Customer表仅包含2行。它们都是其他表的外键。CustomerID连接到“Customers”表,而coupon_ID连接到表“Coupons” 正如您在上面看到的,ID为1的客

  • 问题内容: 我正在尝试使用pyodbc从SQL Server检索数据,并使用Python将其打印在表中。但是,我似乎只能检索列名和数据类型之类的东西,而不能检索列每一行中的实际数据值。 基本上,我试图复制一个检索服务器数据并将其显示在表格中的Excel工作表。我连接到服务器没有任何问题,只是我似乎无法找到表中的实际数据。 这是我的代码的示例: 但是,这样做的结果只是给我一些诸如表名,列名以及一些整