首先回顾一下MapReduce的流程
MapReduce流程简述
输入分片: MapReduce 作业开始时,输入数据被分割成多个分片,每个分片大小一般在 16MB 到 128MB 之间。这些分片会被分配给不同的 Map 任务进行处理。
Map 阶段: Map 阶段的任务是处理输入分片,并为每个分片生成一个或多个键值对(key/value pair)。Map 函数由用户编写,以处理具体的业务逻辑。
Shuffle 阶段:在 Map 阶段完成后,系统会自动进行 Shuffle 阶段。在这个阶段,系统会根据 Map 阶段生成的键值对中的键(key),将值(value)进行重新排序和分组,同一个键对应的值会被组合在一起。
Reduce 阶段:Shuffle 阶段后是 Reduce 阶段。在此阶段,Reduce 函数(也由用户编写)会处理 Shuffle 阶段生成的每个键和对应的值的集合,并生成一组输出结果。
输出:最后,MapReduce 作业的输出会被写入到文件系统中,一般是分布式文件系统,如 Hadoop 的 HDFS。这样,输出结果就可以被其他 MapReduce 作业或其他系统使用。
Map主要功能从磁盘读数据到内存。两个主要过程:1)每个输入分片让一个instance处理,默认256MB,输出暂存环形内存缓冲区;2)写磁盘之前,根据reduce
instance数量分区,也会做部分聚合工作,减少输入reduce的数据量。
a) Map倾斜
i) 上游表文件的大小不均匀,并且小文件特别多。可以上游合并小文件,或调节参数:“set odps.sql. mapper.merge.limit.size=64”用于调节Map Instance 的个数;“set odps.sql.mapper.split.size=256” 用于调节单个Map Instance 读取的小文件个数。
ii) 某些Map Instance读取文件的某个值过多,主要是指 Count Distinct 操作。可以设置 “distribute by rand()”将 Map 端分发后的数据重新按照随机值再进行分发。
Join参与整个Map和Reduce阶段。
b) Join倾斜
i) Join的某路输入比较小,可以采用 MapJoin,避免分发引起长尾。
ii) Join 的每路输入都较大,且长尾是空值导致的,可以将空值处理成随机值,避免聚集。
iii) Join 的每路输入都较大,且长尾是热点值导致的,可以对热点值 和非热点值分别进行处理,再合并数据。
Reduce段负责对Map梳理后的有序键值对聚合操作,长尾原因是key数据分布不均匀
c) Reduce倾斜
i) 同一个表按维度对不同列count distinct,使Map端数据膨胀;
ii) Map直接聚合时Key不均匀,导致Reduce端长尾:对热点key单独处理;
iii) 动态分区过多使小文件过多,数据分发多次:动态分区可将符合不同条件的数据放不同分区,避免多次写表,这可能产生大量小文件,可引入额外一级reduce task,相同目标分区交友同一个reduce instance写入。
iv) 多个Distinct 同时出现在一段代码中,数据多次分发膨胀 N 倍,还会把长尾现象放大 N 倍。应避免在同一段SQL代码中多次使用Distinct,可以将Distinct移到子查询中或者使用其他去重方式,例如使用GROUP BY。注意不同表join一定保证指标粒度是原始表数据粒度,代码臃肿时可落子查询。
文章评论