数仓运维:数据倾斜问题解决思路

1. 什么是数据倾斜

  • 数据倾斜,一般是指 mapreduce 程序执行时,reduce 节点大部分执行完毕,但是有一个或者几个 reduce 节点运行很慢,导致整个程序的处理时间很长;

MapReduce 过程中,Reduce 阶段输入 ReduceTask 的数据流是<key, {value list}>形式,用户可以自定义 reduce()方法进行逻辑处理,最终以<key, value>的形式输出。

  • 简单来说数据倾斜就是数据的 key 的分化严重不均,造成一部分数据很多,一部分数据很少的局面。

2. 数据倾斜有哪些表现

  • 数据倾斜会发生在数据开发的各个环节中,在不同引擎表现不一样。

2.1. Hadoop/Hive 99.99% 问题

  • 任务进度长时间维持在 99.99%,如果详细的看日志或者和监控界面的话,会发现:
    • 有一个多几个 reduce 卡住
    • 各种 container 报错 OOM
    • 读写的数据量极大,至少远远超过其它正常的 reduce
    • 伴随着数据倾斜,会出现任务被 kill 等各种诡异的表现

2.2. Spark OOM 问题

  • Spark 中的数据倾斜也很常见,Spark 中一个 stage 的执行时间受限于最后那个执行完的 task,因此运行缓慢的任务,会拖累整个程序的运行速度;
  • 过多的数据在同一个 task 中执行,将会把 executor 撑爆,造成 OOM,程序终止运行。
  • 使用 Window、GroupBy、Distinct 等聚合函数时,频繁出现反压,消费速度很慢,个别的task会出现OOM,调大资源也无济于事。

3. 那些场景会出现数据倾斜

3.1. 业务场景

  • 正常的数据分布理论上都是倾斜的,就是我们所说的’二八原理’:
    • 80% 的用户只使用 20% 的功能;
    • 20% 的用户贡献了 80% 的访问量;
    • 80% 的商品贡献了 20% 的销售业绩
  • 例如:
    • 在订单表中,北京和上海两个地区的订单数量,比其他地区高几个数量级,那么进行聚合的时候就会出现数据热点;
    • 在商品表中,少数顶流/主营品类的商品数量很多,而周边商品的种类则比较少,在统计的时候也会出现数据热点。

3.2. 技术场景

  • 大表 join 小表;
  • group by 维度过小,某个值的数量过多;
  • count、distinct 某个值过多;

4. 为什么出现数据倾斜

  • 在做数据运算的时候(如:countdistinctgroup byjoin),会触发 Shuffle 动作,一旦触发所有相同 key 的值,就会拉到一个或几个节点上,发生单点过热问题。

  • MapReduce 数据倾斜,是因为某一个 key 的数据条数比其他 key 多很多(有时是百倍或者千倍之多),这条 key 所在的 reduce 节点所处理的数据量比其他节点就大很多,从而导致某几个节点迟迟运行不完;

Group by 倾斜

  • group by 引起的倾斜主要是输入数据行按照 「group by 列分布不均匀」 引起的。
  • 比如,假设按照供应商对销售明细事实表来统计订单数,那么部分大供应商的订单量显然非常多,而多数供应商的订单量就一般,由于 group by 的时候是按照供应商的 ID 分发到每个 Reduce Task ,那么此时分配到大供应商的 Reduce Task 就分配了更多的订单,从而导致数据倾斜。

Join 倾斜

  • join 造成的倾斜,常见情况是不能做 map join 的两个表(能做map join的话基本上可以避免倾斜),其中一个是行为表,另一个应该是属性表。

  • 比如,我们有两个表:
    • 一个用户属性表 users,还有一个用户对商品的操作行为表日志表 logs。
  • 现在需要将行为表关联用户表:
    • select * from logs a join users b on a.user_id = b.user_id;
  • 其中 logs 表里面会有一个特殊用户 user_id = 0,代表未登录用户,假如这种用户占了相当的比例,那么个别 reduce 会收到比其他 reduce 多得多的数据,因为它要接收所有 user_id = 0 的记录进行处理,使得其处理效果会非常差,其他 reduce 都跑完很久了它还在运行。

4.3. 连接 key 设计缺陷

  • 示例:
    • 对于商品 item_id 的编码,除了本身的 id 序列,还人为的把 item 的类型、也作为编码放在最后两位;
      • 例如:类型 1(电子产品)的编码是 01,类型 2(家居产品)的编码是 02,
    • 由于类型 1 是主要商品类,将会造成以 01 为结尾的商品整体倾斜, 这时,如果 reduce 的数量恰好是 100 的整数倍,会造成 partitioner 把 01 结尾的 item_id 都 hash 到同一个 reducer,从而引爆问题。

4.4. 连接 key 为 NULL 空值

  • 示例:
    • 如日志中,常会有信息丢失的问题,比如日志中的 user_id,如果取其中的 user_id 和 用户表中的 user_id 关联,会碰到数据倾斜的问题。

4.5. 连接 key 为不同数据类型

  • 示例:
    • 用户表中 user_id 字段为 int,log 表中 user_id 字段既有 string 类型也有 int 类型;
    • 当按照 user_id 进行两个表的 Join 操作时,默认的 Hash 操作会按 int 型的 id 来进行分配,这样会导致所有 string 类型 id 的记录都分配到一个 Reducer 中。

5. 如何解决数据倾斜

5.1. 解决原理

  • Hive 的执行是分阶段的,map 处理数据量的差异取决于上一个 stage 的 reduce 输出,所以如何将数据均匀的分配到各个 reduce 中,就是解决数据倾斜的根本所在。

  • 解决数据倾斜的几个思路:

    • 业务上:避免热点key的设计或者打散热点key,例如可以把北京和上海分成地区,然后单独汇总。

    • 技术上:在热点出现时,需要调整方案避免直接进行聚合,可以借助框架本身的能力,例如进行mapside-join。

    • 参数上:无论是Hadoop、Spark还是Flink都提供了大量的参数可以调整。

5.2. 事前对连接 key 进行预处理

  • 大多的数据倾斜都是跟业务无关的数据导致的,在读表之后第一件事就是尽可能的过滤不必要的数据,理清业务含义先过滤脏数据和业务不相关的数据。

  • 过滤 NULL 值:
    • 两个表连接 key 存在大量的 NULL 数据没有过滤,参与进了 join 的执行。
    • 其中分为主动产生与非主动产生的:
      • 主动产生的空值与无效值,是值本来连接字段就存在大量空值与无效值。
      • 另一种是非主动产生的空值或无效值,比如之前的逻辑为了合并数据,用了 union all 操作把某些列设置为 null,而后面的逻辑采用该列作为 join 条件导致的数据倾斜。
  • 过滤 “ 脏数据 ”:
    • 两个表连接条件的字段数据类型不一致,不满足原有的数据类型,经过内在的逻辑处理,往往会得到与上边 NULL 相同的结果,因此在连接前,需要把连接 key 统一好数据类型。
  • 事前对连接 key 进行预处理最好的办法是,写 SQL 操作之前,先简单看下表的数据量级,再看 key 的空值与无效值的统计,表与表之间的关联 key,先增加空值与无效值的过滤处理;

  • 预处理方法:
    • 直接去掉空值、异常值;
    • 在大表关联大表出现的异常值使用 rand 函数进行随机填充。

