当前位置: 首页 > 知识库问答 >
问题:

使用filter(),map(),...在spark java api(org.apache.spark.sparkException)中

巫马瀚漠
2023-03-14

我是spark的新手,当我在java api中使用spark的filter时,我得到了这个错误(如果collect()所有表都正确工作,我可以看到从Cassandra获得的所有数据。)我检查了master和workers版本相同,当应用程序在spark的web ui中启动时,我可以看到它,但是:

[Stage 0:>                                                          (0 + 0) / 6]
[Stage 0:>                                                          (0 + 2) / 6]
[Stage 0:>                                                          (0 + 4) / 6]
import com.datastax.spark.connector.japi.CassandraJavaUtil;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapColumnTo;
import com.datastax.spark.connector.japi.CassandraRow;
import com.datastax.spark.connector.japi.rdd.CassandraTableScanJavaRDD;

import java.util.List;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import org.apache.wicket.markup.html.form.Form;

/**
 *
 * @author mohamadreza
 */
public class SparkTestPanel extends Panel {

    private Form form;

    public SparkTestPanel(String id) {
        super(id);
        form = new Form("form");
        form.setOutputMarkupId(true);
        this.add(form);             
        SparkConf conf = new SparkConf(true);
        conf.setAppName("Spark Test");
        conf.setMaster("spark://192.16.11.18:7049");
        conf.set("spark.closure.serializer","org.apache.spark.serializer.JavaSerializer");
        conf.set("spark.serializer","org.apache.spark.serializer.JavaSerializer");

        conf.set("spark.cassandra.connection.host", "192.16.11.18");
        conf.set("spark.cassandra.connection.port", "7005");
        conf.set("spark.cassandra.auth.username", "user");
        conf.set("spark.cassandra.auth.password", "password");
        JavaSparkContext sc = null;
        try {
            sc = new JavaSparkContext(conf);
            JavaRDD<CassandraRow> cache = javaFunctions(sc).cassandraTable("keyspace", "test").cache();
            Long count = cache.filter(new Function<CassandraRow, Boolean>() {
                @Override
                public Boolean call(CassandraRow t1) throws Exception {
                    return t1.getString("value").contains("test");
                }
            }).count();
            String a = count.toString();
        } finally {
            sc.stop();
        }
    }
}

以及spark Version2.1.1、scala Version2.11、Java8和my pom.xml:

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.11</artifactId>
        <version>2.0.5</version>
    </dependency>

我将docker用于cassandra和spark节点。(cassandra 3.0版)有人能帮我吗?

共有1个答案

闻人修平
2023-03-14

问题已解决:)

当您想要使用Apache Spark的JAVA Api时,必须将项目的.jar(位于项目根目录中的目标目录中)复制到每个Spark节点(master和workers)中的$spark_path/jars/。如果.jar非常大,则可以拆分ui和Spark代码,只复制Spark代码项目的.jar并在ui项目中使用该Spark代码。

 类似资料:
  • ,和在Python 2中完美工作。这里有一个例子: 但是在Python 3中,我收到以下输出: 如果有人能向我解释这是为什么,我将不胜感激。 为进一步清晰起见,代码截图:

  • 我有一组从共享类型继承的域对象(即、)。子类型具有特定的属性(即、)。 此外,作为解析日志文件的结果,我有一个包含混合子类型的记录列表。 为了计算日志记录上的统计信息,我想只在匹配特定子类型的记录子集上应用数学函数,即只在s上应用数学函数。因此,我希望有一个特定子类型的过滤流。我知道可以使用以下方法将和应用到子类型 在流上多次应用这个filter&cast(特别是在对同一子类型进行多次不同计算时)

  • 对于Java流,有没有理由使用而不是和的组合? 例如使用: 或

  • 我有一个简单的流如下: 但Intellij建议我: “filter()”和“map()”可以互换。检查信息:报告流API调用链可以简化。它允许在遍历集合时避免创建冗余的临时对象。例如 collection.stream()→collection.for每个() collection.stream()。 Intellij给出的例子很容易理解,但我不明白为什么它建议我使用。 我查看了的来源,但没有找到

  • map/reduce/filter 是 Python 中较为常用的内建高阶函数,它们为函数式编程提供了不少便利。 map map 函数的使用形式如下: map(function, sequence) 解释:对 sequence 中的 item 依次执行 function(item),并将结果组成一个 List 返回,也就是: [function(item1), function(item2), f

  • 我从对象和对象数组中更改了一个对象两次,这样在第一次迭代中,我过滤掉了几个对象,在第二次迭代中,我使用map更改了每个过滤后的对象。我能用减速机或更好的吗?