flink 对java8 lambdas表达式的支持

尉迟正奇
2023-12-01

Map: 对于简单的map(i -> i * i) flink 可以猜测其 类型。复杂的则需要指定return type,或者构造一个MapFunction,或者extends 自 Tuple2<Integer, Integer>。
flatMap:对于flatMap 的支持是无法猜测出来 类型的,必须通过returns(Types.STRING) 指定具体的返回值类型。

package myflink.learn.lambda;
/**
 * Created by:
 * date: 2019-02-17.
 */

import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/java_lambdas.html#java-lambda-expressions
@Slf4j
@ToString
public class LambdaDemo {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).map(i -> i * i).print();
        DataSet<Integer> input = env.fromElements(1, 2, 3);

        input.flatMap((Integer number, Collector<String> out) -> {
            StringBuilder builder = new StringBuilder();
            for (int i = 0; i < number; i++) {
                builder.append("a");
                out.collect(builder.toString());
            }
        })
                .returns(Types.STRING)
                .print();

//        env.fromElements(1, 2, 3)
//                .map(i -> Tuple2.of(i, i))    // no information about fields of Tuple2
//                .print(); // Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Tuple2' are missing.
//        // In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved.
//        // An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.MapFunction' interface.
//        // Otherwise the type has to be specified explicitly using type information.

        env.fromElements(1, 2, 3)
                .map(i -> Tuple2.of(i, i))
                .returns(Types.TUPLE(Types.INT, Types.INT))
                .print();


        // 或者 使用一个Mapper
        env.fromElements(1, 2, 3)
                .map(new MyTuple2Mapper())
                .print();


        // 或者 使用一个 匿名类
        env.fromElements(1, 2, 3).map(new MapFunction<Integer, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> map(Integer value) throws Exception {
                return Tuple2.of(value, value);
            }
        }).print();

        // or in this example use a tuple subclass instead
        env.fromElements(1, 2, 3)
                .map(i -> new DoubleTuple(i, i))
                .print();

    }

    public static class DoubleTuple extends Tuple2<Integer, Integer> {
        // 必须加上 无参构造函数,newInstance 时需要
        /*
        Caused by: java.lang.NoSuchMethodException: myflink.learn.lambda.LambdaDemo$DoubleTuple.<init>()
    at java.lang.Class.getConstructor0(Class.java:3082)
    at java.lang.Class.newInstance(Class.java:412)
         */
        public DoubleTuple() {

        }

        public DoubleTuple(int f0, int f1) {
            this.f0 = f0;
            this.f1 = f1;
        }
    }
}

 

以上就是本文的全部内容,希望对大家的学习有所帮助

硬核资料:关注即私信或(点击获取)可领取行业经典书籍PDF。
技术互助:技术群大佬指点迷津,你的问题可能不是问题,求资源在(技术群)里喊一声。
面试题库:由P8大佬们共同投稿,热乎的大厂面试真题,持续更新中。(点击获取
知识体系:含编程语言、算法、大数据生态圈组件(Mysql、Hive、Spark、Flink)、数据仓库 
 

 类似资料: