Flink DataStream Transformation Operations

徐隐水
2023-12-01

(1) Map in Detail

Applies a user-defined MapFunction to every element of a DataStream and produces a new DataStream. The element type may change in the process, so Map is commonly used to clean and convert the records in a data set.

Map[DataStream -> DataStream]
DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
});

(2) FlatMap in Detail

Handles computations in which a single input element produces zero, one, or more output elements.

FlatMap[DataStream -> DataStream]
dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out)
        throws Exception {
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
});

(3) Filter in Detail

Evaluates a boolean condition on every element and keeps only the elements for which the FilterFunction returns true.

Filter[DataStream -> DataStream]
dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});

(4) KeyBy in Detail

Places records that share the same key in the same partition; in other words, it partitions the data set by key.

KeyBy[DataStream -> KeyedStream]

Note: there are two cases in which keyBy cannot be used to re-partition a data set (see the sketch after the keyBy examples below):

  • The element type is a POJO that does not override hashCode() and relies on the default Object.hashCode() implementation
  • The element type is an array of any kind
dataStream.keyBy(value -> value.getSomeKey()) // Key by field "someKey"
dataStream.keyBy(value -> value.f0) // Key by the first element of a Tuple
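
To illustrate the note above, here is a hedged sketch (CityKey, Event, and getCityKey() are made-up names, not from the original post): a POJO may serve as a key only if it overrides hashCode() and equals(); an array type can never be a key.

// Hypothetical composite key POJO; keyBy() accepts it because hashCode()/equals() are overridden.
public class CityKey {
    public String country;
    public String city;

    public CityKey() {}

    public CityKey(String country, String city) {
        this.country = country;
        this.city = city;
    }

    @Override
    public int hashCode() {
        return java.util.Objects.hash(country, city);
    }

    @Override
    public boolean equals(Object other) {
        if (!(other instanceof CityKey)) {
            return false;
        }
        CityKey that = (CityKey) other;
        return java.util.Objects.equals(country, that.country)
                && java.util.Objects.equals(city, that.city);
    }
}

// Assuming a DataStream<Event> events whose elements expose getCityKey():
// KeyedStream<Event, CityKey> keyed = events.keyBy(event -> event.getCityKey());

Without the hashCode() override (or with an array as the key type), Flink refuses to use the type as a key.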

(5) Reduce in Detail

Performs a rolling aggregation over the keyed data set; the user-defined ReduceFunction must be associative and commutative.

Reduce[KeyedStream -> DataStream]
keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        return value1 + value2;
    }
});

(6) Aggregations in Detail

Aggregations are the aggregation operators provided by the DataStream API. They aggregate on a specified field and emit a rolling series of aggregated results.

Aggregations[KeyedStream -> DataStream]
keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");

Putting the transformations above together, here is a single WordCount example:

package com.aikfk.flink.datastream.transform;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/11 1:22 PM
 */
public class Transform {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> dataStreamSource = env.socketTextStream("bigdata-pro-m07",9999);

        /**
         * map()
         */
        DataStream<Tuple2<String,String>> mapResult = dataStreamSource.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String line) throws Exception {
                String[] word = line.split(" ");
                return new Tuple2<>(word[0],word[1]);
            }
        });

        /**
         * flatmap()
         */
        DataStream<Tuple2<String, Integer>> flatmapResult = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")){
                    collector.collect(new Tuple2<>(word,1));
                }
            }
        });

        /**
         * filter() -> keyBy() -> reduce()
         */
        DataStream<Tuple2<String,Integer>> result = flatmapResult.filter(new FilterFunction<Tuple2<String, Integer>>() {
            @Override
            public boolean filter(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                String saprk = "spark";
                return !stringIntegerTuple2.f0.equals(saprk);
            }
        })
            .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return stringIntegerTuple2.f0;
            }
        })
            .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                    return new Tuple2<>(t1.f0,t1.f1 + t2.f1);
                }
            });

        result.print();

        env.execute("stream");
    }
}

(7) Union in Detail

Union merges two or more data sets into a single data set; all input data sets must have the same element type.

Union[DataStream -> DataStream]
package com.aikfk.flink.datastream.transform;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/11 1:22 PM
 */
public class UnionJava {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> dataStreamSource1 = env.socketTextStream("bigdata-pro-m07",9999);

        DataStream<String> dataStreamSource2 = env.socketTextStream("bigdata-pro-m07",9998);

        DataStream<String> dataStreamSource = dataStreamSource1.union(dataStreamSource2);


        /**
         * flatmap()
         */
        DataStream<Tuple2<String, Integer>> flatmapResult = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")){
                    collector.collect(new Tuple2<>(word,1));
                }
            }
        });

        /**
         * filter() -> keyBy() -> reduce()
         */
        DataStream<Tuple2<String,Integer>> result = flatmapResult.filter(new FilterFunction<Tuple2<String, Integer>>() {
            @Override
            public boolean filter(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                String saprk = "spark";
                return !stringIntegerTuple2.f0.equals(saprk);
            }
        })
            .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return stringIntegerTuple2.f0;
            }
        })
            .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                    return new Tuple2<>(t1.f0,t1.f1 + t2.f1);
                }
            });

        result.print();

        env.execute("stream");
    }
}

(8) Connect in Detail

Connect merges two or more data sets whose element types may differ; each input data set keeps its original type after the merge.

Connect[DataStream, DataStream -> ConnectedStreams]
package com.aikfk.flink.datastream.transform;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/11 1:22 PM
 */
public class ConnectJava {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String,Integer>> dataStreamSource1 = env.fromElements(new Tuple2<>("spark",1),new Tuple2<>("java",3));
        DataStream<Tuple2<String,Integer>> dataStreamSource2 = env.fromElements(new Tuple2<>("hive",2),new Tuple2<>("hadoop",5));

        /**
         * connect()
         */
        ConnectedStreams<Tuple2<String, Integer>, Tuple2<String, Integer>> connectedStreams = dataStreamSource1
                .connect(dataStreamSource2).keyBy(0,0);

        DataStream<Tuple2<String,Integer>> mapResult = connectedStreams.map(new CoMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map1(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return new Tuple2<>(stringIntegerTuple2.f0, stringIntegerTuple2.f1 + 10);
            }

            @Override
            public Tuple2<String, Integer> map2(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return stringIntegerTuple2;
            }
        });

        mapResult.print();


        env.execute("stream");
    }
}
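
The positional keyBy(0, 0) used above still compiles on older releases, but newer Flink versions deprecate the positional/field-name keyBy variants. A hedged sketch of the same keying expressed with explicit KeySelectors, as a drop-in replacement for the connect/keyBy statement in ConnectJava (it additionally needs the org.apache.flink.api.java.functions.KeySelector import):

        // Key both connected streams by the first tuple field via explicit KeySelectors
        // instead of the deprecated positional keyBy(0, 0).
        ConnectedStreams<Tuple2<String, Integer>, Tuple2<String, Integer>> connectedStreams =
                dataStreamSource1.connect(dataStreamSource2).keyBy(
                        new KeySelector<Tuple2<String, Integer>, String>() {
                            @Override
                            public String getKey(Tuple2<String, Integer> value) throws Exception {
                                return value.f0;
                            }
                        },
                        new KeySelector<Tuple2<String, Integer>, String>() {
                            @Override
                            public String getKey(Tuple2<String, Integer> value) throws Exception {
                                return value.f0;
                            }
                        });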

(9) Side Output in Detail

Side outputs make it possible to split a DataStream into several streams based on a condition. The Split operator used to provide this functionality, but it has been deprecated in later Flink versions.

SideOut[DataStream -> SingleOutputStreamOperator -> DataStream]
package com.aikfk.flink.datastream.transform;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;


/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/11 1:22 PM
 */
public class SideOut {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        OutputTag<Tuple2<String,Integer>> outputTag = new OutputTag<Tuple2<String,Integer>>("side-output"){};

        DataStream<Tuple2<String,Integer>> dataStreamSource = env.fromElements(
                new Tuple2<>("alex",11000),
                new Tuple2<>("lili",3200),
                new Tuple2<>("lucy",3400),
                new Tuple2<>("pony",13000),
                new Tuple2<>("tony",33000),
                new Tuple2<>("herry",4500),
                new Tuple2<>("cherry",9000),
                new Tuple2<>("jack",13450)
        );

        /**
         * mainDataStream: the main output, containing records with salary <= 10000
         */
        SingleOutputStreamOperator<Tuple2<String,Integer>> mainDataStream = dataStreamSource
                .process(new ProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public void processElement(Tuple2<String, Integer> stringIntegerTuple2,
                                       Context context,
                                       Collector<Tuple2<String, Integer>> collector) throws Exception {

                if (stringIntegerTuple2.f1 > 10000){
                    context.output(outputTag,stringIntegerTuple2);
                } else {
                    collector.collect(stringIntegerTuple2);
                }

            }
        });

        /**
         * sideOutputStream: the side output, containing records with salary > 10000
         */
        DataStream<Tuple2<String,Integer>> sideOutputStream = mainDataStream.getSideOutput(outputTag);

        sideOutputStream.print();

        /**
         * 6> (tony,33000)
         * 5> (pony,13000)
         * 2> (alex,11000)
         * 9> (jack,13450)
         */

        mainDataStream.print();
        /**
         * 13> (herry,4500)
         * 10> (lucy,3400)
         * 9> (lili,3200)
         * 14> (cherry,9000)
         */

        env.execute("stream");
    }
}

(10) Iterate in Detail

The Iterate operator is suited to iterative computations: the result of each iteration is fed back as input to the next iteration.

Iterate[DataStream -> IterativeStream -> DataStream]
IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        return value > 0;
    }
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        return value <= 0;
    }
});
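
The snippet above leaves initialStream undefined. The following is a minimal runnable sketch (made-up input values; the class name IterateExample is an assumption) that completes it: each value is decremented once per pass, values still greater than 0 are fed back into the loop, and values that have reached 0 leave it as the final output.

package com.aikfk.flink.datastream.transform;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IterateExample {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Long> initialStream = env.fromElements(3L, 5L, 2L);

        // iterate(5000): the iteration head shuts down after 5s without feedback,
        // so this finite demo job can finish on its own.
        IterativeStream<Long> iteration = initialStream.iterate(5000);

        // Iteration body: decrement every value by 1.
        DataStream<Long> iterationBody = iteration.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                return value - 1;
            }
        });

        // Values still greater than 0 are fed back for another round.
        DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) throws Exception {
                return value > 0;
            }
        });
        iteration.closeWith(feedback);

        // Values that have reached 0 (or below) leave the loop as final output.
        DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) throws Exception {
                return value <= 0;
            }
        });
        output.print();

        env.execute("iterate");
    }
}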

(11) Partitioning in Detail

Custom partitioning:
Creates a custom partitioning of the data set by calling partitionCustom() with a user-defined Partitioner.

Custom partitioning [DataStream -> DataStream]
dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0);
package com.aikfk.flink.datastream.transform;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/12 7:38 PM
 */
public class PartitionCustom {
    public static void main(String[] args) throws Exception {
        
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String,Integer>> dataStream = env.socketTextStream("bigdata-pro-m07",9999)
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        for (String word : s.split(" ")){
                            collector.collect(new Tuple2<>(word,1));
                        }
                    }
                });

        /**
         * Custom partitioner
         */
        dataStream.partitionCustom(new Partitioner<String>() {
            @Override
            public int partition(String key, int numPartitions) {

                int partition = Math.floorMod(key.hashCode(), numPartitions);

                System.out.println("key: " + key + " partition: " + partition + " numPartitions: " + numPartitions);

                return partition;
            }
        }, new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return stringIntegerTuple2.f0;
            }
        });

        env.execute("partition");
        
    }
}

key: java partition: 2 numPartitions: 16
key: java partition: 2 numPartitions: 16
key: hive partition: 0 numPartitions: 16
key: hive partition: 0 numPartitions: 16
key: hbase partition: 9 numPartitions: 16

Random partitioning:
Distributes the records across partitions randomly, which yields a relatively balanced distribution.

Random partitioning [DataStream -> DataStream]
dataStream.shuffle();

Rebalance partitioning:
Distributes the records across partitions in a round-robin fashion, which yields a relatively balanced distribution.

Rebalance partitioning [DataStream -> DataStream]
dataStream.rebalance();
