Hadoop集群(三)—— WordCount(单词统计)源码解析

caroly 2020年05月19日 144次浏览

WordCount源码解析

特别数据类型介绍

『Hadoop』本身提供了一套可优化网络序列化传输的基本类型,而不直接使用『Java』内嵌的类型。这些类型都在『org.apache.hadoop.io』包中。

HadoopJavaDescription
IntWritableint整型
LongWritablelong长整型
FloatWritablefloat浮点数
DoubleWritabledouble双字节数
Textstring使用UTF-8格式存储的文本
ByteWritablebyte单字节数
BooleanWritableboolean标准布尔型
NullWritablenull当<key, value>中的key或value为空时使用

两种类型之间的相互转换:

// Java类型转为Hadoop类型
// 调用Hadoop类型的构造方法或者调用set()方法
int aInt= 1;
IntWritable bWritable= new IntWritable(aInt);
bWritable.set(aInt);
// --------------------------------------------
//Hadoop类型转为Java类型
//对于Text,需要调用toString()方法,其它类型调用get()方法。
Text text= new Text("1111");
String string= text.toString();
int bInt= bWritable.get();

流程分析

流程解析模块推荐这篇:逸情公子


map-reduce流程图

1、在客户端启动一个作业。

2、向 JobTracker 请求一个 Job ID。

3、将运行作业所需要的资源文件复制到 HDFS 上,包括 MapReduce 程序打包的 JAR 文件、配置文件和客户端计算所得的输入划分信息。这些文件都存放在 JobTracker 专门为该作业创建的文件夹中。文件夹名为该作业的 Job ID。JAR 文件默认会有 10 个副本(mapred.submit.replication 属性控制);输入划分信息告诉了JobTracker 应该为这个作业启动多少个 map 任务等信息。

4、JobTracker 接收到作业后,将其放在一个作业队列里,等待作业调度器对其进行调度,当作业调度器根据自己的调度算法调度到该作业时,会根据输入划分信息为每个划分创建一个 map 任务,并将 map 任务分配给 TaskTracker 执行。对于 map 和 reduce 任务,TaskTracker 根据主机核的数量和内存的大小有固定数量的 map 槽和 reduce 槽。这里需要强调的是:map 任务不是随随便便地分配给某个 TaskTracker 的,这里有个概念叫:数据本地化(Data-Local)。意思是:将 map 任务分配给含有该 map 处理的数据块的 TaskTracker 上,同时将程序 JAR 包复制到该 TaskTracker 上来运行,这叫“运算移动,数据不移动”。而分配 reduce 任务时并不考虑数据本地化。

5、TaskTracker 每隔一段时间会给 JobTracker 发送一个心跳,告诉 JobTracker 它依然在运行,同时心跳中还携带着很多的信息,比如当前 map 任务完成的进度等信息。当 JobTracker 收到作业的最后一个任务完成信息时,便把该作业设置成“成功”。当 JobClient 查询状态时,它将得知任务已完成,便显示一条消息给用户。


Map端

1、每个输入分片会让一个 map 任务来处理,默认情况下,以 HDFS 的一个块的大小(默认为 64M)为一个分片,当然我们也可以设置块的大小。map 输出的结果会暂且放在一个环形内存缓冲区中(该缓冲区的大小默认为 100M,由 io.sort.mb 属性控制),当该缓冲区快要溢出时(默认为缓冲区大小的 80%,由 io.sort.spill.percent 属性控制),会在本地文件系统中创建一个溢出文件,将该缓冲区中的数据写入这个文件。

2、在写入磁盘之前,线程首先根据 reduce 任务的数目将数据划分为相同数目的分区,也就是一个 reduce 任务对应一个分区的数据。这样做是为了避免有些 reduce 任务分配到大量数据,而有些 reduce 任务却分到很少数据,甚至没有分到数据的尴尬局面。其实分区就是对数据进行 hash 的过程。然后对每个分区中的数据进行排序,如果此时设置了 Combiner,将排序后的结果进行 Combia 操作,这样做的目的是让尽可能少的数据写入到磁盘。

