Flink DataStream API 和 Table / SQL 都支持通过批处理执行模式处理有界输入。此模式是通过 blocking shuffle 进行网络传输。与流式应用使用管道 shuffle 阻塞交换的数据并存储,然后下游任务通过网络获取这些值的方式不同。这种交换减少了执行作业所需的资源,因为它不需要同时运行上游和下游任务。
总的来说,Flink 提供了两种不同类型的 blocking shuffles:Hash shuffle 和 Sort shuffle。
对于 1.14 以及更低的版本,Hash Shuffle 是 blocking shuffle 的默认实现,它为每个下游任务将每个上游任务的结果以单独文件的方式保存在 TaskManager 本地磁盘上。当下游任务运行时会向上游的 TaskManager 请求分片,TaskManager 读取文件之后通过网络传输(给下游任务)。
Hash Shuffle 为读写文件提供了不同的机制:
可通过设置 TaskManager 参数 选择不同的机制。
这个选项是实验性的,将来或许会有改动。
如果开启 SSL,file 机制不能使用 FileRegion 而是在传输之前使用非池化的缓存去缓存数据。这可能会 导致 direct memory OOM。此外,因为同步读取文件有时会造成 netty 线程阻塞,SSL handshake timeout 配置需要调大以防 connection reset 异常。
mmap使用的内存不计算进已有配置的内存限制中,但是一些资源管理框架比如 YARN 将追踪这块内存使用,并且如果容器使用内存超过阈值会被杀掉。
Hash Shuffle 在小规模运行在固态硬盘的任务情况下效果显著,但是依旧有一些问题:
Sort Shuffle 是 1.13 版中引入的另一种 blocking shuffle 实现,它在 1.15 版本成为默认。不同于 Hash Shuffle,Sort Shuffle 将每个分区结果写入到一个文件。当多个下游任务同时读取结果分片,数据文件只会被打开一次并共享给所有的读请求。因此,集群使用更少的资源。例如:节点和文件描述符以提升稳定性。此外,通过写更少的文件和尽可能线性的读取文件,尤其是在使用机械硬盘情况下 Sort Shuffle 可以获得比 Hash Shuffle 更好的性能。另外,Sort Shuffle 使用额外管理的内存作为读数据缓存并不依赖 sendfile 或 mmap 机制,因此也适用于 SSL。关于 Sort Shuffle 的更多细节请参考 FLINK-19582 和 FLINK-19614。
当使用sort blocking shuffle的时候有些配置需要适配:
目前 Sort Shuffle 只通过分区索引来排序而不是记录本身,也就是说 sort 只是被当成数据聚类算法使用。
总的来说,
要在 Sort Shuffle 和 Hash Shuffle 间切换,你需要配置这个参数:taskmanager.network.sort-shuffle.min-parallelism。这个参数根据消费者Task的并发选择当前Task使用Hash Shuffle 或 Sort Shuffle,如果并发小于配置值则使用 Hash Shuffle,否则使用 Sort Shuffle。对于 1.15 以下版本,它的默认值是 Integer.MAX_VALUE,这意味着 Hash Shuffle 是默认实现。从 1.15 起,它的默认值是 1,这意味着 Sort Shuffle 是默认实现。
下面这些建议可以帮助你实现更好的性能,这些对于大规模批作业尤其重要:
尽管十分罕见,下面列举了一些你可能会碰到的异常情况以及对应的处理策略:
异常情况 | 处理策略 |
---|---|
Insufficient number of network buffers | 这意味着网络内存大小不足以支撑作业运行,你需要增加总的网络内存大小。注意:从 1.15 开始,Sort Shuffle 已经成为默认实现,对于一些场景,Sort Shuffle 可能比 Hash Shuffle 需要更多的网络内存,因此当你的批作业升级到 1.15 以后可能会遇到这个网络内存不足的问题。这种情况下,你只需要增大总的网络内存大小即可。 |
Too many open files | 这意味着文件句柄不够用了。如果你使用的是 Hash Shuffle,请切换到 Sort Shuffle。如果你已经在使用 Sort Shuffle,请考虑增大操作系统文件句柄上限并且检查是否是作业代码占用了过多的文件句柄。 |
Connection reset by peer | 这通常意味着网络不太稳定或者压力较大。其他一些原因,比如上面提到的 SSL 握手超时等也可能会导致这一问题。如果你使用的是 Hash Shuffle,请切换到 Sort Shuffle。如果你已经在使用 Sort Shuffle,增大 网络连接 backlog 可能会有所帮助。 |
Network connection timeout | 这通常意味着网络不太稳定或者压力较大。增大 网络连接超时时间 或者开启 网络连接重试 可能会有所帮助。 |
Socket read/write timeout | 这通常意味着网络传输速度较慢或者压力较大。增大 网络收发缓冲区 大小可能会有所帮助。如果作业运行在 Kubernetes 环境,使用 host network 可能会有所帮助。 |
Read buffer request timeout | 这个问题只会出现在 Sort Shuffle,它意味着对数据读取缓冲区的激烈竞争。要解决这一问题,你可以增大 taskmanager.memory.framework.off-heap.batch-shuffle.size 和 taskmanager.memory.framework.off-heap.size。 |
No space left on device | 这通常意味着磁盘存储空间或者 inodes 被耗尽。你可以考虑扩展磁盘存储空间或者做一些数据清理。 |
Out of memory error | 如果你使用的是 Hash Shuffle,请切换到 Sort Shuffle。如果你已经在使用 Sort Shuffle 并且遵循了上面章节的建议,你可以考虑增大相应的内存大小。对于堆上内存,你可以增大 taskmanager.memory.task.heap.size,对于直接内存,你可以增大 taskmanager.memory.task.off-heap.size。 |
Container killed by external resource manger | 多种原因可能会导致容器被杀,比如,杀掉一个低优先级容器以释放资源启动高优先级容器,或者容器占用了过多的资源,比如内存、磁盘空间等。像上面章节所提到的那样,Hash Shuffle 可能会使用过多的内存而被 YARN 杀掉。所以,如果你使用的是 Hash Shuffle,请切换到 Sort Shuffle。如果你已经在使用 Sort Shuffle,你可能需要同时检查 Flink 日志以及资源管理框架的日志以找出容器被杀的根因,并且做出相应的修复。 |