书接上回,还是我们数豆子的程序,这次我们做一些改进。

combiner

我们之前的例子里每个mapper只是对一个豆子计数1,然后对每种豆子求总数的任务都是放到reducer里去完成。那我们能不能在mapper里进行一个局部汇总,然后reducer里再做进一步汇总?答案是肯定的。那就是通过设置combiner。combiner的作用就是在mapper进行完计算后,在每个mapper里进行一次局部的reduce。所以combiner class一般和reducer class一样。设置方法是:


job.setCombinerClass(BeansReducer.class);

通过添加combiner,减少了mapper和reducer之间的数据传递,而且mapper的并行执行节点多于reducer,可以更充分利用集群的计算能力。改动后,我们的程序时间是原来的三分之一。只用了34秒。

Mapper和Reducer的个数

默认的情况下mapper的个数和你的数据块的个数一样。因为数据块在HDFS里就是分布式存储的。mapper个数和数据块个数一样,可以在集群里没有数据搬移的情况下尽量分布执行。
默认情况下reducer的个数是1,这都导致我们刚才看到的结果只有一个part-r-0000。在我们的例子里,可以如果可以通过3个reducer来并行计算,每个reducer来统计一个不同颜色的豆子,那不是更好。我们可以通过下边的代码设置reducer的个数:


job.setNumReduceTasks(3);

然后我们再次执行程序,从结果里看,我们确实得到了三个文件,分别是part-r-0000,part-r-0001,part-r-0002。但是我们发现part-r-0000的文件是空的。part-r-0001的文件里只有一个统计值

red     33329132

part-r-0002文件里有两个统计值


green   33333941
yellow  33336927

Partitioner

上边我们虽然设置了ReduceTasks是3,但是发现有一个文件是空。这是因为如果我们设置了reducer的个数,那么MapReduce就会通过partitioner来把不同的key分到不同的reducer。默认的partitioner是HashPartitioner,它通过key的哈希值经过计算然后除以我们设置的reducer个数,最后取余数,然后根据余数决定把这个key,以及对应的value list发往哪个reducer的。


public class HashPartitioner<K, V> extends Partitioner<K, V> {
 
  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}

默认的HashPartitioner有个问题就是如果我的一个key经过计算后是2,另一个是5,它们除以task数3的余数都是2,虽然key不同,但是最终都会分到reducer2去。那我们有办法自定义吗?当然可以。


    public static class BeansPartitioner extends Partitioner<Text, IntWritable> {

        @Override
        public int getPartition(Text key, IntWritable value, int numReduceTasks) {

            if (key.toString().equals("red")) {
                return 0;
            } else if (key.toString().equals("green")) {
                return 1;
            }
            return 2;
        }

    }

注意上边的例子是只针对numReduceTasks为3的情况,并不能应付numReduceTasks为其他数值的情况。但是作为一个例子是足够了。通过我们的修改,我们可以看到我们的job最终output有3个文件,每个文件内都是针对一个key的计数。

现在你知道一个MapReduce操作有Map,Combine,Partition,Reduce,那么还有其他步骤吗?它们的次序是什么呢?接下来我们就看一个完整的MapReduce的流程图:

shuffle

shuffle指的是在node之间进行数据交换,这里指的是Map Node计算出来的中间结果key-value,如何传输到不同的reduce Node。Shuffle的同时会对key进行排序。更进一步,你也可以指定对同一个key的一组value进行排序的策略。
需要注意的是在shuffling之前的部分都是在Map Node上执行的,之后是在Reduce Node上执行的。在集群里Shuffle需要Map的数据写入临时文件,而且还需要排序,网络传输等,是特别消耗资源的。在设计map reduce job 的时候要尽量减少Shuffle的数据量。

现在很多企业的大数据计算平台都适用了Spark,它除了提供Map,Reduce算子外,还有额外其他更多的算子,使得我们对数据的操作更加灵活。另外它还有很多机器学习和图运算的功能。所以我们在这里不再过多解释MapReduce计算框架。我在讲完Hadoop系列后,会接着写一个Spark系列。

发表评论

电子邮件地址不会被公开。 必填项已用*标注

%d 博主赞过: