最近在学习 Apache Crunch。
先附上官网文档地址:http://crunch.apache.org/user-guide.html
我首先学习了官网的 Getting Started,
然后才阅读了 User Guide。
这里简述一下,作为一篇学习笔记。
下面附上 WordCount 示例代码(WordCount.java):
package com.hit.crunch;
import java.util.Set;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.FilterFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.scrunch.Writables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableSet;
/**
 * Word-count job implemented as a Hadoop {@link Tool} on top of an Apache
 * Crunch {@link MRPipeline}.
 *
 * <p>The job reads a text file, splits every line into words with
 * {@code Tokenizer}, drops the words rejected by {@code StopWordFilter},
 * counts the remaining words and writes the counts as text.
 *
 * <p>Usage: {@code WordCount [generic options] [<input> <output>]}. When no
 * positional arguments are given, {@link #DEFAULT_INPUT_PATH} and
 * {@link #DEFAULT_OUTPUT_PATH} are used.
 */
public class WordCount extends Configured implements Tool {

    /** Input path used when the caller supplies no positional arguments. */
    private static final String DEFAULT_INPUT_PATH = "/zhonghui/input";

    /** Output path used when the caller supplies no positional arguments. */
    private static final String DEFAULT_OUTPUT_PATH = "/zhonghui/output";

    /**
     * Command-line entry point.
     *
     * <p>Builds a {@link Configuration}, adds the {@code core-site.xml} and
     * {@code hdfs-site.xml} resources to it, runs the job through
     * {@link ToolRunner} and terminates the JVM with the job's exit code so
     * that a failed job is visible to the calling shell or scheduler.
     *
     * @param args command-line arguments; forwarded to {@link ToolRunner}
     * @throws Exception if the tool fails with an exception
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.addResource("core-site.xml");
        conf.addResource("hdfs-site.xml");
        // Forward the real command line instead of a fixed array, and do not
        // throw away the status returned by the tool.
        int exitCode = ToolRunner.run(conf, new WordCount(), args);
        System.exit(exitCode);
    }

    /**
     * Builds and executes the word-count pipeline.
     *
     * @param args either no elements (the default paths are used) or exactly
     *             two elements: the input path followed by the output path
     * @return {@code 0} if the pipeline succeeded, {@code 1} if it failed or
     *         if the argument count was invalid
     * @throws Exception if building or running the pipeline fails
     */
    @Override
    public int run(String[] args) throws Exception {
        final String inputPath;
        final String outPath;
        if (args == null || args.length == 0) {
            // Keeps the original zero-argument behaviour of this example.
            inputPath = DEFAULT_INPUT_PATH;
            outPath = DEFAULT_OUTPUT_PATH;
        } else if (args.length == 2) {
            inputPath = args[0];
            outPath = args[1];
        } else {
            // Report bad usage instead of failing later with an
            // ArrayIndexOutOfBoundsException.
            System.err.println(
                    "Usage: WordCount [generic options] [<input> <output>]");
            return 1;
        }

        Pipeline pipeline = new MRPipeline(WordCount.class, getConf());

        PCollection<String> lines = pipeline.readTextFile(inputPath);
        // NOTE(review): this file imports Writables from
        // org.apache.crunch.scrunch; the Crunch Java examples use
        // org.apache.crunch.types.writable.Writables - confirm the import.
        PCollection<String> words = lines.parallelDo(new Tokenizer(),
                Writables.strings());
        PCollection<String> noStopWords = words.filter(new StopWordFilter());
        PTable<String, Long> counts = noStopWords.count();
        pipeline.writeTextFile(counts, outPath);

        PipelineResult result = pipeline.done();
        return result.succeeded() ? 0 : 1;
    }
}
/**
 * Filter that keeps every word that is not an English stop word.
 *
 * <p>Matching is an exact, case-sensitive lookup of the word in
 * {@link #STOP_WORDS}.
 */
class StopWordFilter extends FilterFn<String> {

    /** English stop words, borrowed from Lucene. */
    private static final Set<String> STOP_WORDS = ImmutableSet.of(
            "a", "and", "are", "as", "at", "be", "but",
            "by", "for", "if", "in", "into", "is", "it", "no", "not",
            "of", "on", "or", "s", "such", "t", "that", "the", "their",
            "then", "there", "these", "they", "this", "to", "was",
            "will", "with");

    /**
     * Decides whether a word is kept.
     *
     * @param word the word to test
     * @return {@code false} if {@code word} is one of the stop words,
     *         {@code true} otherwise
     */
    @Override
    public boolean accept(String word) {
        boolean isStopWord = STOP_WORDS.contains(word);
        return !isStopWord;
    }
}
/**
 * Splits each input line into words and emits every word separately.
 *
 * <p>A line is split on runs of whitespace; empty pieces are dropped.
 */
class Tokenizer extends DoFn<String, String> {

    /** Splits on one or more whitespace characters and omits empty strings. */
    private static final Splitter WHITESPACE_SPLITTER =
            Splitter.onPattern("\\s+").omitEmptyStrings();

    /**
     * Emits the words of one line, in the order the splitter yields them.
     *
     * @param line    the line of text to tokenize
     * @param emitter receives one call per word found in {@code line}
     */
    @Override
    public void process(String line, Emitter<String> emitter) {
        Iterable<String> tokens = WHITESPACE_SPLITTER.split(line);
        for (String token : tokens) {
            emitter.emit(token);
        }
    }
}
下面是对应的 pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <!-- Coordinates of this example project. -->
  <groupId>crunch</groupId>
  <artifactId>crunch</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <properties>
    <!-- Single version shared by all org.apache.crunch artifacts below. -->
    <crunch.version>0.14.0</crunch.version>
  </properties>

  <dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.crunch/crunch-core -->
    <dependency>
      <groupId>org.apache.crunch</groupId>
      <artifactId>crunch-core</artifactId>
      <version>${crunch.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.crunch</groupId>
      <artifactId>crunch-test</artifactId>
      <version>${crunch.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.crunch</groupId>
      <artifactId>crunch-hbase</artifactId>
      <version>${crunch.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.crunch</groupId>
      <artifactId>crunch-spark</artifactId>
      <version>${crunch.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.crunch</groupId>
      <artifactId>crunch-examples</artifactId>
      <version>${crunch.version}</version>
    </dependency>
    <!-- System-scoped reference to tools.jar under JAVA_HOME.
         NOTE(review): presumably added to satisfy a transitive jdk.tools
         dependency of the Hadoop/HBase artifacts; confirm it is still needed. -->
    <dependency>
      <groupId>jdk.tools</groupId>
      <artifactId>jdk.tools</artifactId>
      <version>1.6</version>
      <scope>system</scope>
      <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
    </dependency>
  </dependencies>

  <build>
    <!-- Sources live directly under src/ rather than src/main/java. -->
    <sourceDirectory>src</sourceDirectory>
    <plugins>
      <!-- Compile for Java 1.7 source and bytecode level. -->
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>