当前位置: 首页 > 面试题库 >

带Cassandra输入/输出的Spark

黄成荫
2023-03-14
问题内容

请看下面的场景:一个Spark应用程序(Java实现)正在使用Cassandra数据库加载,转换为RDD并处理数据。该应用程序还从数据库中提取新数据,这些新数据也由自定义接收器处理。流处理的输出存储在数据库中。该实现使用了与数据库集成中的Spring
Data Cassandra。

CassandraConfig:

@Configuration
@ComponentScan(basePackages = {"org.foo"})
@PropertySource(value = { "classpath:cassandra.properties" })
public class CassandraConfig {

    @Autowired
    private Environment env;

    @Bean
    public CassandraClusterFactoryBean cluster() {
        CassandraClusterFactoryBean cluster = new CassandraClusterFactoryBean();
        cluster.setContactPoints(env.getProperty("cassandra.contactpoints"));
        cluster.setPort(Integer.parseInt(env.getProperty("cassandra.port")));

        return cluster;
    }

    @Bean
    public CassandraMappingContext mappingContext() {
        return new BasicCassandraMappingContext();
    }

    @Bean
    public CassandraConverter converter() {
        return new MappingCassandraConverter(mappingContext());
    }

    @Bean
    public CassandraSessionFactoryBean session() throws Exception {
        CassandraSessionFactoryBean session = new CassandraSessionFactoryBean();
        session.setCluster(cluster().getObject());
        session.setKeyspaceName(env.getProperty("cassandra.keyspace"));
        session.setConverter(converter());
        session.setSchemaAction(SchemaAction.NONE);

        return session;
    }

    @Bean
    public CassandraOperations cassandraTemplate() throws Exception {
        return new CassandraTemplate(session().getObject());
    }

}

DataProcessor.main方法:

// Initialize spring application context
ApplicationContext applicationContext = new AnnotationConfigApplicationContext(CassandraConfig.class);
ApplicationContextHolder.setApplicationContext(applicationContext);
CassandraOperations cassandraOperations = applicationContext.getBean(CassandraOperations.class);
// Initialize spark context
SparkConf conf = new SparkConf().setAppName("test-spark").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);

// Load data pages
List<Event> pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
// Parallelize the first page
JavaRDD<Event> rddBuffer = sc.parallelize(pagingResults);

while(pagingResults != null && !pagingResults.isEmpty()) {
    Event lastEvent = pagingResults.get(pagingResults.size() - 1);
    pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' and creation_time < " + lastEvent.getPk().getCreationTime() + " order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
    // Parallelize page and add to the existing
    rddBuffer = rddBuffer.union(sc.parallelize(pagingResults));
}

// data processing
...

预计在初始加载时会有大量数据。因此,将数据分页,装入并分发到rddBuffer中。

还有以下可用选项:

  1. 尽管文档数量很少,但Spark-Cassandra示例(https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala)对于这个例子。
  2. Calliope项目(http://tuplejump.github.io/calliope/)

我想知道将Spark与Cassandra集成的最佳实践是什么。在我的实施过程中遵循的最佳选择是什么?

Apache Spark 1.0.0,Apache Cassandra 2.0.8


问题答案:

与Cassandra和Spark一起使用的最简单方法是使用由DataStax开发的Spark的官方开源Cassandra驱动程序:https : //github.com/datastax/spark-cassandra-
connector

该驱动程序基于Cassandra
Java驱动程序构建,并提供了Cassandra和Spark之间的直接桥梁。与Calliope不同,它不使用Hadoop接口。此外,它还提供以下独特功能:

  • 开箱即用地支持所有Cassandra数据类型,包括集合
  • Cassandra行到自定义类或元组的轻量级映射,而无需在Scala中使用任何隐式或其他高级功能
  • 将所有RDD保存到Cassandra
  • 完全支持Cassandra虚拟节点
  • 在服务器端进行过滤/选择的能力,例如,利用Cassandra集群列或二级索引


 类似资料:
  • 想象一下以下过程:Spark应用程序(Java实现)正在使用Cassandra数据库加载、转换为RDD并处理数据。该应用程序还从数据库中传输新数据,这些数据也由自定义接收器处理。流处理的输出存储在数据库中。该实现使用与数据库集成的Spring Data Cassandra。 CassandraConfig: 数据处理器。主要方法: 预计初始加载会有大量数据。因此,数据会在rddBuffer中分页、

  • 文件 std::fs::File 本身实现了 Read 和 Write trait,所以文件的输入输出非常简单,只要得到一个 File 类型实例就可以调用读写接口进行文件输入与输出操作了。而要得到 File 就得让操作系统打开(open)或新建(create)一个文件。还是拿例子来说明 use std::io; use std::io::prelude::*; use std::fs::File;

  • 回顾一下我们写的第一个 Rust 程序就是带副作用的,其副作用就是向标准输出(stdout),通常是终端或屏幕,输出了 Hello, World! 让屏幕上这几个字符的地方点亮起来。println! 宏是最常见的输出,用宏来做输出的还有 print!,两者都是向标准输出(stdout)输出,两者的区别也一眼就能看出。至于格式化输出,基础运算符和字符串格式化小节有详细说明,这里就不再啰嗦了。 更通用

  • Boost.Assign Assign帮助你把一系列的值赋给容器。它通过对operator, (逗号操作符) and operator()() (函数调用操作符)的重载,带给用户一种数据赋值的很容易的方法。除了对原型风格的代码特别有用,这个库的功能在其它时候也很有用,使用这个库有助于提高代码的可读性。使用本库中的list_of还可以就地生成无名数组。 Assign 的作者是 Thorsten Ot

  • 简介 通过前面章节的学习,你已经可以在Scheme的交互式前端中编写并执行程序了。在本章中,我讲介绍如何输入和输出。使用这个特性,你可以从文件中读取数据或向文件中写入数据。 从文件输入 open-input-file,read-char和eof-object? 函数(open-input-file filename)可以用于打开一个文件。此函数返回一个用于输入的端口。函数(read-char po

  • 每个进程操作系统都会分配三个文件资源,分别是标准输入(STDIN)、标准输出(STDOUT)和错误输出(STDERR)。通过这些输入流,我们能够轻易得从键盘获得数据,然后在显示器输出数据。 标准输入 来自管道(Pipe)的数据也是标准输入的一种,我们写了以下的实例来输出标注输入的数据。 package main import ( "fmt" "io/ioutil" "os" ) f