3、当 map 任务输出最后一个记录时,可能会有很多的溢出文件,这时需要将这些文件合并。合并的过程中会不断地进行排序和 combia 操作,目的有两个:1. 尽量减少每次写入磁盘的数据量;2. 尽量减少下一复制阶段网络传输的数据量。最后合并成了一个已分区且已排序的文件。为了减少网络传输的数据量,这里可以将数据压缩,只要将 mapred.compress.map.out 设置为 true 就可以了。

4、将分区中的数据拷贝给相对应的 reduce 任务。有人可能会问:分区中的数据怎么知道它对应的 reduce 是哪个呢?其实 map 任务一直和其父 TaskTracker 保持联系,而 TaskTracker 又一直和 JobTracker 保持心跳。所以 JobTracker 中保存了整个集群中的宏观信息。只要 reduce 任务向 JobTracker 获取对应的 map 输出位置就可以了。


Reduce端

1、Reduce 会接收到不同 map 任务传来的数据,并且每个 map 传来的数据都是有序的。如果 reduce 端接受的数据量相当小,则直接存储在内存中(缓冲区大小由 mapred.job.shuffle.input.buffer.percent 属性控制,表示用作此用途的堆空间的百分比),如果数据量超过了该缓冲区大小的一定比例(由 mapred.job.shuffle.merge.percent 决定),则对数据合并后溢写到磁盘中。

2、随着溢写文件的增多,后台线程会将它们合并成一个更大的有序的文件,这样做是为了给后面的合并节省时间。其实不管在 map 端还是 reduce 端,MapReduce 都是反复地执行排序,合并操作。

3、合并的过程中会产生许多的中间文件(写入磁盘了),但 MapReduce 会让写入磁盘的数据尽可能地少,并且最后一次合并的结果并没有写入磁盘,而是直接输入到 reduce 函数。


源码解析

源码解析模块推荐这篇:Hongten


MyWordCount

Job 继承自 JobContextImpl 并实现 JobContext 接口,而 JobContext 接口又实现 MRJobConfig 接口。该接口中定义有诸多常量值。这些常量就是在配置 Job 作业的过程中有可能设置到常量。比如 setMapperClass、setReducerClass 对应的 COMBINE_CLASS_ATTR、REDUCE_CLASS_ATTR。如果不设置,就会按照默认值来设置。


WordCount 作业的重点在于 waitForCompletion() 方法,提交当前的 Job 作业,然后轮询进度,直到作业完成。

首先是判断当前作业的状态。接着是判断 verbose 的值,来决定是否将运行进度等信息输出给用户。一个 Job 对象有两种状态:DEFINE 和 RUNNING 。Job 对象被创建时的状态为 DEFINE,当且仅当 Job 对象处于 DEFINE 状态,才可以用来设置作业的一些配置;当作业通过 submit() 方法被提交,状态会变为 RUNNING。这时候作业处于调度运行阶段,不能设置参数。

public void submit() 
         throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    connect();
    final JobSubmitter submitter = 
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
        public JobStatus run() throws IOException, InterruptedException, 
        ClassNotFoundException {
            return submitter.submitJobInternal(Job.this, cluster);
        }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
}

submit() 方法会先检查是否正确使用了 new API,这通过 setUseNewAPI() 方法检查旧版本的属性是否被设置来实现的,接着调用 connect() 连接 JobTracker 并提交。

随后通过集群的文件系统对象和集群的客户端对象拿到提交器。最后通过提交器的内部方法 submitJobInternal() 向系统提交作业,其参数为当前作业以及集群。作业提交的过程涉及以下几个方面:

  • 检查作业的输入和输出配置。
  • 计算作业的切片数量。
  • 如有必要,为作业的分布式缓存设置必要的账户信息。
  • 将作业的 jar 和配置复制到分布式文件系统上的 map-reduce 系统目录中。
  • 将作业提交给 JobTracker,并可以选择监视其状态。

其中,writeSplits() 方法通过当前作业以及当前作业的提交地址返回当前的片数,相当于 map 数值。此方法先获取配置文件信息,再根据使用的接口来决定切片方法是新的还是旧的,一般是新的切片方法 writeNewSplits()。

@SuppressWarnings("unchecked")
private <T extends InputSplit>
    int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    InputFormat<?, ?> input =
        ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

    List<InputSplit> splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
                                    jobSubmitDir.getFileSystem(conf), array);
    return array.length;
}

