当前位置: 首页 > 知识库问答 >
问题:

Spark with Cassandra输入/输出

郎长卿
2023-03-14

想象一下以下过程:Spark应用程序(Java实现)正在使用Cassandra数据库加载、转换为RDD并处理数据。该应用程序还从数据库中传输新数据,这些数据也由自定义接收器处理。流处理的输出存储在数据库中。该实现使用与数据库集成的Spring Data Cassandra。

CassandraConfig:

@Configuration
@ComponentScan(basePackages = {"org.foo"})
@PropertySource(value = { "classpath:cassandra.properties" })
public class CassandraConfig {

    @Autowired
    private Environment env;

    @Bean
    public CassandraClusterFactoryBean cluster() {
        CassandraClusterFactoryBean cluster = new CassandraClusterFactoryBean();
        cluster.setContactPoints(env.getProperty("cassandra.contactpoints"));
        cluster.setPort(Integer.parseInt(env.getProperty("cassandra.port")));

        return cluster;
    }

    @Bean
    public CassandraMappingContext mappingContext() {
        return new BasicCassandraMappingContext();
    }

    @Bean
    public CassandraConverter converter() {
        return new MappingCassandraConverter(mappingContext());
    }

    @Bean
    public CassandraSessionFactoryBean session() throws Exception {
        CassandraSessionFactoryBean session = new CassandraSessionFactoryBean();
        session.setCluster(cluster().getObject());
        session.setKeyspaceName(env.getProperty("cassandra.keyspace"));
        session.setConverter(converter());
        session.setSchemaAction(SchemaAction.NONE);

        return session;
    }

    @Bean
    public CassandraOperations cassandraTemplate() throws Exception {
        return new CassandraTemplate(session().getObject());
    }

}

数据处理器。主要方法

// Initialize spring application context
ApplicationContext applicationContext = new AnnotationConfigApplicationContext(CassandraConfig.class);
ApplicationContextHolder.setApplicationContext(applicationContext);
CassandraOperations cassandraOperations = applicationContext.getBean(CassandraOperations.class);
// Initialize spark context
SparkConf conf = new SparkConf().setAppName("test-spark").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);

// Load data pages
List<Event> pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
// Parallelize the first page
JavaRDD<Event> rddBuffer = sc.parallelize(pagingResults);

while(pagingResults != null && !pagingResults.isEmpty()) {
    Event lastEvent = pagingResults.get(pagingResults.size() - 1);
    pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' and creation_time < " + lastEvent.getPk().getCreationTime() + " order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
    // Parallelize page and add to the existing
    rddBuffer = rddBuffer.union(sc.parallelize(pagingResults));
}

// data processing
...

预计初始加载会有大量数据。因此,数据会在rddBuffer中分页、加载和分发。

还有以下选项可供选择:

  1. Spark Cassandra示例(https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala),尽管此示例的文档量最小

我想知道Spark与Cassandra集成的最佳实践是什么。在我的实施中,最好的选择是什么?

Apache Spark 1.0.0,Apache Cassandra 2.0.8

共有2个答案

别俊誉
2023-03-14

上面代码中的方法是一种经典的集中式算法,只有在一个节点上执行才能工作。Cassandra和Spark都是分布式系统,因此有必要以一种可以分布在多个节点之间的方式对流程进行建模。

有几种可能的方法:如果您知道要获取的行的键,您可以执行如下简单的操作:(使用DataStaxJava驱动程序)

