对于Hadoop集群来说,如果我们只用它的HDFS来存放数据那就大材小用了。Hadoop另个一更重要的功能是可以进行并行计算,而Hadoop默认的分布式计算框架就是Map-Reduce框架。

一个例子

为了便于理解,我们看一个例子:你是一个班主任,你有一袋由红豆,绿豆,黄豆混在一起的豆子。然后你需要计算每种豆子有多少个。幸运的是你有40个听话的小朋友来帮你。你可能这样来安排你的豆子计数工作。把混在一起的豆子分成40份,每个小朋友一份,把豆子分开成三份。大家都分完后再找3个小朋友来分别统计一种豆子的个数。大家都自己分好的豆子交给这三个小朋友进行最后统一计数。这就是一个Map-Reduce工作模式,把任务分成40份,大家一起独立工作,这就是Map,最后把Map的中间工作结果汇总到一起,这就是Reduce。
为了随后的演示,我随机生成了一亿个豆子,他们可能是红,黄,绿中的任一种。我再HDFS里用给一个文件来记录。每条记录表示一个豆子。一共是540MB,bolck大小是128M,所以这个文件一共有5个block。我的HDFS集群也刚好有5个DataNode。数据记录看起来是这样的:


red
yellow
green
red
yellow
red
green
yellow
yellow
yellow
green

MapReduce里的重要概念

不可变性

在集群里并行工作一个重要的目标就是提高并行度,大家可以同时工作。这就要求彼此的依赖和共享尽可能的小。这也是MapReduce框架的一个特征。一个MapReduce的job可以有多个Mapper和Reducer,多个Mapper之间,多个Reducer之间是没有依赖和共享的。整个工作框架只有一个地方需要彼此依赖和交换数据就是Mapper和Reducer之间。首先Reducer需要等待Mapper工作完成,还有就是框架需要知道把Mapper的结果送到哪个Reducer。

数据格式

在MapReduce框架里一个值不能单独存在,它必须以key-value的形式来存在。你如果熟悉函数式编程语言里的Map和Reduce你会发现函数式编程语言里的Map和Reduce并不需要key。那么为什么Hadoop的MapReduce里输入的数据,Map的中间结果,以及Reduce的结果都是以key-value的形式存在呢?这是因为MapReduce考虑的是大数据并行处理的场景。key的作用是用来进行分布的依据。比如上边我们说的数豆子的例子,在最后汇总的时候豆子的颜色就是key,这样就可以3个小朋友对三种不同颜色的豆子同时进行计数。有些时候(比如上边例子中40个小朋友同时分豆子的时候)并没有key的概念,但是为了统一,MapReduce所有的输入输出都是一个key-value

InputFormat

既然MapReduce需要的数据输入格式是key-value形式的。那么我们就需要把文件能转化为key-value的工具。InputFormat就是干这个事的。InputFormat有一下几个作用:
1. 根据你的设置读取文件
2. 可以把文件分成不同的InputSplits,这样就可以分布计算。在HDFS里默认一个block就是一个InputSplit
3. 需要提供一个RecordReader来将InputSplit转化为一组key-value

默认的InputFormat是TextInputFormat,它可以把文件的一行生成一个key-value,key你可以认为是行号,value是行的内容。

Mapper

Mapper的作用就是对单条数据进行计算,它接收的输入是一个key-value,输出可以是0个或者多个key-value。对于我们上边数豆子的例子,我们输入是:


1,yellow
2,green
3,red
4,yellow
5,red
6,green
7,yellow
8,yellow
9,yellow
10,green

我们在map函数里就是对每个豆子计数1,以颜色作为key,value是1.我们希望输出是:


yellow,1
green,1
red,1
yellow,1
red,1
green,1
yellow,1
yellow,1
yellow,1
green,1

我们看一下Mapper的实现代码:


    public static class BeansMapper extends Mapper<Object,Text,Text,IntWritable>{
        IntWritable one = new IntWritable(1);
        public void map(Object key,Text value,Context context) throws IOException, InterruptedException {
            context.write(value, one );
        }
    }

BeansMapper 这个类继承自模板类Mapper,类型参数分别对应map函数的输入key,输入value,输出key,输出value。
在MapReduce里key和value都需要能够序列化,实现Writable接口,因为它们需要传输和写入文件。MapRecude有自己的序列化实现,所以我们不能使用Java的int,String类型,而是要使用MapReduce自己定义的Text,IntWritable类型。另外key需要排序,所以需要实现Comparable接口。
我们通过Context.write把我们map计算的中间结果写入本地磁盘,注意不是HDFS。因为这只是计算的临时结果,运算完成会被删除。并不需要存入HDFS,从而引发备份等操作。

Reducer

Reducer的作用是对同一个key的一组value进行汇总,输出是经过汇总的零个或者多个key-value。
我们看一下Reducer的实现代码:


    public static class BeansReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
        IntWritable res = new IntWritable();
        public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
            int sum = 0;            
            for(IntWritable value:values) {
                sum+=value.get();
            }
            res.set(sum);
            context.write(key,res);
        }
    }

Reducer的输入是一组value,它们有着同样的key。在我们这个数豆子的例子里,key就是某种具体的颜色,value应该都是1,我们所需要做的就是对1进行累加,从而得到一种颜色的豆子总数。

启动程序 Driver

有了Mapper和Reducer,我们还需要一个程序来提交这个MapReduce的job,并对job进行配置。我们来看一下代码:


    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf,"Beans Counter");
        job.setJarByClass(BeansCounter.class);
        job.setMapperClass(BeansMapper.class);
        job.setReducerClass(BeansReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("/test/beans"));
        FileOutputFormat.setOutputPath(job, new Path("/test/beansResult"));
        job.waitForCompletion(true);
    }

首先我们生成一个Job,然后需要给job设置Jar,因为这个jar需要被部署到Hadoop集群里执行这个job的节点上。接着需要设置Mapper和Reducer的class。
然后设置Reducer输出的key和value的类型。这里默认Mapper输出的key和value的类型和Reducer是一样的,如果不一样,你可以通过setMapOutputKeyClass() 和setMapOutputValueClass()来设置。最后我们设置MapReduce默认的InputFormat的读取文件路径和输出结果路径。最后job.waitForCompletion,是提交job并等待job执行结束。
完整的代码:


package fun.rethink;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BeansCounter {
    public static class BeansMapper extends Mapper<Object,Text,Text,IntWritable>{
        IntWritable one = new IntWritable(1);
        public void map(Object key,Text value,Context context) throws IOException, InterruptedException {
            context.write(value, one );
        }
    }
    public static class BeansReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
        IntWritable res = new IntWritable();
        public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
            int sum = 0;            
            for(IntWritable value:values) {
                sum+=value.get();
            }
            res.set(sum);
            context.write(key,res);
        }
    }
   
    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        //conf.set("fs.defaultFS", "hdfs://hadoop1:9000");
        Job job = Job.getInstance(conf,"Beans Counter");
        job.setJarByClass(BeansCounter.class);
        job.setMapperClass(BeansMapper.class);
        job.setReducerClass(BeansReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("/test/beans"));
        FileOutputFormat.setOutputPath(job, new Path("/test/beansResult"));
        job.waitForCompletion(true);
    }
}

我们可以把这个class打包成一个hadoopDemo.jar,然后在我们hadoop集群里通过如下命令运行:


 hadoop jar hadoopDemo.jar fun.rethink.BeansCounter

等到job执行完成后,然后我们就可以通过


hdfs dfs -get /test/beansResult ~/beansResult

命令把结果拷贝到本地来查看。我们发现执行的结果是一个文件夹,这是为了并行计算,每个reducer写一个文件。如果多个reducer向同一个文件写肯定不利于并行计算。里面每一个part表示是一个reducer的执行结果。需要注意的是如果你输出的文件存在,job执行时会报错,并不会覆盖原来文件。这也是为了保护HDFS里的文件。
最终结果如下:


green   33333941
red     33329132
yellow  33336927

通过web查看执行结果

我们可以通过访问http://hadoop1:8088/cluster/apps 来查看一个job的执行情况。

如果你想查看这个job的详细情况,你需要启动你的jobhistory后台进程:


/opt/hadoop/sbin/mapred --daemon start historyserver

可以看到,对一亿条记录进行分类计数,用了1分27秒:

发表评论

电子邮件地址不会被公开。 必填项已用*标注

%d 博主赞过: