Scalding是对Cascading框架的Scala封装,或者更确切地说是一种函数式封装。看到Cascading的时候你可能会觉得这么麻烦的东西有必要学吗?但是再看看Scalding就会发现,这好像跟写一般的Scala代码也没什么区别……小小的封装带来巨大的改变。MapReduce/Spark这一遍玩下来最大的体会就是——用命令式语言表达函数式的东西真是太费劲了(map/reduce是函数式语言的基本操作)!
写惯了Spark再来看java的MapReduce的感觉就是再也回不去了。MapReduce是初恋又如何,函数式语言天生表达map、reduce就是比java这种命令式的要强得多得多得多。在用Spark用了一年之后基本上还是能总结出相对于MapReduce来说Spark的缺点:在单轮任务(典型的的ETL)上基本上没有什么优势,对于集群的利用来说与MapReduce还是有差距,具体表现在资源使用模式固定,无法像MR一样伸缩式的起大量map,这在资源空闲时是巨大的浪费。
网上找到一篇文章列举了列举了Pig, Scalding, Scoobi, Hive, Spark, Scrunch, Cascalog相关介绍的文章列表1。对于我自己而言:
1. 我需要一个简单框架来写MapReduce,并且能够很好利用现有库,hive、pig、spark出局;
2. 需要函数式使用API,Scala封装是很好的选择,还剩下Scoobi, Scalding, Scrunch
3. 易用,满足我司常见的三个场景的需求(列举在下文)。Scoobi, Scrunch对Thrift Parquet支持不好,出局。
从实际使用上来看,用Scala封装得比较好的MapReduce一样可以和写Spark一样简单。赞!
Scalding有两种API:Type-safe API2和 Fields Based API3。Fields Based API有点SQL的意思,喜欢的话还是不错的,专门有一本书叫《programming mapreduce with scalding4》,里面全部都是用Fields Based API讲的。全书总共也就100来页,很快就可以学完。
个人是先接触的Spark,已经被Typesafe洗脑,还是更倾向于接受Type-safe API。下面所有的例子都 是基于Type-safe API的。下面是针对一种是针对我司常见的三种应用场景分别介绍,
直接上的例子,具体地有兴趣可以自行查询API,一些相关的参考资料已经标在脚注中了。
import com.twitter.scalding._
class TextFileExample(args: Args) extends Job(args) {
TypedPipe.from(TextLine(args("input")))
.flatMap(line => line.split("\\s"))
.map { word => (word, 1L) }
.sumByKey // reduce num not set
.write(TypedTsv[(String, Long)](args("output")))
}
import org.apache.hadoop.io.{BytesWritable, Text}
import com.twitter.scalding._
class SequenceFileExample(args: Args) extends Job(args) {
object Agg extends Aggregator[(String, Int), Int, Int] {
def prepare(v: (String, Int)) = v._2
val semigroup = Semigroup.from { (l: Int, r: Int) => l + r }
def present(v: Int) = v
}
TypedPipe.from(WritableSequenceFile[BytesWritable, BytesWritable](args("input")))
.flatMap {
case (k, v) =>
val line = new String(v.getBytes, 0, v.getLength)
line.split("\\s").map(w => (w, 2))
}
.groupBy(_._1).aggregate(Agg) // aggregate by key
.toTypedPipe
.map { case (k, v) => (new Text(k), new io.IntWritable(v)) }
.reduce((a, b) => (a._1, a._2 + b._2))
.write(WritableSequenceFile(args("output")))
}
import parquet.thrift.test.Name // thrift对象
import com.twitter.scalding._
import com.twitter.scalding.parquet.thrift.FixedPathParquetThrift
class ThriftParquetFileExample(args: Args) extends Job(args) {
val source = new FixedPathParquetThrift[Name](args("input"))
val sink = new FixedPathParquetThrift[Name](args("output"))
TypedPipe.from(source)
.map { name => println(name); name }
.write(sink)
}
看起来还不错,但是其实与实际应用还是有些差距的,要解决更多的问题。
重载config方法可以完成对MapReduce任务的配置,包括压缩、队列等等,用-Dmapred.map.output.compress=true
这样的配在启动命令里面也是可以的。
override def config: Map[AnyRef, AnyRef] =
super.config ++ Map(
// JOB OUTPUT
"mapred.output.fileoutputformat.compress" -> "true",
"mapred.output.fileoutputformat.compress.codec" -> "parquet.hadoop.codec.SnappyCodec",
"mapred.output.fileoutputformat.compress.type" -> "BLOCK",
"mapred.output.compression.type" -> "BLOCK",
"mapred.output.compress" -> "true",
"mapred.output.compression.codec" -> "parquet.hadoop.codec.SnappyCodec",
// MAP OUTPUT
"mapred.map.output.compress" -> "true",
"mapred.map.output.compress.codec" -> "parquet.hadoop.codec.SnappyCodec"
)
override def config: Map[AnyRef,AnyRef] = {
super.config ++ Map ("mapreduce.job.queuename" -> args("queue"))
}
这部分不用担心,Args
类实现了一些基本的,可以满足大部分需求。
// class Xyyy(args: Args) extends Job(args)
val input: String = args("input") // --input后面接了单个的字符串
val inputs: List[String] = args.list("input") // --input后面接了多个字符串
val numReduce: Int = args.int("num-reduce") // --num-reduce 后面接一个整数
com.twitter.scalding.Tool
,然后参数ThriftParquetFileExample --hdfs --input testdata/name.parquet --output output
class TextFileExample(args: Args) extends Job(args) { ... }
object TextFileExample extends App {
Tool.main(getClass.getCanonicalName.stripSuffix("$") +: args)
}
这个时候这个文件就有“运行”选项了,再配一下参数就可以跑了。
二者其实是等价的。
import cascading.flow.{Flow, FlowListener}
import com.twitter.scalding._
class TextFileExample(args: Args) extends Job(args) {
val key = StatKey("word", "udc")
val stat = Stat(key)
TypedPipe.from(TextLine(args("input")))
.flatMap(line => line.split("\\s"))
.map { word => stat.inc; (word, 1L) }
.sumByKey // reduce num not set
.write(TypedTsv[(String, Long)](args("output")))
/**
* 任务结束后,自定义的Counter会打印出来,如果不需要获取Counter做更多操作,可以不重载这里
* "copy" from https://itellity.wordpress.com/2014/10/29/counters-using-cascading-flow-listeners-in-scalding/
*/
override def listeners = super.listeners ++ List(new FlowListener {
override def onStarting(flow: Flow[_]): Unit = {}
override def onCompleted(flow: Flow[_]) {
try {
val fs = flow.getFlowStats
println(key.group, key.counter, fs.getCounterValue(key.group, key.counter))
} catch {
case e: Exception => e.printStackTrace()
}
}
override def onThrowable(flow: Flow[_], e: Throwable): Boolean = {
e.printStackTrace()
true
}
override def onStopping(flow: Flow[_]): Unit = {}
})
}
du00cs/mapreduce-spark-example
还包括更多的样例,包括:
不过还有一种需求需要找一下怎么实现——MapReduce的Setup阶段的公共数据初始化,实现了这个就基本上完美了。