Flink Partitioning,不只是Shuffle

傅乐湛
2023-12-01

概念

Actually,在Flink中Shuffle的含义是很局限,并不似Spark中那么宽泛。在Flink中,数据从UpStream到DownStream的过程中,涉及到数据发往哪一个Subtask的问题。这个过程在Flink中统称为partitioning。概念是不是 一目了然,是不是比Spark的Shuffle好懂?

分类

partitioning操作具体有如下几种:

操作操作方式
Shuffle随机选择发往下游的channel
Rebalanceround-robin
Rescale根据上下游Operator的数量,做一个均匀分配
Broadcast广播

还有一个,在Flink documentation中可能没有说,但实际上很关键,它就是Keyby操作。从源码看,在org.apache.flink.streaming.runtime.partitioner的包中,我们能看到以上提到的所有。

ChannelSelect

从另一个角度来说partitioning,我们也可以理解它是一个选择将Record发往哪一个channel的过程。从StreamPartitioner类implement ChannelSelector也可以印证这一点。好,我们来着重看看keyGroupStreamPartitioner,它对应了我们最常用的keyby操作。

keyGroupStreamPartitioner

直接撸代码,来看看Keyby操作是如何选择Channel发送数据的。插一句,Channel的建立,基本上可以理解上下游的subTask之间会建立一个Mesh。

//From keyGroupStreamPartitioner
@Override
public int[] selectChannels(
	SerializationDelegate<StreamRecord<T>> record,
	int numberOfOutputChannels) {

	K key;
	try {
		key = keySelector.getKey(record.getInstance().getValue());
	} catch (Exception e) {
		throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
	}
	returnArray[0] = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfOutputChannels);
	return returnArray;

//-------segment
public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
		return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
	}
}

//-------segment
public static int assignToKeyGroup(Object key, int maxParallelism) {
		return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
	}

//-------segment
public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
	return MathUtils.murmurHash(keyHash) % maxParallelism;
}

//-------segment
public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
		return keyGroupId * parallelism / maxParallelism;
	}

一言以蔽之,靠key值hash决定发送的Channel。但是细节上做了很多,比如用murmurHash计算Hash值,以保证尽量的散列。不说了,全在code里了。

结束语

Partitioning是不是很简单?再也不会对keyby困惑了吧。这里其实暗含了Flink的数据处理模型,是完全基于网络IO来做的。

Keyby最令人头疼的是数据倾斜的问题,我被很多人问过如何解决数据倾斜(BOSS视角问的那种)。我只想说,具体问题具体看吧,把重计算分而治之,再合并计算结果呗。死活分不开还要解决倾斜,来,大佬,留下你的解决方案。

 类似资料: