分享

重磅:Flink 1.14.0 内存优化汇总(二)

问题导读:
1、Flink 内存数据结构如何理解?
2、网络缓冲器(NetworkBuffer)是什么?
3、Flink 内存如何进行调优?
4、故障排除有哪些排查方向?


接上一篇:重磅:Flink 1.14.0 内存优化汇总(一)


3 Flink 内存数据结构

Flink 的内存管理和操作系统管理内存一样.将内存划分为内存段、内存页等结构。

3.1 Flink 内存段

内存段在 Flink 内部叫 MemorySegment,是 Flink 中最小的内存分配单元,默认大小 32KB。它既可以是堆上内存(Java 的 byte 数组),也可以是堆外内存(基于 Netty 的 DirectByteBuffer),同时提供了对二进制数据进行读取和写入的方法。
2021-10-12_190251.jpg


HeapMemorySegment:用来分配堆上内存 ;

HybridMemorySegment:用来分配堆外内存和堆上内存; 2017 年以后的版本实 际上只使用了 HybridMemorySegment。

通过一个案例介绍Flink在序列化和反序列化过程中如何使用 MemorySegment:


2021-10-12_190329.jpg

如上图所示,当创建一个Tuple 3 对象时,包含三个层面,一是 int 类型,一是 double 类型,还有一个是 Person。Person对象包含两个字段,一是 int 型的 ID,另一个是 String 类型的 name,

(1)在序列化操作时,会委托相应具体序列化的序列化器进行相应的序列化操作。从图中可以看到 Tuple 3 会把 int 类型通过 IntSerializer 进行序列化操作,此时 int 只需要占用四个字节。

(2)Person 类会被当成一个 Pojo 对象来进行处理,PojoSerializer 序列化器会把一些属性信息使用一个字节存储起来。同样,其字段则采取相对应的序列化器进行相应序列化,在序列化完的结果中,可以看到所有的数据都是由 MemorySegment 去支持。

3.2 Flink 内存页

内存页是 MemorySegment 之上的数据访问视图,数据读取抽象为 DataInputView,数据写入抽象为 DataOutputView。使用时就无需关心 MemorySegment 的细节,该层会自动处理跨 MemorySegment 的读取和写入。

3.2.1 DataInputView

DataInputView 是从 MemorySegment 数据读取抽象视图,继承自 java.io.DataInput。InputView 中持有多个 MemorySegment 的引用(MemorySegment[]),这一组 MemorySegment 被视为一个内存页(Page),可以顺序读取 MemorySegment 中的数据。如下图为继承关系图:

2021-10-12_190405.jpg

3.2.2 DataInputView

DataOutputView 是从 MemorySegment 数据读取抽象视图,继承自java.io.DataOutput。OutputView 中持有多个 MemorySegment 的引用(MemorySegment[]),这一组 MemorySegment 被视为一个内存页(Page),可以顺序地向 MemorySegment 中写入数据。如下图为继承关系图:

2021-10-12_190436.jpg

3.2.3 Buffer

Buffer 是具有引用计数的 MemorySegment 实例的包装器。用来将上游 Task 算子处理完毕的结果交给下游时定义的一个抽象或者内存对象。

Buffer 的接口是网络层面上传输数据和事件的统一抽象,其实现类是 NetworkBuffer。Flink 在各个 TaskManger 之间传递数据时,使用的是这一层的抽象。1个 NetworkBuffer 中包装了一个 MemorySegment 。Buffer接口类图如下:

2021-10-12_190509.jpg

Buffer 的底层是 MemorySegment,Buffer 申请和释放由 Flink 自行管理,Flink 引入了引用数的概念。当有新的 Buffer 消费者时,引用数加 1,当消费者消费完 Buffer 时,引用数减 1,最终当引用数变为 0 时,就可以将 Buffer 释放重用了。

3.2.4 Buffer 资源池

Buffer 资源池在 Flink 中叫作 BufferPool。BufferPool 用来管理 Buffer,包含 Buffer 的申请、释放、销毁、可用 Buffer 的通知等,其实现类是 LocalBufferPool ,每个 Task 拥有自己的 LocalBufferPool 。

BufferPool 的类体系如下:

2021-10-12_190536.jpg

4 网络缓冲器(NetworkBuffer)

网络缓冲器 (NetworkBuffer) 是网络交换数据的包装,其对应于 MemorySegment 内存段。

Network Buffer,顾名思义,就是在网络传输中使用到的 Buffer(实际非网络传输也会用到)。了解 Flink 网络栈的同学应该会比较清楚,Flink 经过网络传输的上下游 Task 的设计会比较类似生产者 - 消费者模型。如果没有这个缓冲区,那么生产者或消费者会消耗大量时间在等待下游拿数据和上游发数据的环节上。加上这个缓冲区,生产者和消费者解耦开,任何一方短时间内的抖动理论上对另一方的数据处理都不会产生太大影响。如下图所示:

2021-10-12_190605.jpg

这是对于单进程内生产者-消费者模型的一个图示,事实上,如果两个 Task 在同一个 TaskManager 内,那么使用的就是上述模型,

对于不同 TM 内、或者需要跨网络传输的 TM 之间,利用生产者-消费者模型来进行数据传输的原理图如下:

2021-10-12_190631.jpg

可以看到,在 Netty Server 端,buffer 只存在 LocalBufferPool 中,subpartition 自己并没有缓存 buffer 或者独享一部分 buffer,而在接收端,channel 有自己独享的一部分 buffer(Exclusive Buffers),也有一部分共享的 buffer(Floating Buffers),所以,Network Buffer 的使用同时存在于发送端和接收端。

