当前位置: 首页 > 工具软件 > Apache Crunch > 使用案例 >

crunch学习一

杨君之
2023-12-01

最近在学习crunch

先附上官网文档地址http://crunch.apache.org/user-guide.html

首先是学习了一下getstart

然后才是user-guide

简述一下吧,作为一个笔记

1.crunch几个重要的接口PCollection, PTable, and PGroupedTable.Pipeline
pipeline是一个类似与管道的东西,是所有的入口,实现类有

- MRPipeline: Executes the pipeline as a series of MapReduce jobs.
- MemPipeline: Executes the pipeline in-memory on the client.
- SparkPipeline: Executes the pipeline by converting it to a series of Spark pipelines
pipeline.read 可以产生pcollection
pcollection可一调用一个parallelDo方法 传入自己实现dofn接口 和 返回值的类型
dofn接口是类似与map 或者reduce的接口,代码逻辑就是写在这块
2.dofn比map reduce更加优秀的地方有两点
     一.map 如果需要一个常量,只能将这个常量放置在configuration中,然后调用step方法进行获取,然后转换.而dofn不需要,因为是new dofn()所以可以用构造子进行传递产量,而且所有的dofn都是实现了序列化接口,所以dofn中的所有东西都是不需要转换
     二.内置各种函数,如by group count等等(这点是我总结的,不是user-guide的)
3.dofn详解
  •      dofn的运行过程,首先是从TaskInputOutputContext中获取到输入参数,然后通过dofn中的initablize方法进行初始化,之后就是调用process方法进行逻辑的处理并且通过Emitter<T>进行输出结果,最后有一个cleanup的方法
  • dofn运行时可以通过getConfiguration() progress()setStatus(String status) 当然也有计数器increment(String groupName, String counterName)
  • 如何配置dnfn的执行计划 通过scaleFactor方法可以影响到每个dofn的输入具体是多少,这个方法的返回值是float类型,具体意思应该是用于控制map和reduce的个数,具体的我还没有做测试,这个只是猜测. configure(Configuration conf)方法是用于一些设置
  • dofn的一些常用内容
4.pcollection 中有filterfn 用于过滤 也可以使用内置的filterfns的and or之类的进行过滤
  •      mapfn用于原样输出 
  •      使用pcollection中的by方法 传入mapfn<k,v> 可以将pcollection转换成为kv的形式,mapfn的作用是生成key, 值是之前pcollection中的值
  • 使用上述的by方法生成的是ptable,ptable有mapkey和mapvalue方法,这个是对ptable中的key value进行做转换

附上代码

package com.hit.crunch;


import java.util.Set;


import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.FilterFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.scrunch.Writables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableSet;


public class WordCount extends Configured implements Tool {


public static void main(String[] args) throws Exception {
String a[] = new String[]{"/zhonghui/input","/zhonghui/output"};
Configuration conf = new Configuration();
conf.addResource("core-site.xml");
conf.addResource("hdfs-site.xml");
ToolRunner.run(conf, new WordCount(), a);
}


@Override
public int run(String[] args) throws Exception {
String inputPath = args[0];
String outPath = args[1];
Pipeline pipeline = new MRPipeline(WordCount.class, getConf());


PCollection<String> lines = pipeline.readTextFile(inputPath);


PCollection<String> words = lines.parallelDo(new Tokenizer(),
Writables.strings());


PCollection<String> noStopWords = words.filter(new StopWordFilter());


PTable<String, Long> counts = noStopWords.count();


pipeline.writeTextFile(counts, outPath);


PipelineResult result = pipeline.done();


return result.succeeded() ? 0 : 1;
}


}


class StopWordFilter extends FilterFn<String> {
// English stop words, borrowed from Lucene.
private static final Set<String> STOP_WORDS = ImmutableSet
.copyOf(new String[] { "a", "and", "are", "as", "at", "be", "but",
"by", "for", "if", "in", "into", "is", "it", "no", "not",
"of", "on", "or", "s", "such", "t", "that", "the", "their",
"then", "there", "these", "they", "this", "to", "was",
"will", "with" });


@Override
public boolean accept(String word) {
return !STOP_WORDS.contains(word);
}
}


class Tokenizer extends DoFn<String, String> {
private static final Splitter SPLITTER = Splitter.onPattern("\\s+")
.omitEmptyStrings();


@Override
public void process(String line, Emitter<String> emitter) {
for (String word : SPLITTER.split(line)) {
emitter.emit(word);
}
}
}


pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>crunch</groupId>
<artifactId>crunch</artifactId>
<version>0.0.1-SNAPSHOT</version>


<dependencies>


<!-- https://mvnrepository.com/artifact/org.apache.crunch/crunch-core -->
<dependency>
<groupId>org.apache.crunch</groupId>
<artifactId>crunch-core</artifactId>
<version>0.14.0</version>
</dependency>


<dependency>
<groupId>org.apache.crunch</groupId>
<artifactId>crunch-test</artifactId>
<version>0.14.0</version>
</dependency>


<dependency>
<groupId>org.apache.crunch</groupId>
<artifactId>crunch-hbase</artifactId>
<version>0.14.0</version>
</dependency>




<dependency>
<groupId>org.apache.crunch</groupId>
<artifactId>crunch-spark</artifactId>
<version>0.14.0</version>
</dependency>


<dependency>
<groupId>org.apache.crunch</groupId>
<artifactId>crunch-examples</artifactId>
<version>0.14.0</version>
</dependency>

<dependency>
        <groupId>jdk.tools</groupId>
        <artifactId>jdk.tools</artifactId>
        <version>1.6</version>
        <scope>system</scope>
        <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
    </dependency>


</dependencies>


<build>
<sourceDirectory>src</sourceDirectory>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build>
</project>

 类似资料: