优化

HQL语法优化之分组聚合优化

优化说明

  • Hive中未经优化的分组聚合,是通过一个MapReduce Job实现的。Map端负责读取数据,并按照分组字段分区,通过Shuffle,将数据发往Reduce端,各组数据在Reduce端完成最终的聚合运算。

  • Hive对分组聚合的优化主要围绕着减少Shuffle数据量进行,具体做法是map-side聚合。所谓map-side聚合,就是在map端维护一个hash table,利用其完成部分的聚合,然后将部分聚合的结果,按照分组字段分区,发送至reduce端,完成最终的聚合。map-side聚合能有效减少shuffle的数据量,提高分组聚合运算的效率。

  • map-side 聚合相关的参数如下

    --启用map-side聚合
    set hive.map.aggr=true;
    
    --用于检测源表数据是否适合进行map-side聚合。检测的方法是:先对若干条数据进行map-side聚合,若聚合后的条数和聚合前的条数比值小于该值,则认为该表适合进行map-side聚合;否则,认为该表数据不适合进行map-side聚合,后续数据便不再进行map-side聚合。
    set hive.map.aggr.hash.min.reduction=0.5;
    
    --用于检测源表是否适合map-side聚合的条数。
    set hive.groupby.mapaggr.checkinterval=100000;
    
    --map-side聚合所用的hash table,占用map task堆内存的最大比例,若超出该值,则会对hash table进行一次flush。
    set hive.map.aggr.hash.force.flush.memory.threshold=0.9;
    

HQL语法优化之Join优化

  • Join算法概述
  • Hive拥有多种join算法,包括Common Join,Map Join,Bucket Map Join,Sort Merge Buckt Map Join等,下面对每种join算法做简要说明:
    • Common Join
    • Common Join是Hive中最稳定的join算法,其通过一个MapReduce Job完成一个join操作。Map端负责读取join操作所需表的数据,并按照关联字段进行分区,通过Shuffle,将其发送到Reduce端,相同key的数据在Reduce端完成最终的Join操作。
    • Map Join
    • Map Join算法可以通过两个只有map阶段的Job完成一个join操作。其适用场景为大表join小表。若某join操作满足要求,则第一个Job会读取小表数据,将其制作为hash table,并上传至Hadoop分布式缓存(本质上是上传至HDFS)。第二个Job会先从分布式缓存中读取小表数据,并缓存在Map Task的内存中,然后扫描大表数据,这样在map端即可完成关联操作。
    • Bucket Map Join
    • Bucket Map Join是对Map Join算法的改进,其打破了Map Join只适用于大表join小表的限制,可用于大表join大表的场景
    • Bucket Map Join的核心思想是:若能保证参与join的表均为分桶表,且关联字段为分桶字段,且其中一张表的分桶数量是另外一张表分桶数量的整数倍,就能保证参与join的两张表的分桶之间具有明确的关联关系,所以就可以在两表的分桶间进行Map Join操作了。这样一来,第二个Job的Map端就无需再缓存小表的全表数据了,而只需缓存其所需的分桶即可
    • Sort Merge Bucket Map Join
    • Sort Merge Bucket Map Join(简称SMB Map Join)基于Bucket Map Join。SMB Map Join要求,参与join的表均为分桶表,且需保证分桶内的数据是有序的,且分桶字段、排序字段和关联字段为相同字段,且其中一张表的分桶数量是另外一张表分桶数量的整数倍。
    • Hash Join和Sort Merge Join均为关系型数据库中常见的Join实现算法。Hash Join的原理相对简单,就是对参与join的一张表构建hash table,然后扫描另外一张表,然后进行逐行匹配。
    • Hive中的SMB Map Join就是对两个分桶的数据按照上述思路进行Join操作。可以看出,SMB Map Join与Bucket Map Join相比,在进行Join操作时,Map端是无需对整个Bucket构建hash table,也无需在Map端缓存整个Bucket数据的,每个Mapper只需按顺序逐个key读取两个分桶的数据进行join即可。

优化说明

  • Map Join有两种触发方式,一种是用户在SQL语句中增加hint提示,另外一种是Hive优化器根据参与join表的数据量大小,自动触发。

    1. Hint提示

      • 用户可通过如下方式,指定通过map join算法,并且ta将作为map join中的小表。这种方式已经过时,不推荐使用。
    2. 自动触发

      • Hive在编译SQL语句阶段,起初所有的join操作均采用Common Join算法实现。
      • 之后在物理优化阶段,Hive会根据每个Common Join任务所需表的大小判断该Common Join任务是否能够转换为Map Join任务,若满足要求,便将Common Join任务自动转换为Map Join任务。
      • 但有些Common Join任务所需的表大小,在SQL的编译阶段是未知的(例如对子查询进行join操作),所以这种Common Join任务是否能转换成Map Join任务在编译阶是无法确定的。
      • 针对这种情况,Hive会在编译阶段生成一个条件任务(Conditional Task),其下会包含一个计划列表,计划列表中包含转换后的Map Join任务以及原有的Common Join任务。最终具体采用哪个计划,是在运行时决定的。
    3. 参数

    --启动Map Join自动转换
    set hive.auto.convert.join=true;
    
    --一个Common Join operator转为Map Join operator的判断条件,若该Common Join相关的表中,存在n-1张表的已知大小总和<=该值,则生成一个Map Join计划,此时可能存在多种n-1张表的组合均满足该条件,则hive会为每种满足条件的组合均生成一个Map Join计划,同时还会保留原有的Common Join计划作为后备(back up)计划,实际运行时,优先执行Map Join计划,若不能执行成功,则启动Common Join后备计划。
    set hive.mapjoin.smalltable.filesize=250000;
    
    --开启无条件转Map Join
    set hive.auto.convert.join.noconditionaltask=true;
    
    --无条件转Map Join时的小表之和阈值,若一个Common Join operator相关的表中,存在n-1张表的大小总和<=该值,此时hive便不会再为每种n-1张表的组合均生成Map Join计划,同时也不会保留Common Join作为后备计划。而是只生成一个最优的Map Join计划。
    set hive.auto.convert.join.noconditionaltask.size=10000000;
    

HQL语法优化之数据倾斜

  • 数据倾斜概述
    • 数据倾斜问题,通常是指参与计算的数据分布不均,即某个key或者某些key的数据量远超其他key,导致在shuffle阶段,大量相同key的数据被发往同一个Reduce,进而导致该Reduce所需的时间远超其他Reduce,成为整个任务的瓶颈。
    • Hive中的数据倾斜常出现在分组聚合和join操作的场景中,下面分别介绍在上述两种场景下的优化思路。

分组聚合导致的数据倾斜

  • 如果group by分组字段的值分布不均,就可能导致大量相同的key进入同一Reduce,从而导致数据倾斜问题。
  • 由分组聚合导致的数据倾斜问题,有以下两种解决思路:
1.Map-Side聚合
  • 开启Map-Side聚合后,数据会现在Map端完成部分聚合工作。这样一来即便原始数据是倾斜的,经过Map端的初步聚合后,发往Reduce的数据也就不再倾斜了。最佳状态下,Map-端聚合能完全屏蔽数据倾斜问题。
2. Skew-GroupBy优化
  • Skew-GroupBy的原理是启动两个MR任务,第一个MR按照随机数分区,将数据分散发送到Reduce,完成部分聚合,第二个MR按照分组字段分区,完成最终聚合。
--启用分组聚合数据倾斜优化
set hive.groupby.skewindata=true;

Join导致的数据倾斜

  • 如果关联字段的值分布不均,就可能导致大量相同的key进入同一Reduce,从而导致数据倾斜问题。
  • 由join导致的数据倾斜问题,有如下三种解决方案:
map join
  • 使用map join算法,join操作仅在map端就能完成,没有shuffle操作,没有reduce阶段,自然不会产生reduce端的数据倾斜。该方案适用于大表join小表时发生数据倾斜的场景。
skew join
  • skew join的原理是,为倾斜的大key单独启动一个map join任务进行计算,其余key进行正常的common join。
