想象一下以下过程:Spark应用程序(Java实现)正在使用Cassandra数据库加载、转换为RDD并处理数据。该应用程序还从数据库中传输新数据,这些数据也由自定义接收器处理。流处理的输出存储在数据库中。该实现使用与数据库集成的Spring Data Cassandra。
CassandraConfig:
@Configuration
@ComponentScan(basePackages = {"org.foo"})
@PropertySource(value = { "classpath:cassandra.properties" })
public class CassandraConfig {
@Autowired
private Environment env;
@Bean
public CassandraClusterFactoryBean cluster() {
CassandraClusterFactoryBean cluster = new CassandraClusterFactoryBean();
cluster.setContactPoints(env.getProperty("cassandra.contactpoints"));
cluster.setPort(Integer.parseInt(env.getProperty("cassandra.port")));
return cluster;
}
@Bean
public CassandraMappingContext mappingContext() {
return new BasicCassandraMappingContext();
}
@Bean
public CassandraConverter converter() {
return new MappingCassandraConverter(mappingContext());
}
@Bean
public CassandraSessionFactoryBean session() throws Exception {
CassandraSessionFactoryBean session = new CassandraSessionFactoryBean();
session.setCluster(cluster().getObject());
session.setKeyspaceName(env.getProperty("cassandra.keyspace"));
session.setConverter(converter());
session.setSchemaAction(SchemaAction.NONE);
return session;
}
@Bean
public CassandraOperations cassandraTemplate() throws Exception {
return new CassandraTemplate(session().getObject());
}
}
数据处理器。主要方法:
// Initialize spring application context
ApplicationContext applicationContext = new AnnotationConfigApplicationContext(CassandraConfig.class);
ApplicationContextHolder.setApplicationContext(applicationContext);
CassandraOperations cassandraOperations = applicationContext.getBean(CassandraOperations.class);
// Initialize spark context
SparkConf conf = new SparkConf().setAppName("test-spark").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);
// Load data pages
List<Event> pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
// Parallelize the first page
JavaRDD<Event> rddBuffer = sc.parallelize(pagingResults);
while(pagingResults != null && !pagingResults.isEmpty()) {
Event lastEvent = pagingResults.get(pagingResults.size() - 1);
pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' and creation_time < " + lastEvent.getPk().getCreationTime() + " order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
// Parallelize page and add to the existing
rddBuffer = rddBuffer.union(sc.parallelize(pagingResults));
}
// data processing
...
预计初始加载会有大量数据。因此,数据会在rddBuffer中分页、加载和分发。
还有以下选项可供选择:
我想知道Spark与Cassandra集成的最佳实践是什么。在我的实施中,最好的选择是什么?
Apache Spark 1.0.0,Apache Cassandra 2.0.8
上面代码中的方法是一种经典的集中式算法,只有在一个节点上执行才能工作。Cassandra和Spark都是分布式系统,因此有必要以一种可以分布在多个节点之间的方式对流程进行建模。
有几种可能的方法:如果您知道要获取的行的键,您可以执行如下简单的操作:(使用DataStaxJava驱动程序)
val data = sparkContext.parallelize(keys).map{key =>
val cluster = val cluster = Cluster.builder.addContactPoint(host).build()
val session = cluster.connect(keyspace)
val statement = session.prepare("...cql...);")
val boundStatement = new BoundStatement(sttmt)
session.execute(session.execute(boundStatement.bind(...data...)
}
这将有效地在Spark集群中分配密钥的获取。请注意如何在闭包内完成与C*的连接,因为这确保了在每个单独的分布式工作线程上执行任务时建立连接。
假设您的示例使用通配符(即密钥未知),那么使用Cassandra的Hadoop接口是一个不错的选择。问题中链接的Spark Cassandra示例说明了此Hadoop接口在Cassandra上的使用。
Callhtml" target="_blank">iope是一个库,它通过提供一个简单的API来访问该功能,从而封装了使用Hadoop接口的复杂性。它仅在Scala中可用,因为它使用特定的Scala功能(如即将发布的版本中的隐含和宏)使用Calliope,您基本上可以声明如何将RDD[type]转换为行键和行值,而Calliope负责将hadoop接口配置为作业。我们发现Calliope(和底层hadoop接口)比使用驱动程序与Cassandra交互快2-4倍。
结论:我将退出Spring数据配置来访问Cassandra,因为这将把您限制在单个节点上。如果可能的话,考虑一个简单的并行访问,或者在Scala中使用Calliope进行探索。
使用Cassandra和Spark的最简单方法是使用DataStax开发的官方开源Cassandra驱动程序for Spark:https://github.com/datastax/spark-cassandra-connector
该驱动程序构建在Cassandra Java驱动程序之上,在Cassandra和Spark之间提供了直接的桥梁。与Calliope不同,它不使用Hadoop接口。此外,它还提供以下独特功能:
文件 std::fs::File 本身实现了 Read 和 Write trait,所以文件的输入输出非常简单,只要得到一个 File 类型实例就可以调用读写接口进行文件输入与输出操作了。而要得到 File 就得让操作系统打开(open)或新建(create)一个文件。还是拿例子来说明 use std::io; use std::io::prelude::*; use std::fs::File;
回顾一下我们写的第一个 Rust 程序就是带副作用的,其副作用就是向标准输出(stdout),通常是终端或屏幕,输出了 Hello, World! 让屏幕上这几个字符的地方点亮起来。println! 宏是最常见的输出,用宏来做输出的还有 print!,两者都是向标准输出(stdout)输出,两者的区别也一眼就能看出。至于格式化输出,基础运算符和字符串格式化小节有详细说明,这里就不再啰嗦了。 更通用
Boost.Assign Assign帮助你把一系列的值赋给容器。它通过对operator, (逗号操作符) and operator()() (函数调用操作符)的重载,带给用户一种数据赋值的很容易的方法。除了对原型风格的代码特别有用,这个库的功能在其它时候也很有用,使用这个库有助于提高代码的可读性。使用本库中的list_of还可以就地生成无名数组。 Assign 的作者是 Thorsten Ot
简介 通过前面章节的学习,你已经可以在Scheme的交互式前端中编写并执行程序了。在本章中,我讲介绍如何输入和输出。使用这个特性,你可以从文件中读取数据或向文件中写入数据。 从文件输入 open-input-file,read-char和eof-object? 函数(open-input-file filename)可以用于打开一个文件。此函数返回一个用于输入的端口。函数(read-char po
每个进程操作系统都会分配三个文件资源,分别是标准输入(STDIN)、标准输出(STDOUT)和错误输出(STDERR)。通过这些输入流,我们能够轻易得从键盘获得数据,然后在显示器输出数据。 标准输入 来自管道(Pipe)的数据也是标准输入的一种,我们写了以下的实例来输出标注输入的数据。 package main import ( "fmt" "io/ioutil" "os" ) f
输入/输出 Clojure提供了很少的方法来进行输入/输出的操作。因为我们在Clojure代码里面可以很轻松的使用java里面的I/O操作方法。但是?clojure.java.io 库使得使用java的I/O方法更加简单。 这些预定义的special symbols *in* , *out* 以及 *err* 默认被设定成 stdin, stdout 以及 stderr 。如果要flush *ou
有些时候你的程序会与用户产生交互。举个例子,你会希望获取用户的输入内容,并向用户打印出一些返回的结果。我们可以分别通过 input() 函数与 print 函数来实现这一需求。 对于输入,我们还可以使用 str (String,字符串)类的各种方法。例如,你可以使用 rjust 方法来获得一个右对齐到指定宽度的字符串。你可以查看 help(str) 来了解更多细节。 另一个常见的输入输出类型是处理
输入 输出 会有这种情况,你的程序必须与用户进行交互。例如,你想获取来自用户的输入,然后打印一些返回的结果。我们可以分别使用input()和print()函数来实现。 对于输出,我们还可以使用str(字符串)类的各种方法。例如,您可以使用rjust方法来获取一个指定宽度的字符串。更多细节,见 help(str)。 另一个常见的输入/输出类型是处理文件。创建、读和写文件是许多程序至关重要的,我们将在