writeNewSplits() 方法返回的是一个数组的长度,该数组是由List集合转换而来。这个集合里存放的是切片的对象,由 InputFormat 的 getSplits() 方法获得。

通过反射为 "给定的类" 创建一个 InputFormat 对象,并通过 conf 对其进行初始化。这个 "给定的类" 通过当前作业的 getInputFormatClass() 方法来获取,实际调用的就是 JobContextImpl 中的方法。该方法中设置类为 TextInputFormat.class 。这也意味着 getInputFormatClass() 方法的返回值实际指向的 TextInputFormat 。所以 InputFormat 的 getSplits() 方法需要找 TextInputFormat 。因为 TextInputFormat 继承自 FileInputFormat ,所以找到的 getSplits() 方法在 FileInputFormat 中。

public List<InputSplit> getSplits(JobContext job) throws IOException {
    StopWatch sw = new StopWatch().start();
    // 获取最小和最大拆分大小
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    // 获取输入目录
    List<FileStatus> files = listStatus(job);
    for (FileStatus file: files) {
        // 获取文件路径
        Path path = file.getPath();
        // 获取文件大小
        long length = file.getLen();
        if (length != 0) {
            // 存放这个文件上传之后所有块的位置信息
            BlockLocation[] blkLocations;
            // 判断文件来源本地还是HDFS系统
            if (file instanceof LocatedFileStatus) {
                // 本地
                blkLocations = ((LocatedFileStatus) file).getBlockLocations();
            } else {
                // HDFS
                FileSystem fs = path.getFileSystem(job.getConfiguration());
                blkLocations = fs.getFileBlockLocations(file, 0, length);
            }
            // 当前文件是否可以切割
            if (isSplitable(job, path)) {
                // 获取块的大小,上传时候设置的
                long blockSize = file.getBlockSize();
                // 计算片的大小
                long splitSize = computeSplitSize(blockSize, minSize, maxSize);
				// 剩余的字节数
                long bytesRemaining = length;
                // 剩余字节数/片的大小 > 1.1
                while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
                    // 获取块的索引
                    int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                    // 制作切片并添加进List集合中
                    splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                                         blkLocations[blkIndex].getHosts(),
                                         blkLocations[blkIndex].getCachedHosts()));
                    bytesRemaining -= splitSize;
                }

                if (bytesRemaining != 0) {
                    int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                    splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                                         blkLocations[blkIndex].getHosts(),
                                         blkLocations[blkIndex].getCachedHosts()));
                }
            } else { // not splitable
                if (LOG.isDebugEnabled()) {
                    // Log only if the file is big enough to be splitted
                    if (length > Math.min(file.getBlockSize(), minSize)) {
                        LOG.debug("File is not splittable so no parallelization "
                                  + "is possible: " + file.getPath());
                    }
                }
                splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                                     blkLocations[0].getCachedHosts()));
            }
        } else { 
            //Create empty hosts array for zero length files
            splits.add(makeSplit(path, 0, length, new String[0]));
        }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
        LOG.debug("Total # of splits generated by getSplits: " + splits.size()
                  + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
    }
    return splits;
}

此时返回的 List 集合将会转为数组,并返回数组的长度,也就是切片数量。


MyMapper
map 输入

map 从 HDFS 获取输入流,然后定位到切片的位置,除了第一个切片,其他切片都是从第二行开始读取数据进行处理。

Mapper 类中重写 map() 方法,如果有新的数据,就进入循环中重写 map() 方法。

public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
        while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
    } finally {
        cleanup(context);
    }
}

可以看到整个过程中 context 对象起到了非常关键的作用。它通过 run() 方法传递进来,而 map() 方法的执行通过 MapTask 来调度,其位置在『hadoop-mapreduce-client-core-2.9.2.jar』--> 『org.apache.hadoop.mapred.MapTask.class』中。MapTask 类中 run() 方法完成了对 Mapper 类中的 run() 方法的调用。

