levycui 发表于 2020-7-22 11:10:19

Flink1.11内存机制之JobManager内存及调优指南

本帖最后由 levycui 于 2020-7-22 11:18 编辑

问题导读:
1、如何配置 JobManager 内存?
2、如何配置 JVM 堆内存?
3、Flink JVM 进程内存限制如何设置?
4、如何解决容器(Container)内存超用问题?
static/image/hrline/line7.png
上一篇:Flink1.11内存机制之进程内存及TaskManager内存

配置 JobManager 内存

JobManager 是 Flink 集群的控制单元。它由三种不同的组件组成:ResourceManager、Dispatcher 和每个正在运行作业的 JobMaster。本篇文档将介绍 JobManager 内存在整体上以及细粒度上的配置方法。

[*]配置总内存
[*]详细配置

[*]配置 JVM 堆内存
[*]配置堆外内存
[*]本地执行
本文接下来介绍的内存配置方法适用于 1.11 及以上版本。Flink 在 1.11 版本中对内存配置部分进行了较大幅度的改动,从早期版本升级的用户请参考升级指南。
提示本篇内存配置文档仅针对 JobManager!与 TaskManager 相比,JobManager 具有相似但更加简单的内存模型。

配置总内存

配置 JobManager 内存最简单的方法就是进程的配置总内存。本地执行模式下不需要为 JobManager 进行内存配置,配置参数将不会生效。

详细配置


如上图所示,下表中列出了 Flink JobManager 内存模型的所有组成部分,以及影响其大小的相关配置参数。

组成部分配置参数描述
JVM 堆内存jobmanager.memory.heap.sizeJobManager 的 JVM 堆内存。
堆外内存jobmanager.memory.off-heap.sizeJobManager 的堆外内存(直接内存或本地内存)。
JVM Metaspacejobmanager.memory.jvm-metaspace.sizeFlink JVM 进程的 Metaspace。
JVM 开销jobmanager.memory.jvm-overhead.min
jobmanager.memory.jvm-overhead.max
jobmanager.memory.jvm-overhead.fraction用于其他 JVM 开销的本地内存,例如栈空间、垃圾回收空间等。该内存部分为基于进程总内存的受限的等比内存部分。


配置 JVM 堆内存
如配置总内存中所述,另一种配置 JobManager 内存的方式是明确指定 JVM 堆内存的大小(jobmanager.memory.heap.size)。通过这种方式,用户可以更好地掌控用于以下用途的 JVM 堆内存大小。

[*]Flink 框架
[*]在作业提交时(例如一些特殊的批处理 Source)及 Checkpoint 完成的回调函数中执行的用户代码
Flink 需要多少 JVM 堆内存,很大程度上取决于运行的作业数量、作业的结构及上述用户代码的需求。
提示如果已经明确设置了 JVM 堆内存,建议不要再设置进程总内存或 Flink 总内存,否则可能会造成内存配置冲突。
在启动 JobManager 进程时,Flink 启动脚本及客户端通过设置 JVM 参数 -Xms 和 -Xmx 来管理 JVM 堆空间的大小。请参考 JVM 参数。

配置堆外内存
堆外内存包括 JVM 直接内存 和 本地内存。可以通过配置参数 jobmanager.memory.enable-jvm-direct-memory-limit 设置是否启用 JVM 直接内存限制。如果该配置项设置为 true,Flink 会根据配置的堆外内存大小设置 JVM 参数 -XX:MaxDirectMemorySize。请参考 JVM 参数。
可以通过配置参数 jobmanager.memory.off-heap.size 设置堆外内存的大小。如果遇到 JobManager 进程抛出 “OutOfMemoryError: Direct buffer memory” 的异常,可以尝试调大这项配置。请参考常见问题。
一下情况可能用到堆外内存:

[*]Flink 框架依赖(例如 Akka 的网络通信)
[*]在作业提交时(例如一些特殊的批处理 Source)及 Checkpoint 完成的回调函数中执行的用户代码
提示如果同时配置了 Flink 总内存和 JVM 堆内存,且没有配置堆外内存,那么堆外内存的大小将会是 Flink 总内存减去JVM 堆内存。这种情况下,对外内存的默认大小将不会生效。

本地执行

如果你是在本地运行 Flink(例如在 IDE 中)而非创建一个集群,那么 JobManager 的内存配置将不会生效。

调优指南
本文在的基本的配置指南的基础上,介绍如何根据具体的使用场景调整内存配置,以及在不同使用场景下分别需要重点关注哪些配置参数。

[*]独立部署模式(Standalone Deployment)下的内存配置
[*]容器(Container)的内存配置
[*]State Backend 的内存配置

[*]Heap State Backend
[*]RocksDB State Backend
[*]批处理作业的内存配置

独立部署模式(Standalone Deployment)下的内存配置
独立部署模式下,我们通常更关注 Flink 应用本身使用的内存大小。建议配置 Flink 总内存(taskmanager.memory.flink.size 或者 jobmanager.memory.flink.size)或其组成部分。此外,如果出现 Metaspace 不足的问题,可以调整 JVM Metaspace 的大小。
这种情况下通常无需配置进程总内存,因为不管是 Flink 还是部署环境都不会对 JVM 开销 进行限制,它只与机器的物理资源相关。

容器(Container)的内存配置

在容器化部署模式(Containerized Deployment)下(Kubernetes、Yarn 或 Mesos),建议配置进程总内存(taskmanager.memory.process.size 或者 jobmanager.memory.process.size)。该配置参数用于指定分配给 Flink JVM 进程的总内存,也就是需要申请的容器大小。
提示如果配置了 Flink 总内存,Flink 会自动加上 JVM 相关的内存部分,根据推算出的进程总内存大小申请容器。
注意: 如果 Flink 或者用户代码分配超过容器大小的非托管的堆外(本地)内存,部署环境可能会杀掉超用内存的容器,造成作业执行失败。
请参考容器内存超用中的相关描述。

State Backend 的内存配置

本章节内容仅与 TaskManager 相关。
在部署 Flink 流处理应用时,可以根据 State Backend 的类型对集群的配置进行优化。
Heap State Backend

执行无状态作业或者使用 Heap State Backend(MemoryStateBackend或 FsStateBackend)时,建议将托管内存设置为 0。这样能够最大化分配给 JVM 上用户代码的内存。
RocksDB State Backend
RocksDBStateBackend 使用本地内存。默认情况下,RocksDB 会限制其内存用量不超过用户配置的托管内存。因此,使用这种方式存储状态时,配置足够多的托管内存是十分重要的。如果你关闭了 RocksDB 的内存控制,那么在容器化部署模式下如果 RocksDB 分配的内存超出了申请容器的大小(进程总内存),可能会造成 TaskExecutor 被部署环境杀掉。请同时参考如何调整 RocksDB 内存以及 state.backend.rocksdb.memory.managed。

批处理作业的内存配置Flink 批处理算子使用托管内存来提高处理效率。算子运行时,部分操作可以直接在原始数据上进行,而无需将数据反序列化成 Java 对象。这意味着托管内存对应用的性能具有实质上的影响。因此 Flink 会在不超过其配置限额的前提下,尽可能分配更多的托管内存。Flink 明确知道可以使用的内存大小,因此可以有效避免 OutOfMemoryError 的发生。当托管内存不足时,Flink 会优雅地将数据落盘。


常见问题
[*]IllegalConfigurationException
[*]OutOfMemoryError: Java heap space
[*]OutOfMemoryError: Direct buffer memory
[*]OutOfMemoryError: Metaspace
[*]IOException: Insufficient number of network buffers
[*]容器(Container)内存超用
IllegalConfigurationException

如果遇到从 TaskExecutorProcessUtils 或 JobManagerProcessUtils 抛出的 IllegalConfigurationException 异常,这通常说明您的配置参数中存在无效值(例如内存大小为负数、占比大于 1 等)或者配置冲突。请根据异常信息,确认出错的内存部分的相关文档及配置信息。
OutOfMemoryError: Java heap space

该异常说明 JVM 的堆空间过小。可以通过增大总内存、TaskManager 的任务堆内存、JobManager 的 JVM 堆内存等方法来增大 JVM 堆空间。
提示也可以增大 TaskManager 的框架堆内存。这是一个进阶配置,只有在确认是 Flink 框架自身需要更多内存时才应该去调整。
OutOfMemoryError: Direct buffer memory
该异常通常说明 JVM 的直接内存限制过小,或者存在直接内存泄漏(Direct Memory Leak)。请确认用户代码及外部依赖中是否使用了 JVM 直接内存,以及如果使用了直接内存,是否配置了足够的内存空间。可以通过调整堆外内存来增大直接内存限制。有关堆外内存的配置方法,请参考 TaskManager、JobManager 以及 JVM 参数的相关文档。
OutOfMemoryError: Metaspace

该异常说明 JVM Metaspace 限制过小。可以尝试调整 TaskManager、JobManager 的 JVM Metaspace。
IOException: Insufficient number of network buffers

该异常仅与 TaskManager 相关。
该异常通常说明网络内存过小。可以通过调整以下配置参数增大网络内存:

[*]taskmanager.memory.network.min
[*]taskmanager.memory.network.max
[*]taskmanager.memory.network.fraction

容器(Container)内存超用

如果 Flink 容器尝试分配超过其申请大小的内存(Yarn、Mesos 或 Kubernetes),这通常说明 Flink 没有预留出足够的本地内存。可以通过外部监控系统或者容器被部署环境杀掉时的错误信息判断是否存在容器内存超用。
对于 JobManager 进程,你还可以尝试启用 JVM 直接内存限制(jobmanager.memory.enable-jvm-direct-memory-limit),以排除 JVM 直接内存泄漏的可能性。
如果使用了 RocksDBStateBackend 且没有开启内存控制,也可以尝试增大 TaskManager 的托管内存。
此外,还可以尝试增大 JVM 开销。
请参考如何配置容器内存。


升级指南在 1.10 和 1.11 版本中,Flink 分别对 TaskManager 和 JobManager 的内存配置方法做出了较大的改变。部分配置参数被移除了,或是语义上发生了变化。本篇升级指南将介绍如何将 Flink 1.9 及以前版本的 TaskManager 内存配置升级到 Flink 1.10 及以后版本,以及如何将 Flink 1.10 及以前版本的 JobManager 内存配置升级到 Flink 1.11 及以后版本。

[*]升级 TaskManager 内存配置

[*]配置参数变化
[*]总内存(原堆内存)
[*]JVM 堆内存
[*]托管内存
[*]升级 JobManager 内存配置
[*]Flink JVM 进程内存限制
[*]容器切除(Cut-Off)内存

[*]TaskManager
[*]JobManager
[*]flink-conf.yaml 中的默认配置
注意: 请仔细阅读本篇升级指南。使用原本的和新的内存配制方法可能会使内存组成部分具有截然不同的大小。未经调整直接沿用 Flink 1.10 以前版本的 TaskManager 配置文件或 Flink 1.11 以前版本的 JobManager 配置文件,可能导致应用的行为、性能发生变化,甚至造成应用执行失败。
提示在 1.10/1.11 版本之前,Flink 不要求用户一定要配置 TaskManager/JobManager 内存相关的参数,因为这些参数都具有默认值。新的内存配置要求用户至少指定下列配置参数(或参数组合)的其中之一,否则 Flink 将无法启动。
TaskManager:JobManager:
taskmanager.memory.flink.sizejobmanager.memory.flink.size
taskmanager.memory.process.sizejobmanager.memory.process.size
taskmanager.memory.task.heap.size 和
taskmanager.memory.managed.sizejobmanager.memory.heap.size

Flink 自带的默认 flink-conf.yaml 文件指定了 taskmanager.memory.process.size(>= 1.10)和 jobmanager.memory.process.size (>= 1.11),以便与此前的行为保持一致。
可以使用这张电子表格来估算和比较原本的和新的内存配置下的计算结果。

升级 TaskManager 内存配置
配置参数变化

本节简要列出了 Flink 1.10 引入的配置参数变化,并援引其他章节中关于如何升级到新配置参数的相关描述。
下列配置参数已被彻底移除,配置它们将不会产生任何效果。      

移除的配置参数备注
taskmanager.memory.fraction                请参考新配置参数 taskmanager.memory.managed.fraction 的相关描述。                新的配置参数与被移除的配置参数在语义上有所差别,因此其配置值通常也需要做出适当调整。                请参考如何升级托管内存。            
taskmanager.memory.off-heapFlink 不再支持堆上的(On-Heap)托管内存。请参考如何升级托管内存。
taskmanager.memory.preallocateFlink 不再支持内存预分配,今后托管内存将都是惰性分配的。请参考如何升级托管内存。

下列配置参数将被弃用,出于向后兼容性考虑,配置它们将被解读成对应的新配置参数。      

弃用的配置参数对应的新配置参数
taskmanager.heap.size               
[*]独立部署模式(Standalone Deployment)下:taskmanager.memory.flink.size
[*]容器化部署模式(Containerized Deployement)下:taskmanager.memory.process.size
                请参考如何升级总内存。            
taskmanager.memory.sizetaskmanager.memory.managed.size。请参考如何升级托管内存。
taskmanager.network.memory.mintaskmanager.memory.network.min
taskmanager.network.memory.maxtaskmanager.memory.network.max
taskmanager.network.memory.fractiontaskmanager.memory.network.fraction

尽管网络内存的配置参数没有发生太多变化,我们仍建议您检查其配置结果。网络内存的大小可能会受到其他内存部分大小变化的影响,例如总内存变化时,根据占比计算出的网络内存也可能发生变化。请参考内存模型详解。
容器切除(Cut-Off)内存相关的配置参数(containerized.heap-cutoff-ratio 和 containerized.heap-cutoff-min)将不再对 TaskManager 进程生效。请参考如何升级容器切除内存。

总内存(原堆内存)

在原本的内存配置方法中,用于指定用于 Flink 的总内存的配置参数是 taskmanager.heap.size 或 taskmanager.heap.mb。尽管这两个参数以“堆(Heap)”命名,实际上它们指定的内存既包含了 JVM 堆内存,也包含了其他堆外内存部分。这两个配置参数目前已被弃用。
Flink 在 Mesos 上还有另一个具有同样语义的配置参数 mesos.resourcemanager.tasks.mem,目前也已经被弃用。
如果配置了上述弃用的参数,同时又没有配置与之对应的新配置参数,那它们将按如下规则对应到新的配置参数。

[*]独立部署模式(Standalone Deployment)下:Flink 总内存(taskmanager.memory.flink.size)
[*]容器化部署模式(Containerized Deployement)下(Yarn、Mesos):进程总内存(taskmanager.memory.process.size)
建议您尽早使用新的配置参数取代启用的配置参数,它们在今后的版本中可能会被彻底移除。
请参考如何配置总内存.

JVM 堆内存

此前,JVM 堆空间由托管内存(仅在配置为堆上时)及 Flink 用到的所有其他堆内存组成。这里的其他堆内存是由总内存减去所有其他非堆内存得到的。请参考如何升级托管内存。
现在,如果仅配置了Flink总内存或进程总内存,JVM 的堆空间依然是根据总内存减去所有其他非堆内存得到的。请参考如何配置总内存。
此外,你现在可以更直接地控制用于任务和算子的 JVM 的堆内存(taskmanager.memory.task.heap.size),详见任务堆内存。如果流处理作业选择使用 Heap State Backend(MemoryStateBackend或 FsStateBackend),那么它同样需要使用 JVM 堆内存。
Flink 现在总是会预留一部分 JVM 堆内存供框架使用(taskmanager.memory.framework.heap.size)。请参考框架内存。

托管内存请参考如何配置托管内存。

明确的大小

原本用于指定明确的托管内存大小的配置参数(taskmanager.memory.size)已被弃用,与它具有相同语义的新配置参数为 taskmanager.memory.managed.size。建议使用新的配置参数,原本的配置参数在今后的版本中可能会被彻底移除。

占比

此前,如果不指定明确的大小,也可以将托管内存配置为占用总内存减去网络内存和容器切除内存(仅在 Yarn 和Mesos 上)之后剩余部分的固定比例(taskmanager.memory.fraction)。该配置参数已经被彻底移除,配置它不会产生任何效果。请使用新的配置参数 taskmanager.memory.managed.fraction。在未通过 taskmanager.memory.managed.size 指定明确大小的情况下,新的配置参数将指定托管内存在 Flink 总内存中的所占比例。

RocksDB State Backend流处理作业如果选择使用 RocksDBStateBackend,它使用的本地内存现在也被归为托管内存。默认情况下,RocksDB 将限制其内存用量不超过托管内存大小,以避免在 Yarn 或 Mesos 上容器被杀。你也可以通过设置 state.backend.rocksdb.memory.managed 来关闭 RocksDB 的内存控制。请参考如何升级容器切除内存。

其他变化此外,Flink 1.10 对托管内存还引入了下列变化:

[*]托管内存现在总是在堆外。配置参数 taskmanager.memory.off-heap 已被彻底移除,配置它不会产生任何效果。
[*]托管内存现在使用本地内存而非直接内存。这意味着托管内存将不在 JVM 直接内存限制的范围内。
[*]托管内存现在总是惰性分配的。配置参数 taskmanager.memory.preallocate 已被彻底移除,配置它不会产生任何效果。

升级 JobManager 内存配置在原本的内存配置方法中,用于指定 JVM 堆内存 的配置参数是:

