Calls the user-defined MapFunction on each element of a DataStream to produce a new DataStream. The element type may change in the process; this operator is commonly used to clean and convert the records in a dataset.
Map[DataStream -> DataStream]
DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
return 2 * value;
}
});
Handles computations in which a single input element produces one or more output elements.
FlatMap[DataStream -> DataStream]
dataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out)
throws Exception {
for(String word: value.split(" ")){
out.collect(word);
}
}
});
Evaluates a boolean condition on every input element and keeps only the elements for which the FilterFunction returns true.
Filter[DataStream -> DataStream]
dataStream.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer value) throws Exception {
return value != 0;
}
});
Places records with the same key into the same partition; in other words, it partitions the dataset by key.
KeyBy[DataStream -> KeyedStream]
Note: two kinds of types cannot be used as keys for KeyBy: (1) POJO types that do not override hashCode() and fall back to Object.hashCode(), and (2) arrays of any type (both cases are sketched right after the snippets below).
dataStream.keyBy(value -> value.getSomeKey()) // Key by field "someKey"
dataStream.keyBy(value -> value.f0) // Key by the first element of a Tuple
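Both restrictions can be illustrated with a short sketch; the Person class below is a hypothetical example, not part of the original post:
// Case 1: a POJO that does not override hashCode() cannot be used as a key
public class Person {
    public String name;   // relies on Object.hashCode() -> rejected as a key type
    public int age;
}
// personStream.keyBy(value -> value);        // rejected when the job graph is built
// Case 2: arrays of any type cannot be used as keys
// DataStream<String[]> arrayStream = ...;
// arrayStream.keyBy(value -> value);         // rejected for the same reason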
Performs a rolling aggregation over a keyed stream; the user-defined ReduceFunction must be associative and commutative.
Reduce[KeyedStream -> DataStream]
keyedStream.reduce(new ReduceFunction<Integer>() {
@Override
public Integer reduce(Integer value1, Integer value2)
throws Exception {
return value1 + value2;
}
});
Aggregations are the built-in aggregation operators on a KeyedStream. They aggregate on a specified field and emit a rolling series of aggregated results. Note that min/max return only the minimum/maximum value of the given field, whereas minBy/maxBy return the whole element that holds the minimum/maximum value.
Aggregations[KeyedStream -> DataStream]
keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
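As a quick illustration (the sample data, the field index, and the env variable are my own assumptions, not from the original post), a rolling keyed sum and the min/minBy contrast look roughly like this:
DataStream<Tuple2<String, Integer>> counts = env.fromElements(
        new Tuple2<>("spark", 1), new Tuple2<>("spark", 2), new Tuple2<>("flink", 5));
KeyedStream<Tuple2<String, Integer>, String> keyed = counts.keyBy(value -> value.f0);
// Rolling sum of field 1 per key: emits (spark,1), (spark,3), (flink,5)
keyed.sum(1).print();
// min(1) only tracks the minimum of field 1, while minBy(1) emits the whole element that holds
// the minimum; the difference becomes visible once the tuples carry more than two fields
keyed.min(1).print();
keyed.minBy(1).print();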
Here is a single WordCount example that combines the transformations above:
package com.aikfk.flink.datastream.transform;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* @author :caizhengjie
* @description:TODO
* @date :2021/3/11 1:22 下午
*/
public class Transform {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStreamSource = env.socketTextStream("bigdata-pro-m07",9999);
/**
* map()
*/
DataStream<Tuple2<String,String>> mapResult = dataStreamSource.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String line) throws Exception {
String[] word = line.split(" ");
return new Tuple2<>(word[0],word[1]);
}
});
/**
* flatmap()
*/
DataStream<Tuple2<String, Integer>> flatmapResult = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
for (String word : s.split(" ")){
collector.collect(new Tuple2<>(word,1));
}
}
});
/**
* filter() -> keyBy() -> reduce()
*/
DataStream<Tuple2<String,Integer>> result = flatmapResult.filter(new FilterFunction<Tuple2<String, Integer>>() {
@Override
public boolean filter(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
String spark = "spark";
return !stringIntegerTuple2.f0.equals(spark);
}
})
.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
return stringIntegerTuple2.f0;
}
})
.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
return new Tuple2<>(t1.f0,t1.f1 + t2.f1);
}
});
result.print();
env.execute("stream");
}
}
Union merges two or more datasets into a single dataset; all of the datasets being merged must have the same type.
Union[DataStream -> DataStream]
package com.aikfk.flink.datastream.transform;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* @author :caizhengjie
* @description:TODO
* @date :2021/3/11 1:22 下午
*/
public class UnionJava {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStreamSource1 = env.socketTextStream("bigdata-pro-m07",9999);
DataStream<String> dataStreamSource2 = env.socketTextStream("bigdata-pro-m07",9998);
DataStream<String> dataStreamSource = dataStreamSource1.union(dataStreamSource2);
/**
* flatmap()
*/
DataStream<Tuple2<String, Integer>> flatmapResult = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
for (String word : s.split(" ")){
collector.collect(new Tuple2<>(word,1));
}
}
});
/**
* filter() -> keyBy() -> reduce()
*/
DataStream<Tuple2<String,Integer>> result = flatmapResult.filter(new FilterFunction<Tuple2<String, Integer>>() {
@Override
public boolean filter(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
String spark = "spark";
return !stringIntegerTuple2.f0.equals(spark);
}
})
.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
return stringIntegerTuple2.f0;
}
})
.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
return new Tuple2<>(t1.f0,t1.f1 + t2.f1);
}
});
result.print();
env.execute("stream");
}
}
Connect merges two streams whose element types may differ, and each stream keeps its original type after the merge; the connected streams are then processed with a CoMapFunction (map1 handles elements of the first stream, map2 handles elements of the second) or a CoFlatMapFunction.
Connect[DataStream, DataStream -> ConnectedStreams -> DataStream]
package com.aikfk.flink.datastream.transform;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
/**
* @author :caizhengjie
* @description:TODO
* @date :2021/3/11 1:22 下午
*/
public class ConnectJava {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String,Integer>> dataStreamSource1 = env.fromElements(new Tuple2<>("spark",1),new Tuple2<>("java",3));
DataStream<Tuple2<String,Integer>> dataStreamSource2 = env.fromElements(new Tuple2<>("hive",2),new Tuple2<>("hadoop",5));
/**
* connect()
*/
ConnectedStreams<Tuple2<String, Integer>, Tuple2<String, Integer>> connectedStreams = dataStreamSource1
.connect(dataStreamSource2).keyBy(0,0);
DataStream<Tuple2<String,Integer>> mapResult = connectedStreams.map(new CoMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map1(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
return new Tuple2<>(stringIntegerTuple2.f0, stringIntegerTuple2.f1 + 10);
}
@Override
public Tuple2<String, Integer> map2(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
return stringIntegerTuple2;
}
});
mapResult.print();
env.execute("stream");
}
}
Side Outputs make it possible to split a DataStream by condition. This used to be the job of the Split operator, but Split has been deprecated in later Flink versions.
SideOut[DataStream -> SingleOutputStreamOperator -> DataStream]
package com.aikfk.flink.datastream.transform;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
* @author :caizhengjie
* @description:TODO
* @date :2021/3/11 1:22 下午
*/
public class SideOut {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
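// Note: the trailing {} creates an anonymous subclass of OutputTag so that Flink can capture the element type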
OutputTag<Tuple2<String,Integer>> outputTag = new OutputTag<Tuple2<String,Integer>>("side-output"){};
DataStream<Tuple2<String,Integer>> dataStreamSource = env.fromElements(
new Tuple2<>("alex",11000),
new Tuple2<>("lili",3200),
new Tuple2<>("lucy",3400),
new Tuple2<>("pony",13000),
new Tuple2<>("tony",33000),
new Tuple2<>("herry",4500),
new Tuple2<>("cherry",9000),
new Tuple2<>("jack",13450)
);
/**
* mainDataStream keeps the records with a salary of 10000 or less
*/
SingleOutputStreamOperator<Tuple2<String,Integer>> mainDataStream = dataStreamSource
.process(new ProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
@Override
public void processElement(Tuple2<String, Integer> stringIntegerTuple2,
Context context,
Collector<Tuple2<String, Integer>> collector) throws Exception {
if (stringIntegerTuple2.f1 > 10000){
context.output(outputTag,stringIntegerTuple2);
} else {
collector.collect(stringIntegerTuple2);
}
}
});
/**
* sideOutputStream receives, via the side output, the records with a salary greater than 10000
*/
DataStream<Tuple2<String,Integer>> sideOutputStream = mainDataStream.getSideOutput(outputTag);
sideOutputStream.print();
/**
* 6> (tony,33000)
* 5> (pony,13000)
* 2> (alex,11000)
* 9> (jack,13450)
*/
mainDataStream.print();
/**
* 13> (herry,4500)
* 10> (lucy,3400)
* 9> (lili,3200)
* 14> (cherry,9000)
*/
env.execute("stream");
}
}
The Iterate operator targets iterative computations: the result of each iteration is fed back as input to the next iteration.
Iterate[DataStream -> IterativeStream -> DataStream]
IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
@Override
public boolean filter(Long value) throws Exception {
return value > 0;
}
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
@Override
public boolean filter(Long value) throws Exception {
return value <= 0;
}
});
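For a runnable version of the same pattern, here is a minimal sketch (the sample values, the decrement step, and the job name are my own assumptions, not from the original post) that keeps subtracting 1 until every value reaches 0:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Long> initialStream = env.fromElements(3L, 7L, 2L);
// Wait at most 5000 ms for feedback records before the iteration is allowed to terminate
IterativeStream<Long> iteration = initialStream.iterate(5000);
DataStream<Long> iterationBody = iteration.map(new MapFunction<Long, Long>() {
    @Override
    public Long map(Long value) throws Exception {
        return value - 1;
    }
});
// Values still greater than 0 are fed back into the loop
iteration.closeWith(iterationBody.filter(value -> value > 0));
// Values that have reached 0 leave the loop and are printed
iterationBody.filter(value -> value <= 0).print();
env.execute("iterate");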
Custom partitioning:
Creates a custom partitioning of the dataset by implementing a Partitioner and passing it to partitionCustom().
Custom partitioning[DataStream -> DataStream]
dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0);
package com.aikfk.flink.datastream.transform;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* @author :caizhengjie
* @description:TODO
* @date :2021/3/12 7:38 下午
*/
public class PartitionCustom {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String,Integer>> dataStream = env.socketTextStream("bigdata-pro-m07",9999)
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
for (String word : s.split(" ")){
collector.collect(new Tuple2<>(word,1));
}
}
});
/**
* 自定义分区
*/
dataStream.partitionCustom(new Partitioner<String>() {
@Override
public int partition(String key, int numPartitions) {
int partition = key.hashCode() % numPartitions;
System.out.println("key: " + key + " partition: " + partition + " numPartitions: " + numPartitions);
return partition;
}
}, new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
return stringIntegerTuple2.f0;
}
});
env.execute("partition");
}
}
key: java partition: 2 numPartitions: 16
key: java partition: 2 numPartitions: 16
key: hive partition: 0 numPartitions: 16
key: hive partition: 0 numPartitions: 16
key: hbase partition: 9 numPartitions: 16
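One caveat (my addition, not from the original post): key.hashCode() can be negative, in which case the modulo above would yield a negative, invalid partition index. A safer variant of the partition() body:
@Override
public int partition(String key, int numPartitions) {
    // Math.floorMod keeps the result in [0, numPartitions) even for negative hash codes
    return Math.floorMod(key.hashCode(), numPartitions);
}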
Random partitioning:
Distributes records across partitions at random, which yields a relatively balanced distribution.
Random partitioning[DataStream -> DataStream]
dataStream.shuffle();
Rebalance partitioning:
Distributes records across partitions in a round-robin fashion, which yields a relatively balanced distribution.
Rebalance partitioning[DataStream -> DataStream]
dataStream.rebalance();
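For completeness, a minimal sketch of how these repartitioning calls are typically placed in front of a downstream operator (the socket host and port reuse the earlier examples; everything else is an assumption):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> lines = env.socketTextStream("bigdata-pro-m07", 9999);
// shuffle(): records are redistributed to downstream subtasks at random
lines.shuffle().print();
// rebalance(): records are redistributed round-robin, smoothing out skewed input partitions
lines.rebalance().print();
env.execute("repartition");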