--启用skew join优化
set hive.optimize.skewjoin=true;
--触发skew join的阈值,若某个key的行数超过该参数值,则触发
set hive.skewjoin.key=100000;

  • 这种方案对参与join的源表大小没有要求,但是对两表中倾斜的key的数据量有要求,要求一张表中的倾斜key的数据量比较小(方便走mapjoin)。
调整SQL语句
  • 若参与join的两表均为大表,其中一张表的数据是倾斜的,此时也可通过以下方式对SQL语句进行相应的调整。
select
    *
from(
    select --打散操作
        concat(id,'_',cast(rand()*2 as int)) id,
        value
    from A
)ta
join(
    select --扩容操作
        concat(id,'_',0) id,
        value
    from B
    union all
    select
        concat(id,'_',1) id,
        value
    from B
)tb
on ta.id=tb.id;

HQL语法优化之任务并行度

  • 优化说明

    • 对于一个分布式的计算任务而言,设置一个合适的并行度十分重要。Hive的计算任务由MapReduce完成,故并行度的调整需要分为Map端和Reduce端。

Map端并行度

  • Map端的并行度,也就是Map的个数。是由输入文件的切片数决定的。一般情况下,Map端的并行度无需手动调整。
  • 以下特殊情况可考虑调整map端并行度:
  1. 查询的表中存在大量小文件

    • 按照Hadoop默认的切片策略,一个小文件会单独启动一个map task负责计算。若查询的表中存在大量小文件,则会启动大量map task,造成计算资源的浪费。这种情况下,可以使用Hive提供的CombineHiveInputFormat,多个小文件合并为一个切片,从而控制map task个数。相关参数如下:

      set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
      
  2. map端有复杂的查询逻辑

    • 若SQL语句中有正则替换、json解析等复杂耗时的查询逻辑时,map端的计算会相对慢一些。若想加快计算速度,在计算资源充足的情况下,可考虑增大map端的并行度,令map task多一些,每个map task计算的数据少一些。相关参数如下:

      --一个切片的最大值
      set mapreduce.input.fileinputformat.split.maxsize=256000000;
      

Reduce端并行度

  • Reduce端的并行度,也就是Reduce个数。相对来说,更需要关注。Reduce端的并行度,可由用户自己指定,也可由Hive自行根据该MR Job输入的文件大小进行估算。

    --指定Reduce端并行度,默认值为-1,表示用户未指定
    set mapreduce.job.reduces;
    --Reduce端并行度最大值
    set hive.exec.reducers.max;
    --单个Reduce Task计算的数据量,用于估算Reduce并行度
    set hive.exec.reducers.bytes.per.reducer;
    
  • 若指定参数mapreduce.job.reduces的值为一个非负整数,则Reduce并行度为指定值。否则,Hive自行估算Reduce并行度,估算逻辑如下:

  • 假设Job输入的文件大小为totalInputBytes

  • 参数hive.exec.reducers.bytes.per.reducer的值为bytesPerReducer。

  • 参数hive.exec.reducers.max的值为maxReducers。

  • 则Reduce端的并行度为:

    $min(ceil(\frac {totalInputBytes} {bytesPerReducer}),maxReducers)$

  • 根据上述描述,可以看出,Hive自行估算Reduce并行度时,是以整个MR Job输入的文件大小作为依据的。因此,在某些情况下其估计的并行度很可能并不准确,此时就需要用户根据实际情况来指定Reduce并行度了

HQL语法优化之小文件合并

优化说明

  • 小文件合并优化,分为两个方面,分别是Map端输入的小文件合并,和Reduce端输出的小文件合并。