@Override
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, ClassNotFoundException, InterruptedException {
    this.umbilical = umbilical;
	// 判断这是否是一个MapTask任务
    if (isMapTask()) {
        // If there are no reducers then there won't be any sort. Hence the map 
        // phase will govern the entire attempt's progress.
        // 判断有没有ReduceTask任务
        if (conf.getNumReduceTasks() == 0) {
            // 如果没有,整个时间分配给map
            mapPhase = getProgress().addPhase("map", 1.0f);
        } else {
            // If there are reducers then the entire attempt's progress will be 
            // split between the map phase (67%) and the sort phase (33%).
            // 如果二者都有,时间分配如下
            mapPhase = getProgress().addPhase("map", 0.667f);
            sortPhase  = getProgress().addPhase("sort", 0.333f);
        }
    }
    TaskReporter reporter = startReporter(umbilical);

    boolean useNewApi = job.getUseNewMapper();
    initialize(job, getJobID(), reporter, useNewApi);

    // check if it is a cleanupJobTask
    if (jobCleanup) {      // 资源释放
        runJobCleanupTask(umbilical, reporter);
        return;
    }
    if (jobSetup) {        // 资源建立
        runJobSetupTask(umbilical, reporter);
        return;
    }
    if (taskCleanup) {     // 任务资源释放
        runTaskCleanupTask(umbilical, reporter);
        return;
    }
	
    if (useNewApi) {       // 新的Api
        runNewMapper(job, splitMetaInfo, umbilical, reporter);
    } else {               // 旧的Api
        runOldMapper(job, splitMetaInfo, umbilical, reporter);
    }
    done(umbilical, reporter);
}

此 run() 方法中主要看 runNewMapper() 方法。首先声明诸多对象,为接下来的实例化操作准备。

@SuppressWarnings("unchecked")
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
    void runNewMapper(final JobConf job,
                      final TaskSplitIndex splitIndex,
                      final TaskUmbilicalProtocol umbilical,
                      TaskReporter reporter
                     ) throws IOException, ClassNotFoundException,
