@羲凡——只为了更好的活着
为了方便查询一些些业务数据,将hdfs或hive中的数据写入到Cassandra的一个大表中。在有实际的业务查询时在根据ID和列名等字段,读取数据。直接先上代码
import org.apache.spark.sql.{SaveMode, SparkSession}
object CassandraWriteReadDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder
.appName("capCassandraUpdater")
.master("local[*]")
// spark.cassandra.connection.host 可以写入Cassandra多个节点,形成高可用
.config("spark.cassandra.connection.host", "10.11.12.14,10.11.12.20,10.11.12.23")
.config("spark.sql.shuffle.partitions",20)
.enableHiveSupport()
.getOrCreate()
//读取hive中的表
val dfFromHive = spark.sql("select * from aarontest.stu_info")
//将df写入到Cassandra中
dfFromHive.write
.format("org.apache.spark.sql.cassandra") //format一定要写
.option("keyspace", "aarontest") //cassandra的keyspace相当于mysql中的库
.option("table", "c_staff")
.option("column", "name")
.option("column","age")
//强一致性 ALL,弱一致性ANY 或 ONE 默认是LOCAL_QUORUM,一般为了提高写入效率设置为ONE
.option("spark.cassandra.output.consistency.level", "ALL")
.mode(SaveMode.Append)
.save()
//从Cassandra读取数据
val dfFormCassandra = spark.read
.format("org.apache.spark.sql.cassandra")
.option("keyspace", "aarontest").option("table", "c_staff").load()
//显示五行Cassandra数据
dfFormCassandra.show(5,false)
spark.stop()
}
}
pom.xml文件中要添加
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.11</artifactId>
<version>2.3.1</version>
</dependency>
在用 spark-submit提交的时候一定要加载两个jar包 spark-cassandra-connector_2.11-2.3.1.jar 和 jsr166e-1.1.0.jar。加载jar包的方式很多,只要在spark任务运行时能读取到这两个jar包即可。下面使用 –jars 的方式演示
#!/bin/sh
/usr/local/package/spark-2.3.2-bin-hadoop2.7/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--class aarontest.CassandraWriteReadDemo \
--jars hdfs://ns/lib/spark-cassandra-connector_2.11-2.3.1.jar,hdfs://ns/lib/jsr166e-1.1.0.jar \
hdfs://ns/jars/CassandraWriteReadDemo.jar
====================================================================
@羲凡——只为了更好的活着
若对博客中有任何问题,欢迎留言交流