分析并解决由oin或roupy等操作导致的数据倾斜问题云原生大数据计算服务axompute

本文为您介绍使用MaxCompute过程中常见的数据倾斜场景以及对应的解决方案。

在了解数据倾斜之前首先需要了解什么是MapReduce,MapReduce是一种典型的分布式计算框架,它采用分治法的思想,将一些规模较大或者难以直接求解的问题分割成较小规模或容易处理的若干子问题,对这些子问题进行求解后将结果合并成最终结果。MapReduce相较于传统并行编程框架,具有高容错性、易使用性以及较好的扩展性等优点。在MapReduce中实现并行程序无需考虑分布式集群中的编程无关问题,如数据存储、节点间的信息交流和传输机制等,大大简化了其用户的分布式编程方式。

数据倾斜多发生在Reducer端,Mapper按Input files切分,一般相对均匀,数据倾斜指表中数据分布不均衡的情况分配给不同的Worker。数据不均匀的时候,导致有的Worker很快就计算完成了,但是有的Worker却需要运行很长时间。在实际生产中,大部分数据存在偏斜,这符合“二八”定律,例如一个论坛20%的活跃用户贡献了80%的帖子,或者一个网站80%的访问量由20%的用户提供。在数据量爆炸式增长的大数据时代,数据倾斜问题会严重影响分布式程序的执行效率。作业运行表现为作业的执行进度一直停留在99%,作业执行感觉被卡住了。

在Fuxi Jobs中对运行时间Latency按照降序排列,选择运行时间最长的Job Stage。

在Fuxi Instance of Fuxi Stage中对运行时间Latency按照降序排列,选择运行时长远大于平均时长的任务,一般选择第一个进行锁定,查看其对应的输出日志StdOut。

根据StdOut中的反馈信息,查看对应的作业执行图。

根据作业执行图中的Key信息,可以进而定位到导致数据倾斜的SQL代码片段。

使用示例如下。

根据使用经验总结,引起数据倾斜的主要原因有如下几类:

Join

GroupBy

Count(Distinct)

ROW_NUMBER(TopN)

动态分区

其中出现的频率排序为JOIN > GroupBy > Count(Distinct) > ROW_NUMBER > 动态分区。

针对Join端产生的数据倾斜,会存在多种不同的情况,例如大表和小表Join、大表和中表Join、Join热值长尾。

大表Join小表。

数据倾斜示例。

如下示例中t1是一张大表,t2、t3是小表。

解决方案。

注意事项。

引用小表或子查询时,需要引用别名。

MapJoin支持小表为子查询。

MapJoin中多个小表用半角逗号(,)分隔,例如/*+ mapjoin(a,b,c)*/。

MapJoin在Map阶段会将指定表的数据全部加载在内存中,因此指定的表仅能为小表,且表被加载到内存后占用的总内存不得超过512 MB。由于MaxCompute是压缩存储,因此小表在被加载到内存后,数据大小会急剧膨胀。此处的512 MB是指加载到内存后的空间大小。可以通过如下参数设置加大内存,最大为8192 MB。

MapJoin中Join操作的限制。

left outer join的左表必须是大表。

right outer join的右表必须是大表。

不支持full outer join。

inner join的左表或右表均可以是大表。

MapJoin最多支持指定128张小表,否则报语法错误。

大表Join中表。

数据倾斜示例。

如下示例中t0为大表,t1为中表。

解决方案。

Join热值长尾。

数据倾斜示例

在下面这个表中,eleme_uid中存在很多热点数据,容易发生数据倾斜。

解决方案。

可以通过如下四种方法来解决。

序号

方案

说明

方案一

手动切分热值

将热点值分析出来后,从主表中过滤出热点值记录,先进行MapJoin,再将剩余非热点值记录进行MergeJoin,最后合并两部分的Join结果。

方案二

设置SkewJoin参数

方案三

SkewJoin Hint

使用Hint提示:/*+ skewJoin(<table_name>[(<column1_name>[,<column2_name>,...])][((<value11>,<value12>)[,(<value21>,<value22>)...])]*/。SkewJoin Hint的方式相当于多了一次找倾斜Key的操作,会让Query运行时间加长;如果用户已经知道倾斜Key了,就可以通过设置SkewJoin参数的方式,能节省一些时间。

方案四

倍数表取模相等Join

利用倍数表。

手动切分热值。

将热点值分析出来后,从主表中过滤出热点值记录,先进行MapJoin,再将剩余非热点值记录进行MergeJoin,最后合并两部分的Join结果。具体可以参考如下代码示例:

设置SkewJoin参数。

使用示例如下:

SkewJoin Hint。

在SELECT语句中使用如下Hint提示:/*+ skewJoin(<table_name>[(<column1_name>[,<column2_name>,...])][((<value11>,<value12>)[,(<value21>,<value22>)...])]*/才会执行MapJoin,其中table_name为倾斜表名,column_name为倾斜列名,value为倾斜Key值。使用示例如下。

SkewJoin Hint方法直接指定值的处理效率比手动切分热值方法和设置SkewJoin参数方法(不指定值)高。

SkewJoin Hint支持的Join类型:

Inner Join可以对Join两侧表中的任意一侧进行Hint。

Left Join、Semi Join和Anti Join只可以Hint左侧表。

Right Join只可以Hint右侧表。

Full Join不支持Skew Join Hint。

建议只对一定会出现数据倾斜的Join添加Hint,因为Hint会运行一个Aggregate,存在一定代价。

被Hint的Join的Left Side Join Key的类型需要与Right Side Join Key的类型一致,否则SkewJoin Hint不生效。例如上例中的a.c0与b.c0的类型需要一致,a.c1与b.c1的类型需要一致。您可以通过在子查询中将Join Key进行Cast从而保持一致。示例如下:

SkewJoin Hint只支持对Join其中一侧进行Hint。

被Hint的Join一定要有left key = right key,不支持笛卡尔积Join。

MapJoin Hint的Join不能再添加SkewJoin Hint。

倍数表取模相等Join。

该方案和前三个方案的逻辑不同,不是分而治之的思路,而是利用一个倍数表,其值只有一列:int列,比如可以是从1到N(具体可根据倾斜程度确定),利用这个倍数表可以将用户行为表放大N倍,然后Join时使用用户ID和number两个关联键。这样原先只按照用户ID分发导致的数据倾斜就会由于加入了number关联条件而减少为原先的1/N。但是这样做也会导致数据膨胀N倍。

基于上面数据膨胀的情况,我们还可以将膨胀只局限作用于两表中的热点值记录,其他非热点值记录不变。先找到热点值记录,然后分别处理流量表和用户行为表,新增加一个eleme_uid_join列,如果用户ID是热点值,concat一个随机分配正整数(0到预定义的倍数之间,比如0~1000),如果不是则保持原用户ID不变。在两表Join时使用eleme_uid_join列。这样既起到了放大热点值倍数减小倾斜程度的作用,又减少了对非热点值无效的膨胀。不过可想而知的是这样的逻辑会将原先的业务逻辑SQL改得面目全非,因此不建议使用。

一个带GroupBy的伪代码示例如下。

当发生数据倾斜时,可以通过如下三种方案解决:

序号

方案

说明

方案一

设置Group By防倾斜的参数

方案二

添加随机数

把引起长尾的Key进行拆分。

方案三

创建滚存表

降本提效。

方案一:设置Group By防倾斜的参数。

方案二:添加随机数。

相对于方案一,此解决方案对SQL进行改写,添加随机数,把引起长尾的Key进行拆分是解决Group By长尾的一种比较好的方法。

对于SQL:Select Key,Count(*) As Cnt From TableName Group By Key;不考虑Combiner,M节点会Shuffle到R节点上,然后R节点再做Count操作,对应的执行计划是M->R。

假定已经找到了引起长尾的key,对长尾的Key再做一次工作再分配,就变成:

改完之后的执行计划变成了M->R->R,虽然执行步骤变长了,但是长尾的Key经过了2个步骤的处理,整体的时间消耗可能反而有所减少。资源消耗与耗时效果方面跟方案一基本持平,但实际场景中引发长尾的Key不止一个,再考虑寻找长尾Key和SQL改写的投入成本,方案一会更低一些。

创建滚存表。

核心降本提效,我们的核心诉求是取过去一年的商户数据,对于线上任务而言,每次都要读取T-1至T-365的所有分区其实是对资源的很大浪费,创建滚存表可以减少分区的读取但是又不影响过去一年的取数,示例如下。

首次初始化365天的商户营业数据(Group By汇总),标记数据更新日期,记为表a;后续线上任务切换为T-2日表a关联table_xxx_di表再Group By,这样每天读取的数据从365减少到了2个,主键shopid的重复性极大降低,对资源的消耗也会减少。

假如一个表数据分布如下。

ds(分区)

cnt(记录数)

20220416

73025514

20220415

2292806

20220417

2319160

使用下面的语句就容易发生数据倾斜:

解决方案如下:

序号

方案

说明

方案一

参数设置调优

方案二

通用两阶段聚合

在partition字段值拼接随机数。

方案三

类似两阶段聚合

先通过GroupBy两分组字段(ds+shop_id)再使用count(distinct)。

方案一:参数设置调优。

设置如下参数。

方案二:通用两阶段聚合。

若shop_id字段数据不均匀,则无法通过方案一优化,较通用的方式是在分区(partition)字段值中拼接随机数。

方案三:类似两阶段聚合。

如果GroupBy与Distinct的字段数据都均匀,则可以采用如下方式优化,先GroupBy两分组字段(ds和shop_id)再使用count(distinct)命令。

Top10的示例如下。

当发生数据倾斜时,可以通过以下几种方式解决:

序号

方案

说明

方案一

SQL写法的两阶段聚合。

增加随机列或拼接随机数,将其作为分区(Partition)中一参数。

方案二

UDAF写法的两阶段聚合。

最小堆的队列优先的通过UDAF的方式进行调优。

方案一:SQL写法的两阶段聚合。

为使Map阶段中Partition各分组数据尽可能均匀,增加随机列,将其作为Partition中一参数。

方案二:UDAF写法的两阶段聚合。

SQL方式会有较多代码,且可能不利于维护,此处将利用最小堆的队列优先的通过UDAF的方式进行调优,即在iterate阶段仅取TopN,merge阶段则均仅对N个元素融合,过程如下。

iterate:将前K个元素进行push,K之后的元素通过不断与最小顶比较交换堆中元素。

merge:将两堆merge后,原地返回前K个元素。

terminate:数组形式返回堆。

SQL中将数组拆为各行。

很多场景会建立动态分区的表,也容易发生数据倾斜。当发生数据倾斜时,可以通过下面的解决方案来解决。

序号

方案

说明

方案一

参数配置优化

通过参数配置进行优化。

方案二

裁剪优化

通过查找到存在记录数较多的分区裁剪后单独插入的方式解决。

方案一:参数配置优化。

动态分区可以把符合不同条件的数据放到不同的分区,避免需要通过多次Insert OverWrite写入到表中,特别是分区数比较多时,能够很好的简化代码,但是动态分区也有可能会带来小文件过多的困扰。

数据倾斜示例。

以如下最简SQL为例。

假设其有K个Map Instance,N个目标分区。

最极端的情况下,可能产生K*N个小文件,而过多的小文件会对文件系统造成巨大的管理压力,因此MaxCompute对动态分区的处理是引入额外的一级Reduce Task, 把相同的目标分区交由同一个(或少量几个) Reduce Instance来写入, 避免小文件过多,并且这个Reduce肯定是最后一个Reduce Task操作。在MaxCompute中默认开启此功能,也就是如下参数设置为True。

默认开启该功能,解决了小文件过多的问题,不会因为单个Instance产生文件数过多而导致任务出错,但也引入了新的问题:数据倾斜,并且额外引入一级Reduce操作也耗费计算资源,因此如何保持这两者的平衡,需要认真权衡。

解决方案。

对于上面一段代码如果使用默认参数,整个任务的运行时长约为1小时30分钟,其中最后一个Reduce的运行时长约为1小时20分钟,占到总运行时长的90%左右。由于引入额外的一个Reduce以后,使得每个Reduce Instance的数据分布特别不均匀,导致了数据长尾。

使用了动态分区

动态分区个数<=50

并且表根据最后一个Fuxi Instance的执行时长来判断该节点是否需要设置该参数的迫切程度,通过diag_level字段来标识别,规则如下:

Last_Fuxi_Inst_Time大于30分钟:Diag_Level=4('严重')。

Last_Fuxi_Inst_Time在20到30分钟之间:Diag_Level=3 ('高')。

Last_Fuxi_Inst_Time在10到20分钟之间:Diag_Level=2 ('中')。

Last_Fuxi_Inst_Time小于10分钟:Diag_Level=1('低')。

方案二:裁剪优化。

根据动态分区插数据时Map阶段就存在的数据倾斜问题,可通过查找到存在记录数较多的分区裁剪后单独插入的方式解决。基于案例实际情况可修改Map阶段的参数配置,如下所示:

由结果知,全过程进行了全表扫描,进一步优化,可通过关闭系统引入的Reduce Job优化,过程如下:

根据动态分区插数据时Map阶段就存在的数据倾斜问题,可通过查找到存在记录数较多的分区裁剪后单独插入的方式解决,具体步骤如下。

使用如下命令示例查询记录数较多的特定分区。

部分分区如下:

ds

hh

cnt

20200928

17

1052800

20191017

17

1041234

20210928

17

1034332

20190328

17

1000321

20210504

19

20191003

20

18

20200522

18

20220504

18

使用如下命令示例过滤上述分区插入后,再单独插入大记录数分区数据。

关注阿里云公众号或下载阿里云APP,关注云资讯,随时随地运维管控云服务

THE END
0.数据倾斜常见原因和解决办法数据倾斜的原因及解决办法数据倾斜常见原因和解决办法 数据倾斜在MapReduce编程模型中十分常见,多个节点并行计算,如果分配的不均,就会导致长尾问题(大部分节点都完成了任务,一直等待剩下的节点完成任务),本文梳理了常见的发生倾斜的原因以及相应的解决办法。 1.map端发生数据倾斜 产生原因:jvzquC41dnuh0lxfp0tfv8okcpmiwjnlkg5bt}neng5eg}fknu524;6668<3
1.2022年最强大数据面试宝典(全文50000字,强烈建议收藏)4. 热点现象(数据倾斜)怎么产生的,以及解决方法有哪些 热点现象: 某个小的时段内,对HBase的读写请求集中到极少数的Region上,导致这些region所在的RegionServer处理请求量骤增,负载量明显偏大,而其他的RgionServer明显空闲。 热点现象出现的原因: HBase中的行是按照rowkey的字典顺序排序的,这种设计优化了scan操作,可以jvzquC41dnuh0ryrwd4og}4922776A71xkkxuyfeg/8:3::371
2.2022年最强大数据面试宝典(全文50000字,建议收藏)(四)4. 热点现象(数据倾斜)怎么产生的,以及解决方法有哪些 热点现象: 某个小的时段内,对HBase的读写请求集中到极少数的Region上,导致这些region所在的RegionServer处理请求量骤增,负载量明显偏大,而其他的RgionServer明显空闲。 热点现象出现的原因: HBase中的行是按照rowkey的字典顺序排序的,这种设计优化了scan操作,可以jvzquC41fg|fnxugt0gmk‚zp0eun1jwvkerf1B5386:
3.HBase知识手册爱是与世界屏的技术博客Hbase和Hive在大数据架构中处在不同位置,Hbase主要解决实时数据查询问题,Hive主要解决海量数据处理和计算问题,一般是配合使用。 Hbase:Hadoop database 的简称,也就是基于Hadoop数据库,是一种NoSQL数据库,主要适用于海量明细数据(十亿、百亿)的随机实时查询,如日志明细、交易清单、轨迹行为等。 jvzquC41dnuh0>6evq4dqv4nqxkcg}ygtyusnm47;8?45B
4.大数据工程师面试题这一篇就够用了fsimage:记录的是数据块的位置信息、数据块的冗余信息(二进制文件) 由于edits 文件记录了最新状态信息,并且随着操作越多,edits 文件就会越大,把 edits 文件中最新的信息写到 fsimage 文件中就解决了 edits 文件数量多不方便管理的情况。 没有体现 HDFS 的最新状态。 jvzquC41yy}/lrfpuj{/exr1r1:3e<;cg37e7B
5.如何避免数据倾斜数据处理中数据倾斜和数据热点1、数据倾斜的表现 数据倾斜是由于数据分布不均匀,造成数据大量的集中到一点,造成数据热点的现象。 主要表现:任务进度长时间维持在 99%或者 100%的附近,查看任务监控页面,发现只有少量 reduce 子任务未完成,因为其处理的数据量和其他的 reduce 差异过大。 单一 reduce 处理的记录数和平均记录数相差太大,通常达到好jvzquC41dnuh0lxfp0tfv8|cfljlftillf5bt}neng5eg}fknu526<:9:374
6.分库分表方案中出现数据倾斜问题怎么解决分库分表数据倾斜二、 解决方案 1. 调整分片策略(治本之法) 2. 处理业务热点(治标之法) 3. 其他策略 三、 预防措施 总结 这是一个在分库分表实践中非常经典且棘手的问题。数据倾斜意味着数据并没有均匀地分布在不同数据库或表中,导致某些节点负载过高(存储、CPU、IO),而其他节点却非常空闲,从而成为系统瓶颈,严重影响整体性jvzquC41dnuh0lxfp0tfv8skgvgpl~s1ctzjeuj1fgzbkux137734=684
7.Hadoop·大数据技术栈·看云6.Hadoop解决数据倾斜方法 1、提前在 map 进行 combine,减少传输的数据量 2、导致数据倾斜的 key 加盐、提升 Reducer 并行度 … 7.Hadoop读文件和写文件流程 Hadoop读文件和写文件流程 HDFS-文件读写流程 8.Yarn的Job提交流程 步骤很多,理理清楚然后再有条理的进行回答。 jvzquC41yy}/mjsenq{e0ls1jcvq{ltfg/zpinyjgt5ckpicvc<03?<7746
8.37数据分布优化:如何应对数据倾斜?Redis核心技术与实战如使用配置更高的机器 只适用于只读的热点数据 解决方法 解决方法 解决方法 热点数据多副本 实例上存在热点数据 使用Hash Tag导致倾斜 Slot分配不均衡导致倾斜 bigkey导致倾斜 在构建切片集群时,尽量使用大小配置相同的实例,避免因实例资源不均衡而在不同实例上分配不同数量的Slot 应对方法 成因 成因 小建议 Redis jvzquC41vksf0pjgmdgoi7tti1ipn~rp1cxuklqg15695B8
9.举例说明Spark数据倾斜有哪些场景,对应的解决方案是什么?增加并行度,通过spark.sql.shuffle.partitions设置更高的 Shuffle 分区数,分散热点数据 [^1]。 场景三:Reduce Side Join 导致的倾斜 在没有合适优化手段的情况下,Join 操作只能在 Reduce 阶段完成,容易引发数据倾斜。 解决方法: 尝试将 Reduce Side Join 转换为 Map Side Join,前提是至少有一方数据量较小且可广jvzquC41ygtlw7hufp4og}4cpu}ft86jx|nv6?hy
10.阿里P8整理总结,入职大厂必备Java核心知识(附加面试题)ArrayList和LinkedList区别?HashMap内部数据结构?ConcurrentHashMap分段锁?jdk1.8中,对hashMap和concurrentHashMap做了哪些优化如何解决hash冲突的,以及如果冲突了,怎么在hash表中找到目标值synchronized 和 ReentranLock的区别?ThreadLocal?应用场景?Java GC机制?GC Roots有哪些?MySQL行锁是否会有死锁的情况?jvzquC41oconcr3ep1gsvrhng1jfvjnnAhoe?:<655955><(ghoe?|Tw|Q|yq@Gvec>Co95\mjG
11.新老手都值得看的Flink关键技术解析与优化实战上图为计算最小值的热点问题,红色数据为热点数据。如果直接将它们打到同一个分区,会出现性能问题。为了解决倾斜问题,我们通过 hash 策略将数据分成小的 partition 来计算,如上图的预计算,最后再将中间结果汇总计算。 当一切就绪后,我们来做增量的 UV 计算,比如计算 1 天 uv,每分钟输出 1 次结果。计算方式既可jvzquC41yy}/kwkqs0io1jwvkerf1\OTIQjDj{:PuHGGXPojZ
12.Redis数据库的数据倾斜详解Redis在服务端读数据访问Redis时,往往会对请求key进行分片计算,此时中会将请求打到某一台 Server 上,如果热点过于集中,热点 Key 的缓存过多,访问量超过 Server 极限时,就会出现缓存分片服务被打垮现象的产生。当缓存服务崩溃后,此时再有请求产生,就会打到DB 上,这也就是我们常说的缓存穿透,如果没有合理的解决,数据库jvzquC41yy}/lk:30pku1mfvcdgtg87;45;53}v0jvs
13.解决Redis数据倾斜热点等问题Redis单台机器的硬件配置有上限制约,一般我们会采用分布式架构将多台机器组成一个集群,这篇文章主要介绍了解决 Redis 数据倾斜、热点等问题,需要的朋友可以参考下+ 目录 GPT4.0+Midjourney绘画+国内大模型 会员永久免费使用!【 如果你想靠AI翻身,你先需要一个靠谱的工具!】 Redis 作为一门主流技术,应用场景非常多,很多jvzquC41yy}/lk:30pku1jwvkerf1;;;:9=/j}r
14.HBasehbase每秒最大写入多少5 热点现象( 数据倾斜) 怎么产生的, 以及解决方法有哪些 5.1热点现象   某个小的时段内, 对 HBase 的读写请求集中到极少数的 Region 上, 导致这些region 所在的 RegionServer 处理请求量骤增, 负载量明显偏大, 而其他的RgionServer 明显空闲。 jvzquC41dnuh0lxfp0tfv8vsa6676974:1gsvrhng1jfvjnnu173:;;677<
15.Hbase基本概念比如创建一张表,名为user,有两个列族,分别是userInfo和addressInfo,建表语句create 'user', 'userInfo', 'addressInfo' 3.Timestamp(时间戳):纪录每次操作数据的时间,通常作为数据的版本号 六. 热点现象(数据倾斜)怎么产生的,以及解决方法有哪些 热点现象: jvzquC41dnuh0lxfp0tfv8qkdcuxgw;2;1gsvrhng1jfvjnnu1738?<6987