InterruptedException {
    // make a task context so we can get the classes
    // 声明一个任务的上下文容器对象taskContext,实例化对象指向  TaskAttemptContextImpl  。
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
        new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, 
                                                                    getTaskID(),
                                                                    reporter);
    // make a mapper
    // 声明一个  mapper  对象,指向反射工具实例化的对象,默认是MAP_CLASS_ATTR。
    // Mapper类是我们自己写的,需要在MyWordCount类中通过setMapperClass()方法设置。  
    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
        (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
    
    // make the input format
    // 声明一个  inputFormat  格式化对象,获取方式与  mapper  对象一样,
    // 需要在MyWordCount类中通过setInputFormatClass()方法设置,不设置的话将使用默认的类来完成对象的实例化。
    org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
        (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
        ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
    
    // rebuild the input split
    // 获取切片对象(之前在  client  端已经准备好),通过切片索引获取切片所对应的节点的位置信息。
    org.apache.hadoop.mapreduce.InputSplit split = null;
    split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
                            splitIndex.getStartOffset());
    LOG.info("Processing split: " + split);

    // 声明一个  input  对象来读取数据,实例化对象指向  NewTrackingRecordReader。
    org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
        new NewTrackingRecordReader<INKEY,INVALUE>
        (split, inputFormat, reporter, taskContext);

    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
    // 声明一个 output  对象来写数据,实例化对象的指向根据有无  ReduceTask  任务来选择。
    org.apache.hadoop.mapreduce.RecordWriter output = null;

    // get an output object
    if (job.getNumReduceTasks() == 0) {
        output = 
            new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
    } else {
        output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    }

    //声明上下文对象  mapContext  ,实例化对象指向  MapContextImpl 。
    // 在MapContext,NewTrackingRecordReader,LineRecordReader类里面都包含了nextKeyValue(),getCurrentKey(),getCurrentValue()方法。
    // 当我们调用MapContext里面的nextKeyValue()的时候,会去调用NewTrackingRecordReader里面的  nextKeyValue()方法,
    // 这个方法最终会去调用LineRecordReader里面的nextKeyValue()方法。
    org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
        mapContext = 
        new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), 
                                                             input, output, 
                                                             committer, 
                                                             reporter, split);
	// 声明一个上下文容器  mapperContext,封装上下文对象  mapContext 。
    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
        mapperContext = 
        new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
        mapContext);

    try {
        // 实例化的过程中需要传递两个参数:split  以及  mapperContext 。实际调用的初始化方法为  LineRecordReader  类中的  initialize()  方法。
        input.initialize(split, mapperContext);
        mapper.run(mapperContext);
        mapPhase.complete();
        setPhase(TaskStatus.Phase.SORT);
        statusUpdate(umbilical);
        input.close();
        input = null;
        output.close(mapperContext);
        output = null;
    } finally {
        closeQuietly(input);
        closeQuietly(output, mapperContext);
    }
}
public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
    // 强转为文件对象
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
    // 切片的起始位置
    start = split.getStart();
    // 切片的结束位置
    end = start + split.getLength();
    // 切片对应的完整文件
    final Path file = split.getPath();

    // open the file and seek to the start of the split
    final FileSystem fs = file.getFileSystem(job);
    fileIn = fs.open(file);

    CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
    if (null!=codec) {
        isCompressedInput = true;
        decompressor = CodecPool.getDecompressor(codec);
        if (codec instanceof SplittableCompressionCodec) {
            final SplitCompressionInputStream cIn =
                ((SplittableCompressionCodec)codec).createInputStream(
                fileIn, decompressor, start, end,
                SplittableCompressionCodec.READ_MODE.BYBLOCK);
            in = new CompressedSplitLineReader(cIn, job,
                                               this.recordDelimiterBytes);
            start = cIn.getAdjustedStart();
            end = cIn.getAdjustedEnd();
            filePosition = cIn;
        } else {
            if (start != 0) {
                // So we have a split that is only part of a file stored using
                // a Compression codec that cannot be split.
                throw new IOException("Cannot seek in " +
                                      codec.getClass().getSimpleName() + " compressed stream");
            }

            in = new SplitLineReader(codec.createInputStream(fileIn,
                                                             decompressor), job, this.recordDelimiterBytes);
            filePosition = fileIn;
        }
    } else {
        fileIn.seek(start);
        in = new UncompressedSplitLineReader(
            fileIn, job, this.recordDelimiterBytes, split.getLength());
        filePosition = fileIn;
    }
    // 如果起始位置不为0,就丢弃当前块的第一条记录并读取下一个块的第一条记录。防止切割过程中数据不完整。
    // 第一块也会读取第二块的第一条记录。
    if (start != 0) {
        start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }
    this.pos = start;
}
public boolean nextKeyValue() throws IOException {
    if (key == null) {
        // key为偏移量,默认为LongWritable
        key = new LongWritable();
    }
    // 给key赋值
    key.set(pos);
    if (value == null) {
        // value默认为Text
        value = new Text();
    }
    int newSize = 0;
    // We always read one extra line, which lies outside the upper
    // split limit i.e. (end - 1)
    // 总是多读一行
    while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
        if (pos == 0) {
            newSize = skipUtfByteOrderMark();
        } else {
            newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
            pos += newSize;
        }

        if ((newSize == 0) || (newSize < maxLineLength)) {
            break;
        }

        // line too long. try again
        LOG.info("Skipped line of size " + newSize + " at pos " + 
                 (pos - newSize));
    }
    if (newSize == 0) {
        key = null;
        value = null;
        return false;
    } else {
        return true;
    }
}

map 输出
@SuppressWarnings("unchecked")
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
    void runNewMapper(final JobConf job,
                      final TaskSplitIndex splitIndex,
                      final TaskUmbilicalProtocol umbilical,
                      TaskReporter reporter
                     ) throws IOException, ClassNotFoundException,
InterruptedException {
    ... ...
        
    // 声明一个 output  对象来写数据,实例化对象的指向根据有无  ReduceTask  任务来选择。
    org.apache.hadoop.mapreduce.RecordWriter output = null;

    // get an output object
    // 判断reducetask是否有,主要看NewOutputCollector()方法
    if (job.getNumReduceTasks() == 0) {
        output = 
            new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
    } else {
        output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    }
    
	... ...
}
@SuppressWarnings("unchecked")
NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                   JobConf job,
                   TaskUmbilicalProtocol umbilical,
                   TaskReporter reporter
                  ) throws IOException, ClassNotFoundException {
    // 创建一个collecter容器
    collector = createSortingCollector(job, reporter);
    // 分区数量 = Reduce Task的数量
    partitions = jobContext.getNumReduceTasks();
    if (partitions > 1) {
        // 多个分区
        partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
            ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
    } else {
        // 第一个分区器,获取0号分区器
        partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
            @Override
            public int getPartition(K key, V value, int numPartitions) {
                return partitions - 1;
            }
        };
    }
}

创建collecter容器的过程中,最重要的就是缓冲器的处理过程:collector.init(context);

init() 方法实际调用的是 MapOutputBuffer 类中的 init() 方法。该方法中主要做了以下几件事情:

  • 设置内存缓冲区
  • 设置排序器
  • 设置比较器
  • 设置合并器
  • 设置溢写线程

@SuppressWarnings("unchecked")
public void init(MapOutputCollector.Context context
                ) throws IOException, ClassNotFoundException {
    job = context.getJobConf();
    reporter = context.getReporter();
    mapTask = context.getMapTask();
    mapOutputFile = mapTask.getMapOutputFile();
    sortPhase = mapTask.getSortPhase();
    spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
    partitions = job.getNumReduceTasks();
    rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();

    // map处理数据的时候,需要放入内存缓冲区
    // 这里的DEFAULT_IO_SORT_MB就是系统默认的缓冲区大小,100MB
    // 0.8是内存缓冲区阈值的意思,就是当这个缓冲区使用了80%,缓冲区里面的80%的数据就可以溢写到磁盘。
    final float spillper =
        job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
    final int sortmb = job.getInt(MRJobConfig.IO_SORT_MB,
                                  MRJobConfig.DEFAULT_IO_SORT_MB);
    indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                       INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
    if (spillper > (float)1.0 || spillper <= (float)0.0) {
        throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
                              "\": " + spillper);
    }
    if ((sortmb & 0x7FF) != sortmb) {
        throw new IOException(
            "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
    }
    // 排序器,默认为快速排序算法(QuickSort)
    // 把map里面的乱序的数据,使用快速排序算法进行排序
    // 使得内存中乱序的数据进行排序,然后把排序好的数据,溢写到磁盘
    sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
                                                      QuickSort.class, IndexedSorter.class), job);
    // buffers and accounting
    // 上面IO_SORT_MB的单位是MB,左移20位将单位转化为byte
    int maxMemUsage = sortmb << 20;
    // METASIZE是元数据的长度,元数据有4个int单元,分别为
    // VALSTART、KEYSTART、PARTITION、VALLEN,而int为4个byte,
    // 所以METASIZE长度为16。下面是计算buffer中最多有多少byte来存元数据
    maxMemUsage -= maxMemUsage % METASIZE;
    // 元数据数组  以byte为单位
    kvbuffer = new byte[maxMemUsage];
    bufvoid = kvbuffer.length;
    // 将kvbuffer转化为int型的kvmeta  以int为单位,也就是4byte
    kvmeta = ByteBuffer.wrap(kvbuffer)
        .order(ByteOrder.nativeOrder())
        .asIntBuffer();
    // 设置buf和kvmeta的分界线
    setEquator(0);
    bufstart = bufend = bufindex = equator;
    kvstart = kvend = kvindex;
	// kvmeta中存放元数据实体的最大个数
    maxRec = kvmeta.capacity() / NMETA;
    softLimit = (int)(kvbuffer.length * spillper);
    // 此变量较为重要,作为spill的动态衡量标准
    bufferRemaining = softLimit;
    LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);
    LOG.info("soft limit at " + softLimit);
    LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
    LOG.info("kvstart = " + kvstart + "; length = " + maxRec);

    // k/v serialization
    // 比较器
    comparator = job.getOutputKeyComparator();
    keyClass = (Class<K>)job.getMapOutputKeyClass();
    valClass = (Class<V>)job.getMapOutputValueClass();
    serializationFactory = new SerializationFactory(job);
    keySerializer = serializationFactory.getSerializer(keyClass);
    // 将bb作为key序列化写入的output
    keySerializer.open(bb);
    valSerializer = serializationFactory.getSerializer(valClass);
    // 将bb作为value序列化写入的output
    valSerializer.open(bb);

    // output counters
    mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
    mapOutputRecordCounter =
        reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
    fileOutputByteCounter = reporter
        .getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);

    // compression
    if (job.getCompressMapOutput()) {
        Class<? extends CompressionCodec> codecClass =
            job.getMapOutputCompressorClass(DefaultCodec.class);
        codec = ReflectionUtils.newInstance(codecClass, job);
    } else {
        codec = null;
    }

    // combiner
    final Counters.Counter combineInputCounter =
        reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
    combinerRunner = CombinerRunner.create(job, getTaskID(), 
                                           combineInputCounter,
                                           reporter, null);
    if (combinerRunner != null) {
        final Counters.Counter combineOutputCounter =
            reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
        combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
    } else {
        combineCollector = null;
    }
    spillInProgress = false;
    // 最后一次merge时,在有combiner的情况下,超过此阈值才执行combiner
    minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
    spillThread.setDaemon(true);
    spillThread.setName("SpillThread");
    spillLock.lock();
    try {
        spillThread.start();
        while (!spillThreadRunning) {
            spillDone.await();
        }
    } catch (InterruptedException e) {
        throw new IOException("Spill thread failed to initialize", e);
    } finally {
        spillLock.unlock();
    }
    if (sortSpillException != null) {
        throw new IOException("Spill thread failed to initialize",
                              sortSpillException);
    }
}

