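When writing a Flink program (taking the simplest WordCount case as an example), you may use lambda expressions to shorten the code. In the program below, both the flatMap and map operators are written as lambdas: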
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<String> dataSource = env.socketTextStream("10.12.36.102", 8888);
    // flatMap written as a lambda: split each line into words
    SingleOutputStreamOperator<String> wordStream = dataSource.flatMap((String line, Collector<String> out) -> Arrays.stream(line.split(" ")).forEach(out::collect));
    // map written as a lambda: turn each word into (word, 1)
    SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = wordStream.map(word -> Tuple2.of(word, 1));
    // group by the word (field f0) and sum the counts (field 1)
    SingleOutputStreamOperator<Tuple2<String, Integer>> sumStream = wordAndOne.keyBy(tp -> tp.f0).sum(1);
    sumStream.print();
    env.execute();
}
However, after switching to the lambda form, the program fails with the following error:
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException:
The return type of function 'main(StreamingWordCount2.java:31)' could not be determined automatically, due to type erasure.
You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
The generic type parameters of 'Tuple2' are missing.
In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.MapFunction' interface. Otherwise the type has to be specified explicitly using type information.
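The error message itself already states the cause quite clearly:
In many cases, lambdas don't provide enough information for automatic type extraction when Java generics are involved. In other words, because of type erasure, Flink cannot determine the function's return type automatically.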
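To make the erasure point concrete, here is a minimal plain-Java sketch with no Flink dependency. The Mapper interface and the ErasureDemo class are hypothetical stand-ins invented for this illustration, and the reflection check is a simplification, not Flink's actual type extraction logic. It only shows that an anonymous class records its generic type arguments in its class metadata, while a lambda does not:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;

public class ErasureDemo {
    // Hypothetical stand-in for a function interface such as Flink's MapFunction.
    interface Mapper<I, O> {
        O map(I value);
    }

    // Returns the generic type arguments recorded on the object's class,
    // or "unknown" when no parameterized interface can be found.
    static String typeArgsOf(Mapper<?, ?> mapper) {
        for (Type t : mapper.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                return Arrays.toString(((ParameterizedType) t).getActualTypeArguments());
            }
        }
        return "unknown";
    }

    // An anonymous class keeps Mapper<String, Integer> in its class metadata.
    static Mapper<String, Integer> anonymousMapper() {
        return new Mapper<String, Integer>() {
            @Override
            public Integer map(String value) {
                return value.length();
            }
        };
    }

    // The class generated for a lambda only implements the raw Mapper interface.
    static Mapper<String, Integer> lambdaMapper() {
        return value -> value.length();
    }

    public static void main(String[] args) {
        // prints: anonymous class: [class java.lang.String, class java.lang.Integer]
        System.out.println("anonymous class: " + typeArgsOf(anonymousMapper()));
        // prints: lambda: unknown
        System.out.println("lambda: " + typeArgsOf(lambdaMapper()));
    }
}
```

This is also why the error message suggests an (anonymous) class implementing MapFunction as a workaround: the class form carries the generic information that the lambda form loses.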
The sentence "You can give type information hints by using the returns(...) method on the result of the transformation call" tells us what to do: call returns(...) right after the transformation operator to explicitly declare the type of the data it returns.
In code, it looks like this:
// Types here is org.apache.flink.api.common.typeinfo.Types
// flatMap (see the returns(...) call at the end of the line)
SingleOutputStreamOperator<String> wordStream = dataSource.flatMap((String line, Collector<String> out) -> Arrays.stream(line.split(" ")).forEach(out::collect)).returns(Types.STRING);
// map (see the returns(...) call at the end of the line)
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = wordStream.map(word -> Tuple2.of(word, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));
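As the code shows:
After the flatMap operator, returns is called to explicitly declare the return type as Types.STRING.
After the map operator, returns is called to explicitly declare the return type as a Types.TUPLE, whose first element is a STRING and whose second element is an INT.
With these two hints in place, the exception no longer occurs.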
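I'm smallk. I taught myself big data and received 22 big data offers, including from Baidu, JD, Xiaomi, SF, 58, Hello, and Hikvision. If you are also working your way through the data field, you are welcome to join the discussion so we can move forward together.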