Flink SQL作业调优:手动配置调优
问题导读:1.手动配置调优的内容主要有哪3种类型?2.作业参数调优的优缺点?3.资源调优的优缺点?上一篇Flink SQL开发指南:AutoConf自动配置调优https://www.aboutyun.com/forum.php?mod=viewthread&tid=31101
本文为您介绍实时计算作业手动配置调优。
手动配置调优 {#section_q1g_lhm_bgb .section}手动配置调优的内容主要有3种类型:
[*]作业参数调优:主要对作业中的miniBatch等参数进行调优;
[*]资源调优:主要对作业中的Operator的并发数(arallelism)、CPU(Core)、堆内存(heap_memory)等参数进行调优;
[*]上下游参数调优:主要对作业中的上下游存储参数进行调优。
下面通过3个章节对以上3类调优进行介绍,参数调优后将生成新的配置,JPb需重新上线启动/恢复才能使用新的配置,本文最后章节将讲述如何重新启用新的配置。
作业参数调优 {#section_sxj_yjm_bgb .section}miniBatch设置:该设置只能优化group by。Flink SQL纯流模式下,每来一条数据都会去操作state,io消耗较大,设置miniBatch后,同一个key的一批数据只访问一次state,且只输出最新的一条数据,即减少了state访问也减少了向下游的数据更新。miniBatch设置如下:如果是新增加的作业参数建议用户停止重启,如果是改变作业参数大小暂停恢复即可。# excatly-once语义
blink.checkpoint.mode=EXACTLY_ONCE
# checkpoint间隔时间,单位毫秒
blink.checkpoint.interval.ms=180000
blink.checkpoint.timeout.ms=600000
# 2.x使用niagara作为statebackend,以及设定state数据生命周期,单位毫秒
state.backend.type=niagara
state.backend.niagara.ttl.ms=129600000
# 2.x开启5秒的microbatch(窗口函数不要设置这个参数。)
blink.microBatch.allowLatencyMs=5000
# 表示整个job允许的延迟
blink.miniBatch.allowLatencyMs=5000
# 单个batch的size
blink.miniBatch.size=20000
# local 优化,2.x默认已经开启,1.6.4需手动开启
blink.localAgg.enabled=true
# 2.x开启partial优化,解决count distinct热点
blink.partialAgg.enabled=true
# union all 优化
blink.forbid.unionall.as.breakpoint.in.subsection.optimization=true
# GC 优化(SLS做源表不能设置。)
blink.job.option=-yD heartbeat.timeout=180000 -yD env.java.opts='-verbose:gc -XX:NewRatio=3 -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:ParallelGCThreads=4'
# 时区设置
blink.job.timeZone=Asia/Shanghai
资源调优 {#section_dj3_clm_bgb .section}
[*]分析问题
[*]通过job的拓扑图查看到2号的TASK节点的输入队列已达到100%造成数据堆积反压到它的上游1号的TASK节点,输出队列造成数据堆积。
2.单击2号的TASK节点,找到队列已达到100%的TaskExecutor。
3.查看TaskExecutor的CPU和内存的使用量,根据使用量来调大相应的CPU和MEM。
[*]性能调优
[*]进入调优窗口
[*]
[*]打开可视化编辑
[*]
[*]找到2号task对应的Group(若有)或Operator,可以按Group批量修改或对单个Operator进行参数修改。
[*]按Group批量修改:
[*]
[*]
[*]单个Operator修改:
[*]
[*]
[*]找到对应的Operator进行修改:
[*]
[*]
[*]配置参数后点击右上角的应用当前配置并关闭窗口,即可保存当前配置。 说明:
[*]如果在调优的过程中发现虽然调大了某个Group的资源配置但是并没有太大的效果,首先需要确认该节点是否有数据倾斜现象,如果有数据倾斜,首要解决数据倾斜;其次如果没有数据倾斜需要把其中复杂运算的Operator节点(如:group by、window、join等)拆开,来判断到底是哪个子节点有异常。如果找到了该子节点只需调优该节点就好。将Operator子节点拆开的方法:点击需要修改的Operator,将其参数chainingStrategy修改为HEAD,若其已为HEAD,需要将其后面的第一个Operator修改为HEAD。 chainingStrategy三个参数的解释如下:
[*]ALWAYS:代表把单个的节点合并成一个大的GROUP里面。
[*]NEVER:保持不变
[*]HEAD :表示把在合并成一个GROUP里面单个节点查分出来。
[*]资源参数的配置原则和建议
[*]可调参数
[*]parallelism
[*]source
说明: source的并发不能大于source的shard数。
[*]资源根据上游Partition数来。
[*]例如SOURCE的个数是16,那么source的并发可以配置为16、8或4等,不要超过16。
[*]中间的处理节点
[*]根据预估的QPS计算。
[*]对于小任务来说,和source一样的并发度就够了。
[*]QPS高的任务,可以配大点,例如 64,128 或者 256。
[*]sink节点
[*]并发度和下游存储的Partition数相关,一般是下游Partition个数的2~3倍。
[*]如果配置太大会导致写超时或失败。例如下游SINK的个数是16,那么建议sink的并发最大可以配置48。
[*]CORE
CPU,默认 0.1,根据实际CPU使用配置(但最好能被1整除),一般建议0.25
[*]heap_memory
堆内存,默认 256MB,根据实际内存使用配置 点击GROUP就可以编辑以上参数。
[*]存在GROUP BY的TASK节点可配置参数
state_size:state大小,默认0。如果operator有用state,需要把state_size配成1,表示该operator会用state,job在申请资源的时候会额外为该operator申请内存,供state访问使用;如果不配成1,job可能被yarn kill。(state_size需要配成1的operator有:group by、 join、over和window)虽然有这么多配置项,对普通用户来说,只需要关心:core,parallelism和heap_memory。整个job 建议core:mem=1:4,即一个核对应4G内存。
说明:
调parallelism和memory,有什么规则吗?
一个operator的总CU=并发*core 一个operator的总mem=并发*heap_mem 一个group中的core取最大值,mem取各个operator的sum,CPU和MEM的关系是1:4的关系;比如您的core给的是1CU,mem给的是3G,那么最终分配的是1CU+4G。您的core给的是1CU,mem给的是5G,那么最终分配的是1.25CU+5G。
上下游参数调优 {#section_wml_snm_bgb .section}由于实时计算的特性,每条数据均会触发上下游存储的读写,会对上下游存储形成性能压力,可以通过设置batchsize,批量的读写上下游存储数据来降低对上下游存储的压力,支持batchsize参数的上下游存储如下:|名称|参数|详情|设置参数值| |DATAHUB源表|batchReadSize|单次读取条数|可选,默认为10| |DATAHUB结果表|batchSize|单次写入条数|可选,默认为300。| |日志服务(Log Service)源表|batchGetSize|单次读取logGroup条数|可选,默认为10。| |分析型数据库(AnalyticDB)结果表|batchSize|每次写的批次大小|可选,默认为1000。| |云数据库(RDS)结果表|batchSize|每次写的批次大小|可选,默认为50。| |云数据库HybridDB for MySQL(petaData)结果表|batchSize|每次写的批次大小|可选,默认值1000 ,表示每次写多少条,经验建议最大设置4096。| |bufferSize|去重的buffer大小,需要指定主键才生效。|可选。设置batchSize必须设置bufferSize,经验建议最大设置4096。|示例:
重新启用新的配置 {#section_isd_5nm_bgb .section}通过1-3章节完成配置后,需要重新启动/恢复作业才能使新配置生效。1.上线作业,配置方式必须选择使用上次资源配置 。
2.暂停原作业。
3.恢复原作业。
4.选择按最新配置恢复, 否则新配置无法生效。
5.重新恢复后,可通过运维 > 运行信息 > Vertex拓扑查看新的配置是否生效。
说明:一般情况下我们不建议采用先停止job再启动的方式使新配置生效,因为job停止后status状态会消除,可能会导致计算结果不一致。
本文档涉及到的相关名词解释 {#section_s12_c4m_bgb .section}
[*]global
[*]isChainingEnabled :表示是否启用chain策略,默认为 true,不需要修改。
[*]nodes
[*]id:节点id号,自动生成,唯一,不需要修改。
[*]uid: 节点uid号,用于计算operator id,如果不设置,会使用id。
[*]pact:节点类型,例如Data Source,Operator,Data Sink等等,不需要修改。
[*]name:节点名字,用户可以自定义。
[*]slotSharingGroup:default,不需要修改。
[*]chainingStrategy:chain的策略,有 HEAD、ALWAYS和NEVER,根据需要修改。
[*]parallelism:并发度,默认为1,可以根据实际数据量改大点。
[*]core:CPU,默认0.1,根据实际CPU使用配置(但最好能被1整除),一般建议0.25。
[*]heap_memory:堆内存,默认256MB,根据实际内存使用配置。
[*]direct_memory:jvm堆外内存,默认0,建议不要修改。
[*]native_memory:jvm堆外内存,jni使用,默认0,建议用10MB。
[*]chain
[*]Flink SQL任务是一个DAG图,会有很多个节点(Operator),有些上下游的节点在运行时是可以合成一个点的,这称之为chain。对于chain之后的点,CPU取最大的最大值,内存取总和。例如Node1如果operator有用state,Node2{128MB,0.5core},Node3{128MB,0.25core},那么这三个点chain后的CPU是 0.5core,内存是 512MB。chain的规则简单来说就是:并发度需要一样。但是,有些节点之间是不能合在一起的,比如groupBy。一般来说,尽可能的让节点都chain在一起,减少网络传输。
最新经典文章,欢迎关注公众号http://www.aboutyun.com/data/attachment/forum/201903/18/215536lzpn7n3u7m7u90vm.jpg
页:
[1]