5.3. map join

  • MapJoin 介绍
    • MapJoin,是在 Map 阶段进行表之间的 join 连接,而不需要进入到 Reduce 阶段才进行 join 连接,这样就节省了在 Shuffle 阶段时要进行的大量数据传输,从而起到了优化作业的作用;
    • MapJoin 的 Join 操作在 Map 阶段完成,如果需要的数据在 Map 的过程中可以访问到则不再需要 Reduce。
    • MapJoin 把小表全部加载到内存,在 map 端进行join,避免 reducer 处理。
    • 用法示例:
      • select /*+ mapjoin(t)*/ f.a,f.b from A f join B t on f.a=t.a;
  • MapJoin 原理
    • 通常情况下,要连接的各个表里面的数据,会分布在不同的 Map 中进行处理,即同一个 Key 对应的 Value 可能存在不同的 Map 中,这样就必须等到 Reduce 中去连接。
    • 要使 MapJoin 能够顺利进行,那就必须满足这样的条件:除了大表的数据分布在不同的 Map 中外,小表的数据必须在每个 Map 中有完整的拷贝。

  • MapJoin分为两个阶段:
    • 通过 MapReduce Local Task,将小表读入内存,生成 HashTableFiles 上传至 Distributed Cache 中,这里会 HashTableFiles 进行压缩;
    • MapReduce Job 在 Map 阶段,每个 MapperDistributed Cache 读取 HashTableFiles 到内存中,顺序扫描大表,在 Map 阶段直接进行 Join,将数据传递给下一个MapReduce 任务。
  • 示例:

      select c.channel_name,count(t.requesturl) PV
       from ods.cms_channel c
       join
       (select host,requesturl from  dms.tracklog_5min where day='20151111' ) t
       on c.channel_name=t.host
       group by c.channel_name
       order by c.channel_name;
    
    • 上以为小表 join 大表的操作,可以使用 mapjoin 把小表放到内存中处理,语法很简单只需要增加 /*+ MAPJOIN(pt) */,把需要分发的表放入到内存中。
      select /*+ MAPJOIN(c) */
      c.channel_name,count(t.requesturl) PV
       from ods.cms_channel c
       join
       (select host,requesturl from  dms.tracklog_5min where day='20151111' ) t
       on c.channel_name=t.host
       group by c.channel_name
       order by c.channel_name;
    
  • mapjoin 参数
    • hive.auto.convert.join
      • 如果是小表,自动选择 Mapjoin;
      • 小表,默认25m的表是小表;
      • 该参数为 true 时,Hive 自动对左边的表统计量,如果是小表就加入内存,即对小表使用Map join;
      • 修改方法:
        • set hive.auto.convert.join = true; # 默认为false
    • hive.mapjoin.smalltable.filesize
      • 大表小表的阀值;
      • 默认值是 25mb,即:hive.mapjoin.smalltable.filesize=25000000
      • 修改方法:
        • set hive.mapjoin.smalltable.filesize=xxxx;
    • hive.mapjoin.followby.gby.localtask.max.memory.usage
      • 规定 map join 做group by 操作时,可以使用多大的内存来存储数据;
      • 如果数据太大,则不会保存在内存里
      • 默认值:0.55;
      • 修改方法:
        • set hive.mapjoin.followby.gby.localtask.max.memory.usage;
    • ` hive.mapjoin.localtask.max.memory.usage`
      • 本地任务可以使用内存的百分比
      • 默认值: 0.90;
      • 修改方法:
        • set hive.mapjoin.localtask.max.memory.usage;
  • 注意:使用 mapjoin 时,一次性加载到内存中的表最多是 8 张;
    • 如果超过 8 张小表,应该嵌套一层子循环,将多余的表在外层中写入 mapjion 里面;
  • 对于 join,在判断小表不大于 1G 的情况下,使用 map join;

  • MapJoin 缺陷:
    • 阈值问题,导致该方案并不够通用。
  • skew join
    • 针对前面说的 Join 倾斜问题,skew join 方案 user_id = 0 的特殊值先不在 reduce 端计算,而是先写入 hdfs,然后启动一轮 map join 专门做这个特殊值的计算,以能提高计算这部分值的处理速度。
    • skewjoin 参数:
      • hive.optimize.skewjoin
        • 告知 hive 这个 join 是 个 skew join;
        • 设置方法:
          • set hive.optimize.skewjoin=true;
      • hive.skewjoin.key
        • 告诉 hive 如何判断特殊值,
        • 根据 hive.skewjoin.key 设置的数量 hive 可以知道,比如默认值是 100000,那么超过 100000 条记录的值就是特殊值。
        • 设置方法:
          • set hive.skewjoin.key=100000;

5.5. 解决 Group By 导致的数据倾斜

  • 对于 group by 或 distinct,启用 skewindata;
  • hive.groupby.skewindata
    • 有数据倾斜的时候进行负载均衡(默认是 false);
    • 设置方法:
      • set hive.groupby.skewindata=true
  • hive.map.aggr
    • 是否在 Map 端进行聚合,默认为True;
    • 设置方法:
      • set hive.map.aggr = true
  • hive.groupby.mapaggr.checkinterval
    • 配置在 Map 端进行聚合操作的条目数目;
    • 设置方法:
      • set hive.groupby.mapaggr.checkinterval = 100000
  • 这个方案的核心实现思路就是进行两阶段聚合,Hive 在数据倾斜的时候会进行负载均衡,生成的查询计划会有两个 MapReduce Job。
    • 第一个 MapReduce Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个Reduce 做部分聚合操作并输出结果。这样处理的结果是相同的 GroupBy Key 有可能被分布到不同的 Reduce 中,从而达到负载均衡的目的;
    • 第二个 MapReduce Job 再根据预处理的数据结果,按照 GroupBy Key 分布到 Reduce 中(这过程可以保证相同的GroupBy Key被分布到同一个Reduce中),最后完成最终的聚合操作。

5.5. 特殊值分开处理

  • 例如:表 a 跟表 b 进行 join 操作,有数据倾斜的表是表a ,表 a 里边的倾斜的头部 key 非常少,只有 key = 1 的数据特别多,可以将 a 的数据分为两部分 a1 跟 a2,a1 中只包含 key = 1 的倾斜数据,a2 不包含数据倾斜的数据。
  • 采取 union all + mapjoin 的方式分而治之,把 key = a 单独抽出来作为一组用 mapjoin ,剩下的另起炉灶直接用 join,最后再把各自计算得出来的结果用 union all 拼接起来。

  • SQL 示例:

      select
          *
      from
          (
              select * from logs where user_id = 0
          )
          a
      join
          (
              select * from users where user_id = 0
          )
          b
      on
          a.user_id = b.user_id
        
      union all
        
      select * from logs a join users b on a.user_id <> 0 and a.user_id = b.user_id;
    

