本文为您介绍使用MaxCompute过程中常见的数据倾斜场景以及对应的解决方案。
在了解数据倾斜之前首先需要了解什么是MapReduce,MapReduce是一种典型的分布式计算框架,它采用分治法的思想,将一些规模较大或者难以直接求解的问题分割成较小规模或容易处理的若干子问题,对这些子问题进行求解后将结果合并成最终结果。MapReduce相较于传统并行编程框架,具有高容错性、易使用性以及较好的扩展性等优点。在MapReduce中实现并行程序无需考虑分布式集群中的编程无关问题,如数据存储、节点间的信息交流和传输机制等,大大简化了其用户的分布式编程方式。
数据倾斜多发生在Reducer端,Mapper按Input files切分,一般相对均匀,数据倾斜指表中数据分布不均衡的情况分配给不同的Worker。数据不均匀的时候,导致有的Worker很快就计算完成了,但是有的Worker却需要运行很长时间。在实际生产中,大部分数据存在偏斜,这符合“二八”定律,例如一个论坛20%的活跃用户贡献了80%的帖子,或者一个网站80%的访问量由20%的用户提供。在数据量爆炸式增长的大数据时代,数据倾斜问题会严重影响分布式程序的执行效率。作业运行表现为作业的执行进度一直停留在99%,作业执行感觉被卡住了。
在Fuxi Jobs中对运行时间Latency按照降序排列,选择运行时间最长的Job Stage。
在Fuxi Instance of Fuxi Stage中对运行时间Latency按照降序排列,选择运行时长远大于平均时长的任务,一般选择第一个进行锁定,查看其对应的输出日志StdOut。
根据StdOut中的反馈信息,查看对应的作业执行图。
根据作业执行图中的Key信息,可以进而定位到导致数据倾斜的SQL代码片段。
使用示例如下。
根据使用经验总结,引起数据倾斜的主要原因有如下几类:
Join
GroupBy
Count(Distinct)
ROW_NUMBER(TopN)
动态分区
其中出现的频率排序为JOIN > GroupBy > Count(Distinct) > ROW_NUMBER > 动态分区。
针对Join端产生的数据倾斜,会存在多种不同的情况,例如大表和小表Join、大表和中表Join、Join热值长尾。
大表Join小表。
数据倾斜示例。
如下示例中t1是一张大表,t2、t3是小表。
解决方案。
注意事项。
引用小表或子查询时,需要引用别名。
MapJoin支持小表为子查询。
MapJoin中多个小表用半角逗号(,)分隔,例如/*+ mapjoin(a,b,c)*/。
MapJoin在Map阶段会将指定表的数据全部加载在内存中,因此指定的表仅能为小表,且表被加载到内存后占用的总内存不得超过512 MB。由于MaxCompute是压缩存储,因此小表在被加载到内存后,数据大小会急剧膨胀。此处的512 MB是指加载到内存后的空间大小。可以通过如下参数设置加大内存,最大为8192 MB。
MapJoin中Join操作的限制。
left outer join的左表必须是大表。
right outer join的右表必须是大表。
不支持full outer join。
inner join的左表或右表均可以是大表。
MapJoin最多支持指定128张小表,否则报语法错误。
大表Join中表。
数据倾斜示例。
如下示例中t0为大表,t1为中表。
解决方案。
Join热值长尾。
数据倾斜示例
在下面这个表中,eleme_uid中存在很多热点数据,容易发生数据倾斜。
解决方案。
可以通过如下四种方法来解决。
序号
方案
说明
方案一
手动切分热值
将热点值分析出来后,从主表中过滤出热点值记录,先进行MapJoin,再将剩余非热点值记录进行MergeJoin,最后合并两部分的Join结果。
方案二
设置SkewJoin参数
方案三
SkewJoin Hint
使用Hint提示:/*+ skewJoin(<table_name>[(<column1_name>[,<column2_name>,...])][((<value11>,<value12>)[,(<value21>,<value22>)...])]*/。SkewJoin Hint的方式相当于多了一次找倾斜Key的操作,会让Query运行时间加长;如果用户已经知道倾斜Key了,就可以通过设置SkewJoin参数的方式,能节省一些时间。
方案四
倍数表取模相等Join
利用倍数表。
手动切分热值。
将热点值分析出来后,从主表中过滤出热点值记录,先进行MapJoin,再将剩余非热点值记录进行MergeJoin,最后合并两部分的Join结果。具体可以参考如下代码示例:
设置SkewJoin参数。
使用示例如下:
SkewJoin Hint。
在SELECT语句中使用如下Hint提示:/*+ skewJoin(<table_name>[(<column1_name>[,<column2_name>,...])][((<value11>,<value12>)[,(<value21>,<value22>)...])]*/才会执行MapJoin,其中table_name为倾斜表名,column_name为倾斜列名,value为倾斜Key值。使用示例如下。
SkewJoin Hint方法直接指定值的处理效率比手动切分热值方法和设置SkewJoin参数方法(不指定值)高。
SkewJoin Hint支持的Join类型:
Inner Join可以对Join两侧表中的任意一侧进行Hint。
Left Join、Semi Join和Anti Join只可以Hint左侧表。
Right Join只可以Hint右侧表。
Full Join不支持Skew Join Hint。
建议只对一定会出现数据倾斜的Join添加Hint,因为Hint会运行一个Aggregate,存在一定代价。
被Hint的Join的Left Side Join Key的类型需要与Right Side Join Key的类型一致,否则SkewJoin Hint不生效。例如上例中的a.c0与b.c0的类型需要一致,a.c1与b.c1的类型需要一致。您可以通过在子查询中将Join Key进行Cast从而保持一致。示例如下:
SkewJoin Hint只支持对Join其中一侧进行Hint。
被Hint的Join一定要有left key = right key,不支持笛卡尔积Join。
MapJoin Hint的Join不能再添加SkewJoin Hint。
倍数表取模相等Join。
该方案和前三个方案的逻辑不同,不是分而治之的思路,而是利用一个倍数表,其值只有一列:int列,比如可以是从1到N(具体可根据倾斜程度确定),利用这个倍数表可以将用户行为表放大N倍,然后Join时使用用户ID和number两个关联键。这样原先只按照用户ID分发导致的数据倾斜就会由于加入了number关联条件而减少为原先的1/N。但是这样做也会导致数据膨胀N倍。
基于上面数据膨胀的情况,我们还可以将膨胀只局限作用于两表中的热点值记录,其他非热点值记录不变。先找到热点值记录,然后分别处理流量表和用户行为表,新增加一个eleme_uid_join列,如果用户ID是热点值,concat一个随机分配正整数(0到预定义的倍数之间,比如0~1000),如果不是则保持原用户ID不变。在两表Join时使用eleme_uid_join列。这样既起到了放大热点值倍数减小倾斜程度的作用,又减少了对非热点值无效的膨胀。不过可想而知的是这样的逻辑会将原先的业务逻辑SQL改得面目全非,因此不建议使用。
一个带GroupBy的伪代码示例如下。
当发生数据倾斜时,可以通过如下三种方案解决:
序号
方案
说明
方案一
设置Group By防倾斜的参数
方案二
添加随机数
把引起长尾的Key进行拆分。
方案三
创建滚存表
降本提效。
方案一:设置Group By防倾斜的参数。
方案二:添加随机数。
相对于方案一,此解决方案对SQL进行改写,添加随机数,把引起长尾的Key进行拆分是解决Group By长尾的一种比较好的方法。
对于SQL:Select Key,Count(*) As Cnt From TableName Group By Key;不考虑Combiner,M节点会Shuffle到R节点上,然后R节点再做Count操作,对应的执行计划是M->R。
假定已经找到了引起长尾的key,对长尾的Key再做一次工作再分配,就变成:
改完之后的执行计划变成了M->R->R,虽然执行步骤变长了,但是长尾的Key经过了2个步骤的处理,整体的时间消耗可能反而有所减少。资源消耗与耗时效果方面跟方案一基本持平,但实际场景中引发长尾的Key不止一个,再考虑寻找长尾Key和SQL改写的投入成本,方案一会更低一些。
创建滚存表。
核心降本提效,我们的核心诉求是取过去一年的商户数据,对于线上任务而言,每次都要读取T-1至T-365的所有分区其实是对资源的很大浪费,创建滚存表可以减少分区的读取但是又不影响过去一年的取数,示例如下。
首次初始化365天的商户营业数据(Group By汇总),标记数据更新日期,记为表a;后续线上任务切换为T-2日表a关联table_xxx_di表再Group By,这样每天读取的数据从365减少到了2个,主键shopid的重复性极大降低,对资源的消耗也会减少。
假如一个表数据分布如下。
ds(分区)
cnt(记录数)
20220416
73025514
20220415
2292806
20220417
2319160
使用下面的语句就容易发生数据倾斜:
解决方案如下:
序号
方案
说明
方案一
参数设置调优
方案二
通用两阶段聚合
在partition字段值拼接随机数。
方案三
类似两阶段聚合
先通过GroupBy两分组字段(ds+shop_id)再使用count(distinct)。
方案一:参数设置调优。
设置如下参数。
方案二:通用两阶段聚合。
若shop_id字段数据不均匀,则无法通过方案一优化,较通用的方式是在分区(partition)字段值中拼接随机数。
方案三:类似两阶段聚合。
如果GroupBy与Distinct的字段数据都均匀,则可以采用如下方式优化,先GroupBy两分组字段(ds和shop_id)再使用count(distinct)命令。
Top10的示例如下。
当发生数据倾斜时,可以通过以下几种方式解决:
序号
方案
说明
方案一
SQL写法的两阶段聚合。
增加随机列或拼接随机数,将其作为分区(Partition)中一参数。
方案二
UDAF写法的两阶段聚合。
最小堆的队列优先的通过UDAF的方式进行调优。
方案一:SQL写法的两阶段聚合。
为使Map阶段中Partition各分组数据尽可能均匀,增加随机列,将其作为Partition中一参数。
方案二:UDAF写法的两阶段聚合。
SQL方式会有较多代码,且可能不利于维护,此处将利用最小堆的队列优先的通过UDAF的方式进行调优,即在iterate阶段仅取TopN,merge阶段则均仅对N个元素融合,过程如下。
iterate:将前K个元素进行push,K之后的元素通过不断与最小顶比较交换堆中元素。
merge:将两堆merge后,原地返回前K个元素。
terminate:数组形式返回堆。
SQL中将数组拆为各行。
很多场景会建立动态分区的表,也容易发生数据倾斜。当发生数据倾斜时,可以通过下面的解决方案来解决。
序号
方案
说明
方案一
参数配置优化
通过参数配置进行优化。
方案二
裁剪优化
通过查找到存在记录数较多的分区裁剪后单独插入的方式解决。
方案一:参数配置优化。
动态分区可以把符合不同条件的数据放到不同的分区,避免需要通过多次Insert OverWrite写入到表中,特别是分区数比较多时,能够很好的简化代码,但是动态分区也有可能会带来小文件过多的困扰。
数据倾斜示例。
以如下最简SQL为例。
假设其有K个Map Instance,N个目标分区。
最极端的情况下,可能产生K*N个小文件,而过多的小文件会对文件系统造成巨大的管理压力,因此MaxCompute对动态分区的处理是引入额外的一级Reduce Task, 把相同的目标分区交由同一个(或少量几个) Reduce Instance来写入, 避免小文件过多,并且这个Reduce肯定是最后一个Reduce Task操作。在MaxCompute中默认开启此功能,也就是如下参数设置为True。
默认开启该功能,解决了小文件过多的问题,不会因为单个Instance产生文件数过多而导致任务出错,但也引入了新的问题:数据倾斜,并且额外引入一级Reduce操作也耗费计算资源,因此如何保持这两者的平衡,需要认真权衡。
解决方案。
对于上面一段代码如果使用默认参数,整个任务的运行时长约为1小时30分钟,其中最后一个Reduce的运行时长约为1小时20分钟,占到总运行时长的90%左右。由于引入额外的一个Reduce以后,使得每个Reduce Instance的数据分布特别不均匀,导致了数据长尾。
使用了动态分区
动态分区个数<=50
并且表根据最后一个Fuxi Instance的执行时长来判断该节点是否需要设置该参数的迫切程度,通过diag_level字段来标识别,规则如下:
Last_Fuxi_Inst_Time大于30分钟:Diag_Level=4('严重')。
Last_Fuxi_Inst_Time在20到30分钟之间:Diag_Level=3 ('高')。
Last_Fuxi_Inst_Time在10到20分钟之间:Diag_Level=2 ('中')。
Last_Fuxi_Inst_Time小于10分钟:Diag_Level=1('低')。
方案二:裁剪优化。
根据动态分区插数据时Map阶段就存在的数据倾斜问题,可通过查找到存在记录数较多的分区裁剪后单独插入的方式解决。基于案例实际情况可修改Map阶段的参数配置,如下所示:
由结果知,全过程进行了全表扫描,进一步优化,可通过关闭系统引入的Reduce Job优化,过程如下:
根据动态分区插数据时Map阶段就存在的数据倾斜问题,可通过查找到存在记录数较多的分区裁剪后单独插入的方式解决,具体步骤如下。
使用如下命令示例查询记录数较多的特定分区。
部分分区如下:
ds
hh
cnt
20200928
17
1052800
20191017
17
1041234
20210928
17
1034332
20190328
17
1000321
20210504
19
20191003
20
18
20200522
18
20220504
18
使用如下命令示例过滤上述分区插入后,再单独插入大记录数分区数据。
关注阿里云公众号或下载阿里云APP,关注云资讯,随时随地运维管控云服务