请看下面的场景:一个Spark应用程序(Java实现)正在使用Cassandra数据库加载,转换为RDD并处理数据。该应用程序还从数据库中提取新数据,这些新数据也由自定义接收器处理。流处理的输出存储在数据库中。该实现使用了与数据库集成中的Spring
Data Cassandra。
CassandraConfig:
@Configuration
@ComponentScan(basePackages = {"org.foo"})
@PropertySource(value = { "classpath:cassandra.properties" })
public class CassandraConfig {
@Autowired
private Environment env;
@Bean
public CassandraClusterFactoryBean cluster() {
CassandraClusterFactoryBean cluster = new CassandraClusterFactoryBean();
cluster.setContactPoints(env.getProperty("cassandra.contactpoints"));
cluster.setPort(Integer.parseInt(env.getProperty("cassandra.port")));
return cluster;
}
@Bean
public CassandraMappingContext mappingContext() {
return new BasicCassandraMappingContext();
}
@Bean
public CassandraConverter converter() {
return new MappingCassandraConverter(mappingContext());
}
@Bean
public CassandraSessionFactoryBean session() throws Exception {
CassandraSessionFactoryBean session = new CassandraSessionFactoryBean();
session.setCluster(cluster().getObject());
session.setKeyspaceName(env.getProperty("cassandra.keyspace"));
session.setConverter(converter());
session.setSchemaAction(SchemaAction.NONE);
return session;
}
@Bean
public CassandraOperations cassandraTemplate() throws Exception {
return new CassandraTemplate(session().getObject());
}
}
DataProcessor.main方法:
// Initialize spring application context
ApplicationContext applicationContext = new AnnotationConfigApplicationContext(CassandraConfig.class);
ApplicationContextHolder.setApplicationContext(applicationContext);
CassandraOperations cassandraOperations = applicationContext.getBean(CassandraOperations.class);
// Initialize spark context
SparkConf conf = new SparkConf().setAppName("test-spark").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);
// Load data pages
List<Event> pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
// Parallelize the first page
JavaRDD<Event> rddBuffer = sc.parallelize(pagingResults);
while(pagingResults != null && !pagingResults.isEmpty()) {
Event lastEvent = pagingResults.get(pagingResults.size() - 1);
pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' and creation_time < " + lastEvent.getPk().getCreationTime() + " order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
// Parallelize page and add to the existing
rddBuffer = rddBuffer.union(sc.parallelize(pagingResults));
}
// data processing
...
预计在初始加载时会有大量数据。因此,将数据分页,装入并分发到rddBuffer中。
还有以下可用选项:
我想知道将Spark与Cassandra集成的最佳实践是什么。在我的实施过程中遵循的最佳选择是什么?
Apache Spark 1.0.0,Apache Cassandra 2.0.8
与Cassandra和Spark一起使用的最简单方法是使用由DataStax开发的Spark的官方开源Cassandra驱动程序:https : //github.com/datastax/spark-cassandra-
connector
该驱动程序基于Cassandra
Java驱动程序构建,并提供了Cassandra和Spark之间的直接桥梁。与Calliope不同,它不使用Hadoop接口。此外,它还提供以下独特功能:
想象一下以下过程:Spark应用程序(Java实现)正在使用Cassandra数据库加载、转换为RDD并处理数据。该应用程序还从数据库中传输新数据,这些数据也由自定义接收器处理。流处理的输出存储在数据库中。该实现使用与数据库集成的Spring Data Cassandra。 CassandraConfig: 数据处理器。主要方法: 预计初始加载会有大量数据。因此,数据会在rddBuffer中分页、
文件 std::fs::File 本身实现了 Read 和 Write trait,所以文件的输入输出非常简单,只要得到一个 File 类型实例就可以调用读写接口进行文件输入与输出操作了。而要得到 File 就得让操作系统打开(open)或新建(create)一个文件。还是拿例子来说明 use std::io; use std::io::prelude::*; use std::fs::File;
回顾一下我们写的第一个 Rust 程序就是带副作用的,其副作用就是向标准输出(stdout),通常是终端或屏幕,输出了 Hello, World! 让屏幕上这几个字符的地方点亮起来。println! 宏是最常见的输出,用宏来做输出的还有 print!,两者都是向标准输出(stdout)输出,两者的区别也一眼就能看出。至于格式化输出,基础运算符和字符串格式化小节有详细说明,这里就不再啰嗦了。 更通用
Boost.Assign Assign帮助你把一系列的值赋给容器。它通过对operator, (逗号操作符) and operator()() (函数调用操作符)的重载,带给用户一种数据赋值的很容易的方法。除了对原型风格的代码特别有用,这个库的功能在其它时候也很有用,使用这个库有助于提高代码的可读性。使用本库中的list_of还可以就地生成无名数组。 Assign 的作者是 Thorsten Ot
简介 通过前面章节的学习,你已经可以在Scheme的交互式前端中编写并执行程序了。在本章中,我讲介绍如何输入和输出。使用这个特性,你可以从文件中读取数据或向文件中写入数据。 从文件输入 open-input-file,read-char和eof-object? 函数(open-input-file filename)可以用于打开一个文件。此函数返回一个用于输入的端口。函数(read-char po
每个进程操作系统都会分配三个文件资源,分别是标准输入(STDIN)、标准输出(STDOUT)和错误输出(STDERR)。通过这些输入流,我们能够轻易得从键盘获得数据,然后在显示器输出数据。 标准输入 来自管道(Pipe)的数据也是标准输入的一种,我们写了以下的实例来输出标注输入的数据。 package main import ( "fmt" "io/ioutil" "os" ) f