咱们不多废话,先直接来如何简单使用beam框架。
这里我不使用常见的wordCount做例子,而是一个大写转小写的例子,语言选用java语言
这个例子里我们会初步学到:
首先我们要新建1个maven工程,然后在pom.xml中加入如下依赖:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${beam.version}</version>
</dependency>
beam.version版本选择beam官网上最新,笔者编写此文时使用的版本是2.13.0
接着我们新建1个HowToCreateAndShowData类,然后开始例子
任何beam程序,都需要先建立1个管道选项option,再建立1个初始管道
// 建立选项
PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
// 建立管道
Pipeline pipeline = Pipeline.create(pipelineOptions);
关于选项option和pipeline的更多用法,后面的章节会继续介绍
我们有了pipeline之后,就要往里面塞入数据
beam里提供了手动输入数据的方式,如下:
// 生成初始的输入数据
// 相当于往管道里塞入了3个自己写的字符串元素
PCollection<String> pcStart = pipeline.apply(
Create.of(
"HELLO!",
"THIS IS BEAM DEMO!",
"HAPPY STUDY!"));
我们调用pipeline的apply方法来输入1个Create对象,里面的元素就是我们的输入元素
并且返回1个PCollection的对象,我们称之为数据集。
<String>指的是数据集中数据的类型
要实现转换,需要先编写1个DoFn的子类,并实现processElement方法,代码和讲解如下:
// 把字符串转成小写的转换方法类
// DoFn<String,String>中的第一个String是输入的类型,第二个String是输出的类型
static class StrToLowerCaseFn extends DoFn<String, String> {
/**
* processElement,过程元素处理方法,类似于spark、mr中的map操作
* 必须加上@ProcessElement注解,并实现processElement方法
* @param context
*/
@ProcessElement
public void processElement(ProcessContext context) {
// 从管道中取出的1个元素
String inputStr = context.element();
// 转成大写
String outputStr = inputStr.toLowerCase();
// 输出结果
context.output(outputStr);
}
}
接着将这个计算方法,用数据集.apply(ParDo.of(计算类))的方式组装到刚才的pcStart中
// 组装小写转换
PCollection<String> pcMid =
pcStart.apply(ParDo.of(new StrToLowerCaseFn()));
输出的话,我们可以按照上面的方法再编写1个DoFn子类,用于将数据集中输入的元素打印到控制台
// 打印结果方法类
// 因为不需要再往下输出,所以
static class PrintStrFn extends DoFn<String, Void> {
/**
* processElement,过程元素处理方法,类似于spark、mr中的map操作
* 必须加上@ProcessElement注解,并实现processElement方法
* @param context
*/
@ProcessElement
public void processElement(ProcessContext context) {
// 从管道中取出的1个元素
String inputStr = context.element();
// 输出
System.out.println(inputStr);
}
}
然后组装
// 组装输出操作
pcMid.apply(ParDo.of(new PrintStrFn()));
刚才的3次apply结束后,其实转换都还没有开始,仅仅只是组装计算拓扑的1个流程。
真正开始计算需要调用下面的代码:
// 运行结果
pipeline.run().waitUntilFinish();
执行main方法,输出如下结果:
image.png
/**
* The howToCreateAndShowData
*
* */
public class HowToCreateAndShowData {
public static void main(String[] args) {
PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
Pipeline pipeline = Pipeline.create(pipelineOptions);
// 生成初始的输入数据
// 相当于往管道里塞入了3个自己写的字符串元素
PCollection<String> pcStart = pipeline.apply(
Create.of(
"HELLO!",
"THIS IS BEAM DEMO!",
"HAPPY STUDY!"));
// 组装小写转换
PCollection<String> pcMid = pcStart.apply(ParDo.of(new StrToLowerCaseFn()));
// 组装输出操作
pcMid.apply(ParDo.of(new PrintStrFn()));
// 运行结果
pipeline.run().waitUntilFinish();
}
// 把字符串转成小写的转换方法类
// DoFn<String,String>中的第一个String是输入的类型,第二个String是输出的类型
static class StrToLowerCaseFn extends DoFn<String, String> {
/**
* processElement,过程元素处理方法,类似于spark、mr中的map操作
* 必须加上@ProcessElement注解,并实现processElement方法
*
* @param context
*/
@ProcessElement
public void processElement(ProcessContext context) {
// 从管道中取出的1个元素
String inputStr = context.element();
// 转成大写
String outputStr = inputStr.toLowerCase();
// 输出结果
context.output(outputStr);
}
}
// 打印结果方法类
// 因为不需要再往下输出,所以
static class PrintStrFn extends DoFn<String, Void> {
/**
* processElement,过程元素处理方法,类似于spark、mr中的map操作
* 必须加上@ProcessElement注解,并实现processElement方法
*
* @param context
*/
@ProcessElement
public void processElement(ProcessContext context) {
// 从管道中取出的1个元素
String inputStr = context.element();
// 输出
System.out.println(inputStr);
}
}
}