当前位置: 首页 > 工具软件 > Apache Beam > 使用案例 >

apache beam入门之初次使用

邓深
2023-12-01

beam入门宝典之初次使用

咱们不多废话,先直接来如何简单使用beam框架。
这里我不使用常见的wordCount做例子,而是一个大写转小写的例子,语言选用java语言
这个例子里我们会初步学到:

  1. 如何建立管道
  2. 如何手动生成数据
  3. 如何转换
  4. 如何查看输出

首先我们要新建1个maven工程,然后在pom.xml中加入如下依赖:

<dependency>
	<groupId>org.apache.beam</groupId>
	<artifactId>beam-runners-direct-java</artifactId>
	<version>${beam.version}</version>
</dependency>

beam.version版本选择beam官网上最新,笔者编写此文时使用的版本是2.13.0

接着我们新建1个HowToCreateAndShowData类,然后开始例子

建立管道

任何beam程序,都需要先建立1个管道选项option,再建立1个初始管道

// 建立选项
PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
// 建立管道
Pipeline pipeline = Pipeline.create(pipelineOptions);

关于选项option和pipeline的更多用法,后面的章节会继续介绍

手动生成数据

我们有了pipeline之后,就要往里面塞入数据
beam里提供了手动输入数据的方式,如下:

// 生成初始的输入数据
// 相当于往管道里塞入了3个自己写的字符串元素
PCollection<String> pcStart = pipeline.apply(
Create.of(
"HELLO!",
"THIS IS BEAM DEMO!",
"HAPPY STUDY!"));

我们调用pipeline的apply方法来输入1个Create对象,里面的元素就是我们的输入元素
并且返回1个PCollection的对象,我们称之为数据集。
<String>指的是数据集中数据的类型

如何转换

要实现转换,需要先编写1个DoFn的子类,并实现processElement方法,代码和讲解如下:

// 把字符串转成小写的转换方法类
// DoFn<String,String>中的第一个String是输入的类型,第二个String是输出的类型
static class StrToLowerCaseFn extends DoFn<String, String> {
/**
* processElement,过程元素处理方法,类似于spark、mr中的map操作
* 必须加上@ProcessElement注解,并实现processElement方法
* @param context
*/
@ProcessElement
public void processElement(ProcessContext context) {
	// 从管道中取出的1个元素
	String inputStr = context.element();
	// 转成大写
	String outputStr = inputStr.toLowerCase();
	// 输出结果
	context.output(outputStr);
	}
}

接着将这个计算方法,用数据集.apply(ParDo.of(计算类))的方式组装到刚才的pcStart中

// 组装小写转换
PCollection<String> pcMid =
pcStart.apply(ParDo.of(new StrToLowerCaseFn())); 

如何输出

输出的话,我们可以按照上面的方法再编写1个DoFn子类,用于将数据集中输入的元素打印到控制台

// 打印结果方法类
// 因为不需要再往下输出,所以
static class PrintStrFn extends DoFn<String, Void> {
/**
* processElement,过程元素处理方法,类似于spark、mr中的map操作
* 必须加上@ProcessElement注解,并实现processElement方法
* @param context
*/
@ProcessElement
public void processElement(ProcessContext context) {
	// 从管道中取出的1个元素
	String inputStr = context.element();

	// 输出
	System.out.println(inputStr);
	}
}

然后组装

// 组装输出操作
pcMid.apply(ParDo.of(new PrintStrFn()));

运行

刚才的3次apply结束后,其实转换都还没有开始,仅仅只是组装计算拓扑的1个流程。
真正开始计算需要调用下面的代码:

// 运行结果
pipeline.run().waitUntilFinish();

执行main方法,输出如下结果:
image.png

完整代码

/**
 * The howToCreateAndShowData
 *
 * */
public class HowToCreateAndShowData {
    public static void main(String[] args) {
        PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
        Pipeline pipeline = Pipeline.create(pipelineOptions);

        // 生成初始的输入数据
        // 相当于往管道里塞入了3个自己写的字符串元素
        PCollection<String> pcStart = pipeline.apply(
                Create.of(
                        "HELLO!",
                        "THIS IS BEAM DEMO!",
                        "HAPPY STUDY!"));

        // 组装小写转换
        PCollection<String> pcMid = pcStart.apply(ParDo.of(new StrToLowerCaseFn()));

        // 组装输出操作
        pcMid.apply(ParDo.of(new PrintStrFn()));

        // 运行结果
        pipeline.run().waitUntilFinish();
    }

    // 把字符串转成小写的转换方法类
    // DoFn<String,String>中的第一个String是输入的类型,第二个String是输出的类型
    static class StrToLowerCaseFn extends DoFn<String, String> {
        /**
         * processElement,过程元素处理方法,类似于spark、mr中的map操作
         * 必须加上@ProcessElement注解,并实现processElement方法
         *
         * @param context
         */
        @ProcessElement
        public void processElement(ProcessContext context) {
            // 从管道中取出的1个元素
            String inputStr = context.element();
            // 转成大写
            String outputStr = inputStr.toLowerCase();
             // 输出结果
            context.output(outputStr);
        }
    }

    // 打印结果方法类
    // 因为不需要再往下输出,所以
    static class PrintStrFn extends DoFn<String, Void> {
        /**
         * processElement,过程元素处理方法,类似于spark、mr中的map操作
         * 必须加上@ProcessElement注解,并实现processElement方法
         *
         * @param context
         */
        @ProcessElement
        public void processElement(ProcessContext context) {
        // 从管道中取出的1个元素
            String inputStr = context.element();

        // 输出
            System.out.println(inputStr);
        }
    }
}
 类似资料: