MapReduce

MapReduce 源码解析

MapTask 工作机制

  1. Read阶段:MapTask 通过 InputFormat 获得的 RecordReader,丛书 InputSplit 中解析出一个个 kye/value
  2. Map阶段:该节点主要是将解析出的 key/value 交给用户编写 map() 函数处理,并产生一系列新的 key/value
  3. Collect 收集阶段:在用户编写 map() 函数中,当数据处理完成后,一般会调用 OutputCollection.collect() 输出结果。在该函数内部,它会将生成的 key/value 分区(调用 partitioner),并写入一个 环形内存缓冲区中
  4. Spill阶段:即"溢写",当环形缓冲区满后,MapReduce 会将数据写到本地磁盘上,生成一个临时文件。
    1. 溢写阶段详情
      1. 利用快速排序算法对缓存区内的数据进行排序,排序方法是:先按照分区编号 Partition 进行排序,然后 按照 key 进行排序。
      2. 按照分区编号 由小到大 依次将每个分区中的数据写入任务目录下的临时文件。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作
      3. 将分区数据的元信息 写到 内存索引数据结构 SpillRecord 中,其中每个分区的元信息包括在临时文件中的便宜量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过 1MB,则将内存索引 写到文件 output/spillN.out.index 中
    2. Merge阶段
      • 当所有数据处理完成后,MapTask 对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
      • 在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并mapreduce.task.io.sort.factor(默认10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。
      • 让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。

Reduce 工作机制

  1. Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
  2. Sort阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。
  3. Reduce阶段:reduce()函数将计算结果写到HDFS上。

ReduceTask 并行度决定机制

  1. 设置 ReduceTask 并行度(个数)

    • ReduceTask的并行度同样影响整个Job的执行并发度和执行效率,但与MapTask的并发数由切片数决定不同,ReduceTask数量的决定是可以直接手动设置:
    // 默认值是1,手动设置为4
    job.setNumReduceTask(4);
    
  2. 注意事项

    1. ReduceTask = 0,表示没有 Reduce阶段,输出文件个数和 Map 个数一致
    2. ReduceTask默认值是1,所以输出文件个数为一个
    3. 如果数据分布不均匀,就有可能在Reduce阶段产生数据倾斜
    4. ReduceTask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能由1个ReduceTask
    5. 具体多少个ReduceTask,需要根据集群性能而定

Join 应用

Reduce Join

  • Map 端的主要工作:为来自不同表或文件的 key/value 对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出
  • Reduce端的主要工作:在Reduce端以连接字段作为 key 的分组已经完成。只需要记录在每一个分组当中将那些来源不同文件的记录分开,最后进行合并就ok了。

案例1

  1. 创建商品和订单合并后的 TableBean 类
public class TableBean implements Writable {

    private String id; //订单 id
    private String pid; //产品 id
    private int amount; // 产品数量
    private String pname; // 产品名称
    private String flag; //判断 order 表 还是 pd表的标志字段

    public TableBean() {
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getPid() {
        return pid;
    }

    public void setPid(String pid) {
        this.pid = pid;
    }

    public int getAmount() {
        return amount;
    }

    public void setAmount(int amount) {
        this.amount = amount;
    }

    public String getPname() {
        return pname;
    }

    public void setPname(String pname) {
        this.pname = pname;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeUTF(pid);
        out.writeInt(amount);
        out.writeUTF(pname);
        out.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readUTF();
        this.pid = in.readUTF();
        this.amount = in.readInt();
        this.pname = in.readUTF();
        this.flag = in.readUTF();
    }

    @Override
    public String toString() {
        return id + "\t" + pname + "\t" + amount;
    }
}

  1. 编写 TableMapper 类
public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {

    private String filename;
    private Text outK = new Text();
    private TableBean outV = new TableBean();

    @Override
    protected void setup(Mapper<LongWritable, Text, Text, TableBean>.Context context) throws IOException,
            InterruptedException {
        // 获取对应文件名称
        InputSplit split = context.getInputSplit();
        FileSplit fileSplit = (FileSplit) split;
        filename = fileSplit.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, TableBean>.Context context) throws IOException, InterruptedException {
        String line = value.toString();

        //判断是哪个文件,然后针对文件进行不同的操作
        if (filename.contains("order")) {
            String[] split = line.split("\t");
            //封装 outK
            outK.set(split[1]);
            outV.setId(split[0]);
            outV.setPid(split[1]);
            outV.setAmount(Integer.parseInt(split[2]));
            outV.setPname("");
            outV.setFlag("order");
        } else {
            // 商品表
            String[] split = line.split("\t");
            // 封装 outK
            outK.set(split[0]);
            // 封装 outV
            outV.setId("");
            outV.setPid(split[0]);
            outV.setAmount(0);
            outV.setPname(split[1]);
            outV.setFlag("pd");
        }

        context.write(outK, outV);
    }
}
  1. 编写 TableReducer 类
public class TableReducer extends Reducer<Text,TableBean,TableBean, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<TableBean> values,
                          Reducer<Text, TableBean, TableBean, NullWritable>.Context context) throws IOException,
            InterruptedException {
        ArrayList<TableBean> orderBeans = new ArrayList<>();

        TableBean pdBean = new TableBean();
        for (TableBean value : values) {
            //判断数据来自哪个表
            if("order".equals(value.getFlag())){
                //防止内存重定向
                //创建一个 TableBean 对象 接收 value
                TableBean tmpOrderBean = new TableBean();

                try{
                    BeanUtils.copyProperties(tmpOrderBean,value);
                }catch (IllegalAccessException e){
                    e.printStackTrace();
                } catch (InvocationTargetException e){
                    e.printStackTrace();
                }

                //将临时 TableBean 对象添加到集合 orderBeans
                orderBeans.add(tmpOrderBean);
            } else {
                try {
                    BeanUtils.copyProperties(pdBean,value);
                } catch (IllegalAccessException e){
                    e.printStackTrace();
                } catch (InvocationTargetException e){
                    e.printStackTrace();
                }
            }
        }

        for(TableBean orderBean:orderBeans){
            orderBean.setPname(pdBean.getPname());

            // 写出修改后的 orderBean 对象
            context.write(orderBean,NullWritable.get());
        }
    }
}
  1. 编写 TableDriver 类
public class TableDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(TableDriver.class);
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);

        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, new Path("D:\\尚硅谷\\Hadoop\\资料\\资料\\11_input\\inputtable2"));
        FileInputFormat.addInputPath(job, new Path("D:\\尚硅谷\\Hadoop\\资料\\资料\\11_input\\tablecache"));

        File file = new File("D:\\尚硅谷\\Hadoop\\资料\\资料\\11_input\\sry\\");
        if(file.exists()){
            FileUtils.deleteQuietly(file);
        }
        FileOutputFormat.setOutputPath(job,new Path("D:\\尚硅谷\\Hadoop\\资料\\资料\\11_input\\sry\\"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);

    }
}

总结

  • 合并的操作是在Reduce阶段完成,Reduce端的处理压力太大,Map节点的运算负载则很低,资源利用率不高,且在Reduce 阶段极易产生数据倾斜
  • 解决方案:Map 端实现数据合并

Map Join

使用场景

  • Map Join 适用于一张表十分小,一张表很大的场景

优点

  • 在Map端缓存多张表,提前处理业务逻辑,增加Map端业务,减少Reduce端数据的压力,尽可能减少数据倾斜

具体办法

  • 采用DistributedCache
  • 在Mapper的setup阶段,将文件读取到缓存集合中
  • 在Driver驱动类中加载缓存

案例2

  1. 在 MapJoinDriver 驱动类中添加缓存文件
public class TableDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(TableDriver.class);
        job.setMapperClass(TableMapper.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, new Path("D:\\尚硅谷\\Hadoop\\资料\\资料\\11_input\\inputtable2"));

        //加载缓存数据

        job.addCacheFile(new URI("file:///D:/尚硅谷/Hadoop/资料/资料/11_input/tablecache/pd.txt"));
        job.setNumReduceTasks(0);

        File file = new File("D:\\尚硅谷\\Hadoop\\资料\\资料\\11_input\\sry\\");
        if (file.exists()) {
            FileUtils.deleteQuietly(file);
        }
        FileOutputFormat.setOutputPath(job, new Path("D:\\尚硅谷\\Hadoop\\资料\\资料\\11_input\\sry\\"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);

    }
}
  1. 在 MapJoinMapper 类中的 setup 方法中读取缓存文件
public class TableMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private Map<String, String> pdMap = new HashMap<>();

    private Text text = new Text();


    @Override
    protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException,
            InterruptedException {
        URI[] cacheFiles = context.getCacheFiles();
        Path path = new Path(cacheFiles[0]);

        FileSystem fs = FileSystem.get(context.getConfiguration());
        FSDataInputStream fis = fs.open(new Path(cacheFiles[0]));
        BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));
        String line;
        while (StringUtils.isNotEmpty(line = reader.readLine())) {
            String[] split = line.split("\t");
            pdMap.put(split[0], split[1]);
        }

        IOUtils.closeStream(reader);
    }

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");

        String pname = pdMap.get(fields[1]);

        text.set(fields[0] + "\t" + pname + "\t" + fields[2]);

        context.write(text, NullWritable.get());

    }
}

ETL

  • ETL,是英文 Extract-Transform-Load 的缩写,用来描述将数据从 来源端经过抽取 (Extract)、转换(Transform)、加载(Load)至目的端的过程

案例3

  1. 编写 WebLogMapper 类
public class WeblogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        // 1 获取 1 行数据
        String line = value.toString();

        // 2 解析日期
        boolean res = parseLog(line, context);

        // 3 日志不合法退出
        if (!res) {
            return;
        }
        context.write(value, NullWritable.get());
    }

    private boolean parseLog(String line, Context context) {
        // 1 获取
        String[] fields = line.split(" ");

        // 2 日志长度大于 11 的为合法
        if (fields.length > 11) {
            return true;
        } else {
            return false;
        }
    }
}
  1. 编写 WebLogDriver 类
public class WebLogDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        args = new String[]{"D:\\尚硅谷\\Hadoop\\资料\\资料\\11_input\\inputlog", "D:\\尚硅谷\\Hadoop\\资料\\资料\\11_input\\inputlog\\out1"};

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(LogDriver.class);

        job.setMapperClass(WeblogMapper.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        job.setNumReduceTasks(0);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

MapReduce 开发总结

输入数据接口:InputFormat

  1. 默认使用的实现类是:TextInputFormat
  2. TextInputFormat的功能逻辑是:一次读一行文本,然后将该行的起始偏移量作为key,行内容作为value返回
  3. CombineTextInputFormat可以把多个小文件合并成一个切片处理,提高处理效率。

逻辑处理接口:Mapper

  1. 用户根据业务需求实现其中三个方法:map() setup() cleanup ()

Partitioner 分区

  1. 有默认实现 HashPartitioner,逻辑是根据key的哈希值和numReduces来返回一个分区号;
    • key.hashCode()&Integer.MAXVALUE % numReduces
  2. 如果业务上有特别的需求,可以自定义分区。

Comparable 排序

  1. 当我们用自定义的对象作为key来输出时,就必须要实现WritableComparable接口,重写其中的compareTo()方法。
  2. 部分排序:对最终输出的每一个文件进行内部排序。
  3. 全排序:对所有数据进行排序,通常只有一个Reduce。
  4. 二次排序:排序的条件有两个。

Combiner 合并

  • Combiner合并可以提高程序执行效率,减少IO传输。但是使用时必须不能影响原有的业务处理结果。

逻辑处理接口:Reducer

  • 用户根据业务需求实现其中三个方法:reduce() setup() cleanup ()

输出数据接口:OutputFormat

  • 默认实现类是TextOutputFormat,功能逻辑是:将每一个KV对,向目标文本文件输出一行。
  • 用户还可以自定义OutputFormat。

Hadoop 数据压缩

概述

  1. 压缩的好处和坏处
    • 压缩的优先:以减少磁盘IO,减少磁盘存储空间
    • 压缩的缺点:增加CPU开销
  2. 压缩原则
    • 运算密集型的Job,少用压缩
    • IO密集型的Job,多用压缩

MR支持的压缩编码

  1. 压缩算法对比介绍
压缩格式Hadoop自带算法文件扩展名是否可切片换成压缩格式后,原来的程序是否需要修改
DEFLATEDEFLATE.deflate和文本处理一样,不需要修改
GzipDEFLATE.gz和文本处理一样,不需要修改
bzip2bzip2.bz2和文本处理一样,不需要修改
LZOLZO.lzo需要建索引,还需要指定输入格式
SnappySnappy.snappy和文本处理一样,不需要修改
  1. 压缩性能的比较
压缩算法原始文件大小压缩文件大小压缩速度解压速度
gzip8.3GB1.8GB17.5MB/s58MB/s
bzip28.3GB1.1GB2.4MB/s9.5MB/s
LZO8.3GB2.9GB49.3MB/s74.6MB/s

压缩方式选择

  • 压缩方式选择时重点考虑:压缩/解压缩速度、压缩率(压缩后存储大小)、压缩后是否可以支持切片

Gzip压缩

  • 优点:压缩率比较高
  • 缺点:不支持 SPlit:压缩/解压缩速度一般

Bzip2压缩

  • 压缩率高,支持 SPlit
  • 压缩/解压缩速度慢

Lzo压缩

  • 优点:压缩/解压速度比较快,支持Split
  • 缺点:压缩率一般,想支持切片需要额外创建索引

Snappy压缩

  • 优点:压缩和解压缩速度快
  • 缺点:不支持 Split,压缩率一般

压缩位置选择

  • 压缩可以在MapReduce作用的任意阶段启用
  • 输入端采用压缩
    • Hadoop自动检查文件扩展名,如果扩展名能够匹配,就会用恰当的编解码方式对文件进行压缩和解压
    • 数据量小于快打小,重点考虑压缩和解压缩速度比较快的LZO/Snappy
    • 数据量非常大,重点考虑支持切片的 Bzip2 和 LZO
  • Mapper端输出采用压缩
    • 为了减少 MapTask 和 ReducerTask 之间的网络IO。重点考虑压缩和解压缩快的 LZO、Snappy
  • Reducer输出采用压缩
    • 如果数据永久保存,考虑压缩率比较高的 Bzip2 和 Gzip。如果作为下一个 MapReduce 输入,需要考虑数据量和是否支持切片

压缩参数配置

  1. Hadoop引入了编码/解码器
压缩格式对应的编码/解码器
DEFLATEorg.apache.hadoop.io.compress.DefaultCodec
gziporg.apache.hadoop.io.compress.GzipCodec
bzip2org.apache.hadoop.io.compress.BZip2Codec
LZOcom.hadoop.compression.lzo.LzopCodec
Snappyorg.apache.hadoop.io.compress.SnappyCodec
  1. 要在Hadoop中启用压缩,可以配置如下参数
参数默认值阶段建议
io.compression.codecs (在core-site.xml中配置)无,这个需要在命令行输入hadoop checknative查看输入压缩Hadoop使用文件扩展名判断是否支持某种编解码器
mapreduce.map.output.compress(在mapred-site.xml中配置falsemapper输出mapper输出
mapreduce.map.output.compress.codec(在mapred-site.xml中配置)org.apache.hadoop.io.compress.DefaultCodecmapper输出企业多使用LZO或Snappy编解码器在此阶段压缩数据
mapreduce.output.fileoutputformat.compress(在mapred-site.xml中配置)falsereducer输出这个参数设为true启用压缩
mapreduce.output.fileoutputformat.compress.codec(在mapred-site.xml中配置)org.apache.hadoop.io.compress.DefaultCodecorg.apache.hadoop.io.compress.DefaultCodec使用标准工具或者编解码器,如gzip和bzip2

压缩实操案例