由此可见,TaskManager 内需要的 buffers 数量等于这个 TaskManager 内的所有 Task 中的发送端和接收端使用到的 network buffer 总和。明确了 Network Buffer 使用的位置,我们可以结合一些参数计算出作业实际所需的 NetworkBuffer 数量。

5 Flink 内存调优

了解了 Flink JobManager Memory 和 TaskManager Memory的内存模型和数据结构之后,应该针对不同的部署情况,配置不同的内存,下面我们针对不同的部署方式介绍内存如何调优。

5.1 为 Standalone 配置内存

建议为 Standalone 配置 Flink 总内存,设置 JobManager 和 TaskManager 的 flink.size 大小,声明为 Flink 本身提供了多少内存。配置方式如下:

2021-10-12_190701.jpg

5.2 为 Containers(容器) 配置内存

建议为容器化部署(Kubernetes或Yarn)配置总进程内存,设置 process.size 大小,它声明了总共应该分配多少内存给 Flink JVM 进程,并对应于请求容器的大小。配置方式如下:
2021-10-12_190730.jpg

注意:如果你配置了总 Flink 内存, Flink 会隐式添加 JVM 内存组件来推导总进程内存,并请求一个具有该推导大小的内存的容器。

警告:如果 Flink 或用户代码分配超出容器大小的非托管堆外(本机)内存,作业可能会失败,因为部署环境可能会杀死有问题的容器。

5.3  为 state backends(状态后端)配置内存

为 state backends(状态后端)配置内存时,这仅与 TaskManager 相关。

在部署 Flink 流应用程序时,所使用的状态后端类型将决定集群的最佳内存配置。

5.3.1 HashMap 状态后端

运行无状态作业或使用 HashMapStateBackend 时,将托管内存设置为零。这将确保为 JVM 上的用户代码分配最大数量的堆内存。配置如下:
2021-10-12_190804.jpg

5.3.2 RocksDB 状态后端

该 EmbeddedRocksDBStateBackend 使用本机内存。默认情况下,RocksDB 设置为将本机内存分配限制为托管内存的大小。因此,为你的状态保留足够的托管内存非常重要。如果禁用默认的 RocksDB 内存控制,RocksDB 分配的内存超过请求的容器大小(总进程内存)的限制,则可以在容器化部署中终止 TaskManager 。

5.4  为 batch Job(批处理作业)配置内存

为 batch Job(批处理作业)配置内存时,这仅与 TaskManager 相关。

Flink 的批处理操作符利用托管内存来更高效地运行。这样做时,可以直接对原始数据执行某些操作,而无需反序列化为 Java 对象。这意味着托管内存配置对应用程序的性能有实际影响。Flink 将尝试分配和使用 为批处理作业配置的尽可能多的托管内存,但不会超出其限制。这可以防止 OutOfMemoryError's,因为 Flink 准确地知道它必须利用多少内存。如果托管内存不足,Flink 会优雅地溢出到磁盘。

6 故障排除

6.1 非法配置异常

如果您看到从 TaskExecutorProcessUtils 或 JobManagerProcessUtils抛出的 IllegalConfigurationException,通常表明存在无效的配置值(例如负内存大小、大于 1 的分数等)或配置冲突。请重新配置内存参数。

6.2 Java 堆空间异常

如果报 OutOfMemoryError: Java heap space 异常,通常表示 JVM Heap 太小。您可以尝试通过增加总内存来增加 JVM 堆大小。您也可以直接为 TaskManager 增加任务堆内存或为 JobManager 增加 JVM 堆内存。

  •         还可以为 TaskManagers 增加框架堆内存,但只有在确定 Flink 框架本身需要更多内存时才应该更改此选项。


6.3 直接缓冲存储器异常

如果报 OutOfMemoryError: Direct buffer memory 异常,通常表示 JVM直接内存限制太小或存在直接内存泄漏。检查用户代码或其他外部依赖项是否使用了 JVM 直接内存,以及它是否被正确考虑。可以尝试通过调整直接堆外内存来增加其限制。可以参考如何为 TaskManagers、 JobManagers 和 Flink 设置的JVM 参数配置堆外内存。

6.4 元空间异常

如果报 OutOfMemoryError: Metaspace 异常,通常表示 JVM 元空间限制配置得太小。您可以尝试加大 JVM 元空间 TaskManagers 或JobManagers 选项。

6.5 网络缓冲区数量不足

如果报 IOException: Insufficient number of network buffers 异常,这仅与 TaskManager 相关。通常表示配置的网络内存大小不够大。您可以尝试通过调整以下选项来增加网络内存:

2021-10-12_190857.jpg

6.6 超出容器内存异常

这个对应 5.2 节为容器配置内存。如果 Flink 容器尝试分配超出其请求大小(Yarn 或 Kubernetes)的内存,这通常表明 Flink 没有预留足够的本机内存。当容器被部署环境杀死时,可以通过使用外部监控系统或从错误消息中观察到这一点。

如果在 JobManager 进程中遇到这个问题,还可以通过设置排除可能的 JVM Direct Memory 泄漏的选项来开启 JVM Direct Memory 的限制 使用以下命令进行配置

2021-10-12_190924.jpg

如果使用 RocksDBStateBackend,并且内存控制被禁用:可以尝试增加 TaskManager 的托管内存。在保存点或完整检查点期间启用内存控制和非堆内存增加,这可能是由于glibc内存分配器而发生的。

可以尝试为 TaskManagers 添加环境变量 MALLOC_ARENA_MAX=1,或者增加 JVM 开销。

作者:3分钟秒懂大数据
来源:https://mp.weixin.qq.com/s/eO7y8nULPv22PQDfgB79qg


最新经典文章,欢迎关注公众号



已有(1)人评论

跳转到指定楼层
仙本是凡123 发表于 2021-10-13 17:10:37
3.2.1   3.2.2 那里写错了
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条