Actually,在Flink中Shuffle的含义是很局限,并不似Spark中那么宽泛。在Flink中,数据从UpStream到DownStream的过程中,涉及到数据发往哪一个Subtask的问题。这个过程在Flink中统称为partitioning。概念是不是 一目了然,是不是比Spark的Shuffle好懂?
partitioning操作具体有如下几种:
操作 | 操作方式 |
---|---|
Shuffle | 随机选择发往下游的channel |
Rebalance | round-robin |
Rescale | 根据上下游Operator的数量,做一个均匀分配 |
Broadcast | 广播 |
还有一个,在Flink documentation中可能没有说,但实际上很关键,它就是Keyby操作。从源码看,在org.apache.flink.streaming.runtime.partitioner的包中,我们能看到以上提到的所有。
从另一个角度来说partitioning,我们也可以理解它是一个选择将Record发往哪一个channel的过程。从StreamPartitioner类implement ChannelSelector也可以印证这一点。好,我们来着重看看keyGroupStreamPartitioner,它对应了我们最常用的keyby操作。
直接撸代码,来看看Keyby操作是如何选择Channel发送数据的。插一句,Channel的建立,基本上可以理解上下游的subTask之间会建立一个Mesh。
//From keyGroupStreamPartitioner
@Override
public int[] selectChannels(
SerializationDelegate<StreamRecord<T>> record,
int numberOfOutputChannels) {
K key;
try {
key = keySelector.getKey(record.getInstance().getValue());
} catch (Exception e) {
throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
}
returnArray[0] = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfOutputChannels);
return returnArray;
//-------segment
public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
}
}
//-------segment
public static int assignToKeyGroup(Object key, int maxParallelism) {
return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
}
//-------segment
public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
return MathUtils.murmurHash(keyHash) % maxParallelism;
}
//-------segment
public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
return keyGroupId * parallelism / maxParallelism;
}
一言以蔽之,靠key值hash决定发送的Channel。但是细节上做了很多,比如用murmurHash计算Hash值,以保证尽量的散列。不说了,全在code里了。
Partitioning是不是很简单?再也不会对keyby困惑了吧。这里其实暗含了Flink的数据处理模型,是完全基于网络IO来做的。
Keyby最令人头疼的是数据倾斜的问题,我被很多人问过如何解决数据倾斜(BOSS视角问的那种)。我只想说,具体问题具体看吧,把重计算分而治之,再合并计算结果呗。死活分不开还要解决倾斜,来,大佬,留下你的解决方案。