map 小结

在 map 输入阶段:每个 map 处理一个切片的数据量,需要 seek() ,让出第一行,从第二行开始读取数据(切片数量大于 1 )。

在 map 输出阶段:map 输出的是 key, value;但是 map 计算完成以后,会得到 key,value,partition。也就是说,每个数据从 map 输出,就知道归属于哪一个 reduce task 去处理了,归属于哪个分区。

之后,在内存中有一个内存缓冲区 buffer in memory ,这个内存缓冲区是环形缓冲区。
内存大小默认是 100MB,为了是内存溢写不阻塞,默认的阈值是 80%。即只要大于等于 80MB 的时候,就会触发溢写,溢写会把内存中的数据写入到磁盘。在写入磁盘之前要对数据进行快速排序,这是整个框架当中仅有的一次,把数据从乱序到有序。后面的排序都是把有序的数据进行归并排序了。

在排序的时候,有一个判定。有可能我们定义了 combiner,需要压缩一下数据。

现在大数据,最大的瓶颈就是 I/O,磁盘 I/O,网络 I/O,都是慢 I/O。

所以在 I/O 之前,能在内存里面排序就排序,能压缩就尽量压缩。那么在调用 I/O 的时候,写的数据越少越好,速度就越快。

在溢写的时候(partion, sort and spill to disk),先按分区排序,在分区内再按 key 排序。这是因为 map 计算的结果是 key,value,partition。这样的文件才能是内部有序。最后,溢写很多的小文件要归并成一个大文件。那么大文件也是按分区排序,文件里面再按 key 排序。

如果我们做了 combiner,在归并成大文件的时候,框架默认的小文件数量是 3 个。只要我们设置的值大于等于 3,就会触发 combiner 压缩数据,这是为了减少在 shuffer 阶段拉取网络 I/O ,以及在拉完数据以后,让 Reduce 处理数据量变少,加快计算速度。所以 map 的工作的核心目的,就是让 reduce 跑的越来越快。


MyReducer

Reduce 大致分为 copy、sort、reduce 三个阶段,重点在前两个阶段。Reduce Task 和 Map Task 一样,同样也有四种任务类型,分别为 job setup、job cleanup、reduce task、task cleanup 。其位置在『hadoop-mapreduce-client-core-2.9.2.jar』--> 『org.apache.hadoop.mapred.ReduceTask.class』,Reduce 阶段的代码入口是 ReduceTask 类中的 run() 方法。

