我是spark的新手,当我在java api中使用spark的filter时,我得到了这个错误(如果collect()所有表都正确工作,我可以看到从Cassandra获得的所有数据。)我检查了master和workers版本相同,当应用程序在spark的web ui中启动时,我可以看到它,但是:
[Stage 0:> (0 + 0) / 6]
[Stage 0:> (0 + 2) / 6]
[Stage 0:> (0 + 4) / 6]
import com.datastax.spark.connector.japi.CassandraJavaUtil;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapColumnTo;
import com.datastax.spark.connector.japi.CassandraRow;
import com.datastax.spark.connector.japi.rdd.CassandraTableScanJavaRDD;
import java.util.List;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.wicket.markup.html.form.Form;
/**
*
* @author mohamadreza
*/
public class SparkTestPanel extends Panel {
private Form form;
public SparkTestPanel(String id) {
super(id);
form = new Form("form");
form.setOutputMarkupId(true);
this.add(form);
SparkConf conf = new SparkConf(true);
conf.setAppName("Spark Test");
conf.setMaster("spark://192.16.11.18:7049");
conf.set("spark.closure.serializer","org.apache.spark.serializer.JavaSerializer");
conf.set("spark.serializer","org.apache.spark.serializer.JavaSerializer");
conf.set("spark.cassandra.connection.host", "192.16.11.18");
conf.set("spark.cassandra.connection.port", "7005");
conf.set("spark.cassandra.auth.username", "user");
conf.set("spark.cassandra.auth.password", "password");
JavaSparkContext sc = null;
try {
sc = new JavaSparkContext(conf);
JavaRDD<CassandraRow> cache = javaFunctions(sc).cassandraTable("keyspace", "test").cache();
Long count = cache.filter(new Function<CassandraRow, Boolean>() {
@Override
public Boolean call(CassandraRow t1) throws Exception {
return t1.getString("value").contains("test");
}
}).count();
String a = count.toString();
} finally {
sc.stop();
}
}
}
以及spark Version2.1.1、scala Version2.11、Java8和my pom.xml:
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.11</artifactId>
<version>2.0.5</version>
</dependency>
我将docker用于cassandra和spark节点。(cassandra 3.0版)有人能帮我吗?
问题已解决:)
当您想要使用Apache Spark的JAVA Api时,必须将项目的.jar
(位于项目根目录中的目标目录中)复制到每个Spark节点(master和workers)中的$spark_path/jars/
。如果.jar
非常大,则可以拆分ui和Spark代码,只复制Spark代码项目的.jar
并在ui项目中使用该Spark代码。
,和在Python 2中完美工作。这里有一个例子: 但是在Python 3中,我收到以下输出: 如果有人能向我解释这是为什么,我将不胜感激。 为进一步清晰起见,代码截图:
我有一组从共享类型继承的域对象(即、)。子类型具有特定的属性(即、)。 此外,作为解析日志文件的结果,我有一个包含混合子类型的记录列表。 为了计算日志记录上的统计信息,我想只在匹配特定子类型的记录子集上应用数学函数,即只在s上应用数学函数。因此,我希望有一个特定子类型的过滤流。我知道可以使用以下方法将和应用到子类型 在流上多次应用这个filter&cast(特别是在对同一子类型进行多次不同计算时)
对于Java流,有没有理由使用而不是和的组合? 例如使用: 或
我有一个简单的流如下: 但Intellij建议我: “filter()”和“map()”可以互换。检查信息:报告流API调用链可以简化。它允许在遍历集合时避免创建冗余的临时对象。例如 collection.stream()→collection.for每个() collection.stream()。 Intellij给出的例子很容易理解,但我不明白为什么它建议我使用。 我查看了的来源,但没有找到
map/reduce/filter 是 Python 中较为常用的内建高阶函数,它们为函数式编程提供了不少便利。 map map 函数的使用形式如下: map(function, sequence) 解释:对 sequence 中的 item 依次执行 function(item),并将结果组成一个 List 返回,也就是: [function(item1), function(item2), f
我从对象和对象数组中更改了一个对象两次,这样在第一次迭代中,我过滤掉了几个对象,在第二次迭代中,我使用map更改了每个过滤后的对象。我能用减速机或更好的吗?