# Put the local Cassandra client jars on Spark's classpath before launching the shell.
export SPARK_CLASSPATH=/usr/local/cassandra/current/lib/*
# Point the shell at the Mesos master running on hadoop1.
export MASTER=mesos://hadoop1:5050
# Start the interactive Spark shell; the Scala code below is entered into this REPL.
./spark-shell
import java.nio.ByteBuffer
import java.util.{ Map => JMap }
import org.apache.cassandra.hadoop.ConfigHelper
import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
import org.apache.cassandra.utils.ByteBufferUtil
import org.apache.hadoop.conf.Configuration
/**
 * Builds an RDD over a Cassandra CQL3 column family using the Hadoop
 * `CqlPagingInputFormat`. Relies on the spark-shell-bound `sc` SparkContext.
 *
 * @param host        initial Cassandra contact point
 * @param port        Cassandra Thrift RPC port (passed to Hadoop config as a string)
 * @param ks          keyspace name
 * @param table       column family (table) name
 * @param partitioner Cassandra partitioner class name; defaults to the
 *                    previously hard-coded "Murmur3Partitioner" so existing
 *                    two-argument calls keep working
 * @return RDD of (partition-key columns, value columns), each a Java Map
 *         from column name to raw ByteBuffer
 */
def cql3RDD(host : String, port : Int)(ks : String, table : String,
    partitioner : String = "Murmur3Partitioner") = {
  // Copy the shell's Hadoop configuration so we don't mutate shared state.
  val conf = new Configuration(sc.hadoopConfiguration)
  ConfigHelper.setInputPartitioner(conf, partitioner)
  ConfigHelper.setInputInitialAddress(conf, host)
  ConfigHelper.setInputRpcPort(conf, port.toString)
  ConfigHelper.setInputColumnFamily(conf, ks, table)
  sc.newAPIHadoopRDD(conf, classOf[CqlPagingInputFormat],
    classOf[JMap[String, ByteBuffer]], classOf[JMap[String, ByteBuffer]])
}
// Open the CQL3 table d_1 in keyspace webtrans_tm_tdb via the Thrift port on hadoop1.
val rdd = cql3RDD("hadoop1", 9160)("webtrans_tm_tdb", "d_1")

// Narrow the rows step by step (each filter is lazy until an action runs):
// source language must be English...
val filtered1 = rdd.filter { case (key, _) => "en" == ByteBufferUtil.string(key.get("slang")) }
// ...target language must be Chinese...
val filtered2 = filtered1.filter { case (key, _) => "zh" == ByteBufferUtil.string(key.get("tlang")) }
// ...and the source content must mention "macau" (case-insensitive).
val filtered3 = filtered2.filter { case (_, cols) =>
  ByteBufferUtil.string(cols.get("scntn")).toLowerCase.contains("macau")
}

// Actions: materialize the count, then persist the matches to HDFS.
filtered3.count
filtered3.map(/*do some formatting*/).saveAsTextFile("hdfs://hadoop1:54310/containsMacau.txt")