这篇文章主要介绍了Java lambda表达式实现Flink WordCount过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
本篇我们将使用Java语言来实现Flink的单词统计。
代码开发
环境准备
导入Flink 1.9 pom依赖
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.9.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>1.9.0</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.7</version> </dependency> </dependencies>
构建Flink流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
自定义source
每秒生成一行文本
DataStreamSource<String> wordLineDS = env.addSource(new RichSourceFunction<String>() { private boolean isCanal = false; private String[] words = { "important oracle jdk license update", "the oracle jdk license has changed for releases starting april 16 2019", "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ", "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ", "downloading and using this product an faq is available here ", "commercial license and support is available with a low cost java se subscription", "oracle also provides the latest openjdk release under the open source gpl license at jdk java net" }; @Override public void run(SourceContext<String> ctx) throws Exception { // 每秒发送一行文本 while (!isCanal) { int randomIndex = RandomUtils.nextInt(0, words.length); ctx.collect(words[randomIndex]); Thread.sleep(1000); } } @Override public void cancel() { isCanal = true; } });
单词计算
// 3. 单词统计 // 3.1 将文本行切分成一个个的单词 SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> { // 切分单词 Arrays.stream(line.split(" ")).forEach(word -> { ctx.collect(word); }); }).returns(Types.STRING); //3.2 将单词转换为一个个的元组 SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS .map(word -> Tuple2.of(word, 1)) .returns(Types.TUPLE(Types.STRING, Types.INT)); // 3.3 按照单词进行分组 KeyedStream<Tuple2<String, Integer>, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0); // 3.4 对每组单词数量进行累加 SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = keyedDS .timeWindow(Time.seconds(3)) .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1)); resultDS.print();
参考代码
public class WordCount { public static void main(String[] args) throws Exception { // 1. 构建Flink流式初始化环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2. 自定义source - 每秒发送一行文本 DataStreamSource<String> wordLineDS = env.addSource(new RichSourceFunction<String>() { private boolean isCanal = false; private String[] words = { "important oracle jdk license update", "the oracle jdk license has changed for releases starting april 16 2019", "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ", "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ", "downloading and using this product an faq is available here ", "commercial license and support is available with a low cost java se subscription", "oracle also provides the latest openjdk release under the open source gpl license at jdk java net" }; @Override public void run(SourceContext<String> ctx) throws Exception { // 每秒发送一行文本 while (!isCanal) { int randomIndex = RandomUtils.nextInt(0, words.length); ctx.collect(words[randomIndex]); Thread.sleep(1000); } } @Override public void cancel() { isCanal = true; } }); // 3. 单词统计 // 3.1 将文本行切分成一个个的单词 SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> { // 切分单词 Arrays.stream(line.split(" ")).forEach(word -> { ctx.collect(word); }); }).returns(Types.STRING); //3.2 将单词转换为一个个的元组 SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS .map(word -> Tuple2.of(word, 1)) .returns(Types.TUPLE(Types.STRING, Types.INT)); // 3.3 按照单词进行分组 KeyedStream<Tuple2<String, Integer>, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0); // 3.4 对每组单词数量进行累加 SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = keyedDS .timeWindow(Time.seconds(3)) .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1)); resultDS.print(); env.execute("app"); } }
Flink对Java Lambda表达式支持情况
Flink支持Java API所有操作符使用Lambda表达式。但是,但Lambda表达式使用Java泛型时,就需要声明类型信息。
我们来看下上述的这段代码:
SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> { // 切分单词 Arrays.stream(line.split(" ")).forEach(word -> { ctx.collect(word); }); }).returns(Types.STRING);
之所以这里将所有的类型信息,因为Flink无法正确自动推断出来Collector中带的泛型。我们来看一下FlatMapFuntion的源代码
@Public @FunctionalInterface public interface FlatMapFunction<T, O> extends Function, Serializable { /** * The core method of the FlatMapFunction. Takes an element from the input data set and transforms * it into zero, one, or more elements. * * @param value The input value. * @param out The collector for returning result values. * * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation * to fail and may trigger recovery. */ void flatMap(T value, Collector<O> out) throws Exception; }
我们发现 flatMap的第二个参数是Collector<O>,是一个带参数的泛型。Java编译器编译该代码时会进行参数类型擦除,所以Java编译器会变成成:
void flatMap(T value, Collector out)
这种情况,Flink将无法自动推断类型信息。如果我们没有显示地提供类型信息,将会出现以下错误:
org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.
这种情况下,必须要显示指定类型信息,否则输出将返回值视为Object类型,这将导致Flink无法正确序列化。
所以,我们需要显示地指定Lambda表达式的参数类型信息,并通过returns方法显示指定输出的类型信息
我们再看一段代码:
SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS .map(word -> Tuple2.of(word, 1)) .returns(Types.TUPLE(Types.STRING, Types.INT));
为什么map后面也需要指定类型呢?
因为此处map返回的是Tuple2类型,Tuple2是带有泛型参数,在编译的时候同样会被查出泛型参数信息,导致Flink无法正确推断。
更多关于对Java Lambda表达式的支持请参考官网:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/java_lambdas.html
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持小牛知识库。
我有下面的类和对象的ArrayList: 我想要的是从我的列表中得到一个字符串和整数地图,其中包含门票中的目的地和我列表中每个目的地的出现次数。我想它会这样开始,但是我不知道如何继续:
本文向大家介绍Java8 Lambda表达式模板方法实现解析,包括了Java8 Lambda表达式模板方法实现解析的使用技巧和注意事项,需要的朋友参考一下 Java注解提供了关于代码的一些信息,但并不直接作用于它所注解的代码内容。在这个教程当中,我们将学习Java的注解,如何定制注解,注解的使用以及如何通过反射解析注解。 Java1.5引入了注解,当前许多java框架中大量使用注解,如Hibern
问题内容: 我正在创建一个CSS编辑器,并试图创建一个可以从CSS文档获取数据的正则表达式。如果我拥有一个属性,则此正则表达式有效,但我无法使其对所有属性都有效。我在PHP中使用preg / perl语法。 正则表达式 测试用例 预期结果 实际结果 在此先感谢您的帮助-整个下午使我感到困惑! 问题答案: 对于单个正则表达式来说,这似乎太令人费解了。好吧,我敢肯定,通过正确的扩展,高级用户可以创建正
我正在寻找一个JAVA库来解析 我的要求: 支持所有的值类型(例如int,双,布尔,String等) 支持所有已知的数学 有什么建议吗?
过滤表达式 mitmproxy工具中的许多命令都使用过滤器表达式。过滤器表达式由以下运算符组成: 命令 描述 〜a 匹配响应资源:CSS,Javascript,Flash,images。 〜b regex Body 〜bq regex 请求的Body 〜bs regex 响应的Body 〜c int HTTP响应码 〜d regex 域名 〜dst regex 匹配目标地址 〜e 匹配错误 〜h
从JLS中考虑下面的文章,它描述了在类型推断过程中方法引用表达式的还原过程。 形式的约束公式,其中T提到至少一个推理变量,减少如下: ... 否则,如果方法参考准确(§15.13.1),则让P1。。。,Pn是T的函数类型的参数类型,让F1。。。,Fk是潜在适用方法的参数类型。该约束减少为一组新的约束,如下所示: –在n=K1的特殊情况下,P1类型的参数将作为调用的目标引用。方法引用表达式的形式必须