val data = sparkContext.parallelize(keys).map{key => 
   val cluster = val cluster = Cluster.builder.addContactPoint(host).build()
   val session  = cluster.connect(keyspace)
   val statement = session.prepare("...cql...);")
   val boundStatement = new BoundStatement(sttmt)
   session.execute(session.execute(boundStatement.bind(...data...)
}

这将有效地在Spark集群中分配密钥的获取。请注意如何在闭包内完成与C*的连接,因为这确保了在每个单独的分布式工作线程上执行任务时建立连接。

假设您的示例使用通配符(即密钥未知),那么使用Cassandra的Hadoop接口是一个不错的选择。问题中链接的Spark Cassandra示例说明了此Hadoop接口在Cassandra上的使用。

Callhtml" target="_blank">iope是一个库,它通过提供一个简单的API来访问该功能,从而封装了使用Hadoop接口的复杂性。它仅在Scala中可用,因为它使用特定的Scala功能(如即将发布的版本中的隐含和宏)使用Calliope,您基本上可以声明如何将RDD[type]转换为行键和行值,而Calliope负责将hadoop接口配置为作业。我们发现Calliope(和底层hadoop接口)比使用驱动程序与Cassandra交互快2-4倍。

结论:我将退出Spring数据配置来访问Cassandra,因为这将把您限制在单个节点上。如果可能的话,考虑一个简单的并行访问,或者在Scala中使用Calliope进行探索。

尉迟默
2023-03-14

使用Cassandra和Spark的最简单方法是使用DataStax开发的官方开源Cassandra驱动程序for Spark:https://github.com/datastax/spark-cassandra-connector

该驱动程序构建在Cassandra Java驱动程序之上,在Cassandra和Spark之间提供了直接的桥梁。与Calliope不同,它不使用Hadoop接口。此外,它还提供以下独特功能:

  • 支持所有Cassandra数据类型,包括集合,开箱即用
 类似资料:
  • 文件 std::fs::File 本身实现了 Read 和 Write trait,所以文件的输入输出非常简单,只要得到一个 File 类型实例就可以调用读写接口进行文件输入与输出操作了。而要得到 File 就得让操作系统打开(open)或新建(create)一个文件。还是拿例子来说明 use std::io; use std::io::prelude::*; use std::fs::File;

  • 回顾一下我们写的第一个 Rust 程序就是带副作用的,其副作用就是向标准输出(stdout),通常是终端或屏幕,输出了 Hello, World! 让屏幕上这几个字符的地方点亮起来。println! 宏是最常见的输出,用宏来做输出的还有 print!,两者都是向标准输出(stdout)输出,两者的区别也一眼就能看出。至于格式化输出,基础运算符和字符串格式化小节有详细说明,这里就不再啰嗦了。 更通用

  • Boost.Assign Assign帮助你把一系列的值赋给容器。它通过对operator, (逗号操作符) and operator()() (函数调用操作符)的重载,带给用户一种数据赋值的很容易的方法。除了对原型风格的代码特别有用,这个库的功能在其它时候也很有用,使用这个库有助于提高代码的可读性。使用本库中的list_of还可以就地生成无名数组。 Assign 的作者是 Thorsten Ot

  • 简介 通过前面章节的学习,你已经可以在Scheme的交互式前端中编写并执行程序了。在本章中,我讲介绍如何输入和输出。使用这个特性,你可以从文件中读取数据或向文件中写入数据。 从文件输入 open-input-file,read-char和eof-object? 函数(open-input-file filename)可以用于打开一个文件。此函数返回一个用于输入的端口。函数(read-char po

  • 每个进程操作系统都会分配三个文件资源,分别是标准输入(STDIN)、标准输出(STDOUT)和错误输出(STDERR)。通过这些输入流,我们能够轻易得从键盘获得数据,然后在显示器输出数据。 标准输入 来自管道(Pipe)的数据也是标准输入的一种,我们写了以下的实例来输出标注输入的数据。 package main import ( "fmt" "io/ioutil" "os" ) f

  • 输入/输出 Clojure提供了很少的方法来进行输入/输出的操作。因为我们在Clojure代码里面可以很轻松的使用java里面的I/O操作方法。但是?clojure.java.io 库使得使用java的I/O方法更加简单。 这些预定义的special symbols *in* , *out* 以及 *err* 默认被设定成 stdin, stdout 以及 stderr 。如果要flush *ou

  • 有些时候你的程序会与用户产生交互。举个例子,你会希望获取用户的输入内容,并向用户打印出一些返回的结果。我们可以分别通过 input() 函数与 print 函数来实现这一需求。 对于输入,我们还可以使用 str (String,字符串)类的各种方法。例如,你可以使用 rjust 方法来获得一个右对齐到指定宽度的字符串。你可以查看 help(str) 来了解更多细节。 另一个常见的输入输出类型是处理

  • 输入 输出 会有这种情况,你的程序必须与用户进行交互。例如,你想获取来自用户的输入,然后打印一些返回的结果。我们可以分别使用input()和print()函数来实现。 对于输出,我们还可以使用str(字符串)类的各种方法。例如,您可以使用rjust方法来获取一个指定宽度的字符串。更多细节,见 help(str)。 另一个常见的输入/输出类型是处理文件。创建、读和写文件是许多程序至关重要的,我们将在