public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, InterruptedException, ClassNotFoundException {
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());

    // Reduce阶段的三个小阶段
    if (isMapOrReduce()) {
        copyPhase = getProgress().addPhase("copy");
        sortPhase  = getProgress().addPhase("sort");
        reducePhase = getProgress().addPhase("reduce");
    }
    // start thread that will handle communication with parent
    TaskReporter reporter = startReporter(umbilical);

    boolean useNewApi = job.getUseNewReducer();
    initialize(job, getJobID(), reporter, useNewApi);

    // check if it is a cleanupJobTask
    // Reduce Task 的四种类型
    if (jobCleanup) {
        runJobCleanupTask(umbilical, reporter);
        return;
    }
    if (jobSetup) {
        runJobSetupTask(umbilical, reporter);
        return;
    }
    if (taskCleanup) {
        runTaskCleanupTask(umbilical, reporter);
        return;
    }

    // Initialize the codec
    // 初始化压缩类型
    codec = initCodec();
    // 迭代器
    RawKeyValueIterator rIter = null;
    ShuffleConsumerPlugin shuffleConsumerPlugin = null;
	// reduce 阶段依然要用到combiner
    Class combinerClass = conf.getCombinerClass();
    CombineOutputCollector combineCollector = 
        (null != combinerClass) ? 
        new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;
	// 得到shuffle阶段的插件(此处将shuffle阶段当做一个服务插件)
    Class<? extends ShuffleConsumerPlugin> clazz =
        job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);

    shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
    LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);

    ShuffleConsumerPlugin.Context shuffleContext = 
        new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical, 
                                          super.lDirAlloc, reporter, codec, 
                                          combinerClass, combineCollector, 
                                          spilledRecordsCounter, reduceCombineInputCounter,
                                          shuffledMapsCounter,
                                          reduceShuffleBytes, failedShuffleCounter,
                                          mergedMapOutputsCounter,
                                          taskStatus, copyPhase, sortPhase, this,
                                          mapOutputFile, localMapFiles);
    // shuffleConsumerPlugin初始化
    shuffleConsumerPlugin.init(shuffleContext);
	// copy sort merge都在此阶段执行,到这rIter中已经拿到从Map端传过来的所有数据
    rIter = shuffleConsumerPlugin.run();

    // free up the data structures
    mapOutputFilesOnDisk.clear();

    sortPhase.complete();                         // sort is complete
    setPhase(TaskStatus.Phase.REDUCE); 
    statusUpdate(umbilical);
    Class keyClass = job.getMapOutputKeyClass();
    Class valueClass = job.getMapOutputValueClass();
    // 组比较器
    RawComparator comparator = job.getOutputValueGroupingComparator();

    // 判断是否使用新Api
    if (useNewApi) {
        runNewReducer(job, umbilical, reporter, rIter, comparator, 
                      keyClass, valueClass);
    } else {
        runOldReducer(job, umbilical, reporter, rIter, comparator, 
                      keyClass, valueClass);
    }

    shuffleConsumerPlugin.close();
    done(umbilical, reporter);
}
// 1.用户是否设置分组比较器GROUP_COMPARATOR_CLASS
// 2.用户是否设置排序比较器KEY_COMPARATOR
// 3.如果用户都没有设置,则使用自身key比较器
public RawComparator getOutputValueGroupingComparator() {
    // 通过反射获取分组比较器
    // 用户可以通过配置GROUP_COMPARATOR_CLASS(mapreduce.job.output.group.comparator.class)来定义比较器
    Class<? extends RawComparator> theClass = getClass(
        JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
    if (theClass == null) {
        return getOutputKeyComparator();
    }

    return ReflectionUtils.newInstance(theClass, this);
}

public RawComparator getOutputKeyComparator() {
    // 用户是否设置排序比较器KEY_COMPARATOR
    // 如果用户都没有设置,则使用自身key比较器
    Class<? extends RawComparator> theClass = getClass(
        JobContext.KEY_COMPARATOR, null, RawComparator.class);
    if (theClass != null)
        return ReflectionUtils.newInstance(theClass, this);
    return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
}

// 自身key比较器
public Class<?> getMapOutputKeyClass() {
    Class<?> retv = getClass(JobContext.MAP_OUTPUT_KEY_CLASS, null, Object.class);
    if (retv == null) {
        retv = getOutputKeyClass();
    }
    return retv;
}
reduce 小结

Reduce 阶段大致分为 copy、sort 和 reduce 阶段,而 copy 阶段又分为 shuffle 和 merge 阶段。copy 阶段包含一个 eventFetcher 来获取已完成的 map 列表,由 Fetcher 线程去 copy 数据。在此过程中会启动两个 merge 线程,分别为 inMemoryMerger 和 onDiskMerger,分别将内存中的数据 merge 到磁盘和将磁盘中的数据进行 merge(其中还应注意merge的条件和数据往内存中写入时的情况)。待数据 copy 完成之后,copy 阶段就完成了,开始进行 sort 阶段,sort 阶段主要是执行 finalMerge 操作,纯粹的 sort 阶段,完成之后就是 reduce 阶段。