Map端输入文件合并

  • 合并Map端输入的小文件,是指将多个小文件划分到一个切片中,进而由一个Map Task去处理。目的是防止为单个小文件启动一个Map Task,浪费计算资源。

  • 相关参数为:

    --可将多个小文件切片,合并为一个切片,进而由一个map任务处理
    set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; 
    
  • Reduce输出文件合并

    • 合并Reduce端输出的小文件,是指将多个小文件合并成大文件。目的是减少HDFS小文件数量。其原理是根据计算任务输出文件的平均大小进行判断,若符合条件,则单独启动一个额外的任务进行合并。
    --开启合并map only任务输出的小文件
    set hive.merge.mapfiles=true;
    
    --开启合并map reduce任务输出的小文件
    set hive.merge.mapredfiles=true;
    
    --合并后的文件大小
    set hive.merge.size.per.task=256000000;
    
    --触发小文件合并任务的阈值,若某计算任务输出的文件平均大小低于该值,则触发合并
    set hive.merge.smallfiles.avgsize=16000000;
    
    

其他优化

CBO优化

优化说明

  • CBO是指Cost based Optimizer,即基于计算成本的优化。
  • 在Hive中,计算成本模型考虑到了:数据的行数、CPU、本地IO、HDFS IO、网络IO等方面。Hive会计算同一SQL语句的不同执行计划的计算成本,并选出成本最低的执行计划。目前CBO在hive的MR引擎下主要用于join的优化,例如多表join的join顺序。
--是否启用cbo优化 
set hive.cbo.enable=true;

谓词下推

优化说明

  • 谓词下推(predicate pushdown)是指,尽量将过滤操作前移,以减少后续计算步骤的数据量。
--是否启动谓词下推(predicate pushdown)优化
set hive.optimize.ppd = true;

矢量化查询

  • 矢量化计算是一种并行计算方法,它将重复的程序指令编译成单个矢量(多个数据集的组合),然后同时执行并最大化计算机速度

  • 矢量化计算可以显著提高程序的性能,因为它允许处理器同时处理多个数据元素,而不是逐个处理。

  • 相关参数如下:

    set hive.vectorized.execution.enabled=true;
    

Fetch抓取

  • Fetch抓取是指,Hive中对某些情况的查询可以不必使用MapReduce计算。
--是否在特定场景转换为fetch 任务
--设置为none表示不转换
--设置为minimal表示支持select *,分区字段过滤,Limit等
--设置为more表示支持select 任意字段,包括函数,过滤,和limit等
set hive.fetch.task.conversion=more;

本地模式

  • Hive可以通过本地模式在单台机器上处理所有的任务。对于小数据集,执行时间可以明显被缩短。
--开启自动转换为本地模式
set hive.exec.mode.local.auto=true;  

--设置local MapReduce的最大输入数据量,当输入数据量小于这个值时采用local  MapReduce的方式,默认为134217728,即128M
set hive.exec.mode.local.auto.inputbytes.max=50000000;

--设置local MapReduce的最大输入文件个数,当输入文件个数小于这个值时采用local MapReduce的方式,默认为4
set hive.exec.mode.local.auto.input.files.max=10;

--关闭本地模式
set hive.exec.mode.local.auto=false;

--开启本地模式
set hive.exec.mode.local.auto=true;

并行执行

  • Hive会将一个SQL语句转化成一个或者多个Stage,每个Stage对应一个MR Job。默认情况下,Hive同时只会执行一个Stage。
  • 但是某SQL语句可能会包含多个Stage,但这多个Stage可能并非完全互相依赖,也就是说有些Stage是可以并行执行的。此处提到的并行执行就是指这些Stage的并行执行。
  • 相关参数如下:
--启用并行执行优化
set hive.exec.parallel=true;       
    
--同一个sql允许最大并行度,默认为8
set hive.exec.parallel.thread.number=8; 

严格模式

  • Hive可以通过设置某些参数防止危险操作:
  1. 分区表不使用分区过滤
    • 将hive.strict.checks.no.partition.filter设置为true时,对于分区表,除非where语句中含有分区字段过滤条件来限制范围,否则不允许执行。
  2. 使用order by没有limit过滤
    • hive.strict.checks.orderby.no.limit设置为true时,对于使用了order by语句的查询,要求必须使用limit语句。
  3. 笛卡尔积
    • 将hive.strict.checks.cartesian.product设置为true时,会限制笛卡尔积的查询。