[*]jobmanager.heap.size
[*]jobmanager.heap.mb
尽管这两个参数以“堆(Heap)”命名,在此之前它们实际上只有在独立部署模式才完全对应于 JVM 堆内存。在容器化部署模式下(Kubernetes 和 Yarn),它们指定的内存还包含了其他堆外内存部分。JVM 堆空间的实际大小,是参数指定的大小减去容器切除(Cut-Off)内存后剩余的部分。容器切除内存在 1.11 及以上版本中已被彻底移除。
上述两个参数此前对 Mesos 部署模式并不生效。Flink 在 Mesos 上启动 JobManager 进程时并未设置任何 JVM 内存参数。从 1.11 版本开始,Flink 将采用与独立部署模式相同的方式设置这些参数。

这两个配置参数目前已被弃用。如果配置了上述弃用的参数,同时又没有配置与之对应的新配置参数,那它们将按如下规则对应到新的配置参数。

[*]独立部署模式(Standalone Deployment)、Mesos 部署模式下:JVM 堆内存(jobmanager.memory.heap.size)
[*]容器化部署模式(Containerized Deployement)下(Kubernetes、Yarn):进程总内存(jobmanager.memory.process.size)
建议您尽早使用新的配置参数取代启用的配置参数,它们在今后的版本中可能会被彻底移除。
如果仅配置了 Flink 总内存或进程总内存,那么 JVM 堆内存将是总内存减去其他内存部分后剩余的部分。请参考如何配置总内存。此外,也可以通过配置 jobmanager.memory.heap.size 的方式直接指定 JVM 堆内存。

Flink JVM 进程内存限制

从 1.10 版本开始,Flink 通过设置相应的 JVM 参数,对 TaskManager 进程使用的 JVM Metaspace 和 JVM 直接内存进行限制。从 1.11 版本开始,Flink 同样对 JobManager 进程使用的 JVM Metaspace 进行限制。此外,还可以通过设置 jobmanager.memory.enable-jvm-direct-memory-limit 对 JobManager 进程的 JVM 直接内存进行限制。请参考 JVM 参数。
Flink 通过设置上述 JVM 内存限制降低内存泄漏问题的排查难度,以避免出现容器内存溢出等问题。请参考常见问题中关于 JVM Metaspace 和 JVM 直接内存 OutOfMemoryError 异常的描述。

容器切除(Cut-Off)内存

在容器化部署模式(Containerized Deployment)下,此前你可以指定切除内存。这部分内存将预留给所有未被 Flink 计算在内的内存开销。其主要来源是不受 Flink 直接管理的依赖使用的内存,例如 RocksDB、JVM 内部开销等。相应的配置参数(containerized.heap-cutoff-ratio 和 containerized.heap-cutoff-min)不再生效。新的内存配置方法引入了新的内存组成部分来具体描述这些内存用量。

TaskManager流处理作业如果使用了 RocksDBStateBackend,RocksDB 使用的本地内存现在将被归为托管内存。默认情况下,RocksDB 将限制其内存用量不超过托管内存大小。请同时参考如何升级托管内存以及如何配置托管内存。
其他堆外(直接或本地)内存开销,现在可以通过下列配置参数进行设置:

[*]任务堆外内存(taskmanager.memory.task.off-heap.size)
[*]框架堆外内存(taskmanager.memory.framework.off-heap.size)
[*]JVM Metaspace(taskmanager.memory.jvm-metaspace.size)
[*]JVM 开销

JobManager可以通过下列配置参数设置堆外(直接或本地)内存开销:

[*]堆外内存 (jobmanager.memory.off-heap.size)
[*]JVM Metaspace (jobmanager.memory.jvm-metaspace.size)
[*]JVM 开销

flink-conf.yaml 中的默认配置

本节描述 Flink 自带的默认 flink-conf.yaml 文件中的变化。
原本的 TaskManager 总内存(taskmanager.heap.size)被新的配置项 taskmanager.memory.process.size 所取代。默认值从 1024Mb 增加到了 1728Mb。
原本的 JobManager 总内存(jobmanager.heap.size)被新的配置项 jobmanager.memory.process.size 所取代。默认值从 1024Mb 增加到了 1600Mb。
请参考如何配置总内存。

注意: 使用新的默认 `flink-conf.yaml` 可能会造成各内存部分的大小发生变化,从而产生性能变化。


来源:https://ci.apache.org/projects/f ... tup_jobmanager.html
最新经典文章,欢迎关注公众号http://www.aboutyun.com/data/attachment/forum/201406/15/084659qcxzzg8n59b6zejp.jpg
页: [1]
查看完整版本: Flink1.11内存机制之JobManager内存及调优指南