An Example of Integrating Spark and Cassandra

容鸿畴
2023-12-01
# Put Cassandra's jars on the Spark classpath so the Hadoop input format and
# ConfigHelper classes resolve inside spark-shell.
export SPARK_CLASSPATH=/usr/local/cassandra/current/lib/*
# Run the shell against the Mesos master on hadoop1.
export MASTER=mesos://hadoop1:5050
./spark-shell

import java.nio.ByteBuffer
import java.util.{ Map => JMap }
import org.apache.cassandra.hadoop.ConfigHelper
import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
import org.apache.cassandra.utils.ByteBufferUtil
import org.apache.hadoop.conf.Configuration

/** Builds an RDD over a Cassandra column family via the CQL3 paging input format.
  *
  * @param host  initial Cassandra contact address for the job
  * @param port  Cassandra RPC (Thrift) port, e.g. 9160
  * @param ks    keyspace to read from
  * @param table column family (table) to read
  * @return RDD of (key columns, value columns), each a Java map of
  *         column name to raw ByteBuffer
  */
def cql3RDD(host : String, port : Int)(ks : String, table : String)  = {
  // Copy the shell's Hadoop configuration so we don't mutate shared state.
  val hadoopConf = new Configuration(sc.hadoopConfiguration)
  // NOTE(review): partitioner string must match the cluster's configured
  // partitioner — confirm the target ring uses Murmur3Partitioner.
  ConfigHelper.setInputPartitioner(hadoopConf, "Murmur3Partitioner")
  ConfigHelper.setInputInitialAddress(hadoopConf, host)
  ConfigHelper.setInputRpcPort(hadoopConf, port.toString)
  ConfigHelper.setInputColumnFamily(hadoopConf, ks, table)
  sc.newAPIHadoopRDD(
    hadoopConf,
    classOf[CqlPagingInputFormat],
    classOf[JMap[String, ByteBuffer]],
    classOf[JMap[String, ByteBuffer]]
  )
}

// Read the translation-memory table and keep English→Chinese rows whose
// source sentence mentions "macau" (case-insensitive).
val rdd = cql3RDD("hadoop1", 9160)("webtrans_tm_tdb", "d_1")

// Decode a named column to a String; returns "" when the column is absent.
// (The original called ByteBufferUtil.string(map.get(name)) directly, which
// throws a NullPointerException for any row missing the column.)
def column(cols: JMap[String, ByteBuffer], name: String): String =
  Option(cols.get(name)).map(ByteBufferUtil.string).getOrElse("")

// Fuse the three filters into one pass over the RDD.
val filtered = rdd filter { case (k, v) =>
  column(k, "slang") == "en" &&
  column(k, "tlang") == "zh" &&
  column(v, "scntn").toLowerCase.contains("macau")
}
filtered.count
// The original `.map(/*do some formatting*/)` does not compile — RDD.map
// requires a function. Emit tab-separated (slang, tlang, source sentence).
filtered
  .map { case (k, v) =>
    s"${column(k, "slang")}\t${column(k, "tlang")}\t${column(v, "scntn")}"
  }
  .saveAsTextFile("hdfs://hadoop1:54310/containsMacau.txt")
 类似资料:

相关阅读

相关文章

相关问答