5.6. 随机数分配法

  • 数据倾斜的表头部 key 都很多,而且都是业务需要的 key,既不能过滤、也不好单拆开计算,可以考虑在头部 key 添加随机数进行打散,处理后再去掉随机数合并。
  • 最常见的倾斜,是因为数据分布本身就具有长尾性质,比如我们将日志表和商品表关联:
      select * from logs a join items b on a.item_id = b.item_id;
    
  • 这个时候,分配到热门商品的 reducer 就会很慢,因为热门商品的行为日志肯定是最多的,而且我们也很难像上面处理特殊 user 那样去处理 item。这个时候就会用到加随机数的方法,也就是在join的时候增加一个随机数,随机数的取值范围n相当于将 item 给分散到 n 个 reducer。
  • SQL 示例:
      select
          a.*,
          b.*
      from
          (
              select *, cast(rand() * 10 as int) as r_id from logs
          )
          a
      join
          (
              select *, r_id from items lateral view explode(range_list(1, 10)) rl as r_id
          )
          b
      on
          a.item_id = b.item_id
          and a.r_id = b.r_id
    
  • 上面的 SQL,对行为表的每条记录生成一个 1-10 的随机整数,对于 item 属性表,每个 item 生成 10 条记录,随机 key 分别也是 1-10,这样就能保证行为表关联上属性表。
  • 其中 range_list(1,10) 代表用 udf 实现的一个返回 1-10 整数序列的方法。
  • 这个做法是一个解决 join 倾斜比较根本性的通用思路,就是如何用随机数将 key 进行分散,具体可以根据具体的业务场景做实现上的简化或变化。

5.7. 解决小表过大无法 map join 问题

  • 当小表数据过大(比如大于 1G),或者小表条数过多(比如数百万行),就不再适用于之前的 map join 方案;
  • 如果小表很大,map join 会出现 bug 或异常,这时就需要特别的处理;
  • 例如:
    • users 表有 600w+ 的记录,需要和 log 数据进行 join,把 users 分发到所有的 map 上是个不小的开销,而且 map join 不支持这么大的小表。
    • 如果用普通的 join,又会碰到数据倾斜的问题,因为 log 里 user_id 有上百万个,这就又回到原来 map join 问题。
  • 解决:
    • 思路:
      • 由于每天的会员 uv 不会太多,有交易的会员不会太多,有点击的会员不会太多,有佣金的会员不会太多,所以可以先抽取 log 中出现的 distinct user_id;
      • 之后将 log 有出现的 distinct user_id,再与 log 进行 join;
      • 这个思路,能解决很多场景下的数据倾斜问题。
    • SQL 示例:
        select
            /*+mapjoin(x)*/
            *
        from
            log a
        left outer join
            (
                select
                    /*+mapjoin(c)*/
                    d.*
                from
                    (
                        select distinct user_id from log
                    )
                    c
                join users d
                on
                    c.user_id = d.user_id
            )
            x on a.user_id = b.user_id;
      

5.8. 处理连接 key 为 NULL 空值

  • 解决方法:
    • 赋与空值分新的 key 值;
  • SQL 示例:
    select
    	*
    from
    	log a
    left outer join users b
    on
    	case
    		when a.user_id is null
    		then concat(‘hive’, rand())
    		else a.user_id
    	end = b.user_id;
    

5.9. 处理连接 key 为不同数据类型

  • 解决方法:
    • 把不同数据类型转换为同一类型,数字类型转换成字符串类型
  • SQL 示例:
      select
          *
      from
          users a
      left outer join logs b
      on
          a.usr_id = cast(b.user_id as string)
    

5.10. 提高 Reduce 任务的并行度

  • 将 Reduce Task 的数量变多,就可以让每个Reduce Task分配到更少的数据量,这样可以缓解数据倾斜的问题。
  • 在调用groupByKey、countByKey、reduceByKey时,传入Reduce端的并行度参数,这样在进行Shuffle操作时,会创建指定的Reduce Task,可以让每个Reduce Task分配到更少量的数据,避免了Spark作业时OOM的情况。

5.11. 其他方法

  • 压缩文件

  • Spark 参数
    • 使用map join 代替reduce join
    • 提高shuffle并行度
  • Flink 参数
    • MiniBatch设置
    • 并行度设置