10.30.2008

用 Hadoop 进行分布式并行编程, 第 2 部分

网址:http://www.ibm.com/developerworks/cn/opensource/os-cn-hadoop2/index.html

用 Hadoop 进行分布式并行编程(二)程序实例与分析2008年06月27日 星期五 下午 02:29Hadoop 是一个实现了 MapReduce 计算模型的开源分布式并行编程框架,借助于 Hadoop, 程序员可以轻松地编写分布式并行程序,将其运行于计算机集群上,完成海量数据的计算。在本文中,详细介绍了如何针对一个具体的并行计算任务,基于 Hadoop 编写程序,如何使用 IBM MapReduce Tools 在 Eclipse 环境中编译并运行 Hadoop 程序。
前言

在上一篇文章:“用 Hadoop 进行分布式并行编程 第一部分 基本概念与安装部署”中,介绍了 MapReduce 计算模型,分布式文件系统 HDFS,分布式并行计算等的基本原理, 并且详细介绍了如何安装 Hadoop,如何运行基于 Hadoop 的并行程序。在本文中,将针对一个具体的计算任务,介绍如何基于 Hadoop 编写并行程序,如何使用 IBM 开发的 Hadoop Eclipse plugin 在 Eclipse 环境中编译并运行程序。


回页首




分析 WordCount 程序

我们先来看看 Hadoop 自带的示例程序 WordCount,这个程序用于统计一批文本文件中单词出现的频率,完整的代码可在下载的 Hadoop 安装包中得到(在 src/examples 目录中)。

1.实现Map类

见代码清单1。这个类实现 Mapper 接口中的 map 方法,输入参数中的 value 是文本文件中的一行,利用 StringTokenizer 将这个字符串拆成单词,然后将输出结果 <单词,1> 写入到 org.apache.hadoop.mapred.OutputCollector 中。OutputCollector 由 Hadoop 框架提供, 负责收集 Mapper 和 Reducer 的输出数据,实现 map 函数和 reduce 函数时,只需要简单地将其输出的 对往 OutputCollector 中一丢即可,剩余的事框架自会帮你处理好。

代码中 LongWritable, IntWritable, Text 均是 Hadoop 中实现的用于封装 Java 数据类型的类,这些类都能够被串行化从而便于在分布式环境中进行数据交换,你可以将它们分别视为 long, int, String 的替代品。Reporter 则可用于报告整个应用的运行进度,本例中未使用。


代码清单1
public static class MapClass extends MapReduceBase
implements Mapper{

private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

public void map(LongWritable key, Text value,
OutputCollector output,
Reporter reporter) throws IOException {
String line = value.toString();
StringTokenizer itr = new StringTokenizer(line);
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
output.collect(word, one);
}
}
}



2.实现 Reduce 类

见代码清单 2。这个类实现 Reducer 接口中的 reduce 方法, 输入参数中的 key, values 是由 Map 任务输出的中间结果,values 是一个 Iterator, 遍历这个 Iterator, 就可以得到属于同一个 key 的所有 value. 此处,key 是一个单词,value 是词频。只需要将所有的 value 相加,就可以得到这个单词的总的出现次数。


代码清单 2
public static class Reduce extends MapReduceBase
implements Reducer {

public void reduce(Text key, Iterator values,
OutputCollector output,
Reporter reporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}



3.运行 Job

在 Hadoop 中一次计算任务称之为一个 job, 可以通过一个 JobConf 对象设置如何运行这个 job。此处定义了输出的 key 的类型是 Text, value 的类型是 IntWritable, 指定使用代码清单1中实现的 MapClass 作为 Mapper 类, 使用代码清单2中实现的 Reduce 作为 Reducer 类和 Combiner 类, 任务的输入路径和输出路径由命令行参数指定,这样 job 运行时会处理输入路径下的所有文件,并将计算结果写到输出路径下。

然后将 JobConf 对象作为参数,调用 JobClient 的 runJob, 开始执行这个计算任务。至于 main 方法中使用的 ToolRunner 是一个运行 MapReduce 任务的辅助工具类,依样画葫芦用之即可。


代码清单 3
public int run(String[] args) throws Exception {
JobConf conf = new JobConf(getConf(), WordCount.class);
conf.setJobName("wordcount");

conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);

conf.setMapperClass(MapClass.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);

conf.setInputPath(new Path(args[0]));
conf.setOutputPath(new Path(args[1]));

JobClient.runJob(conf);
return 0;
}

public static void main(String[] args) throws Exception {
if(args.length != 2){
System.err.println("Usage: WordCount ");
System.exit(-1);
}
int res = ToolRunner.run(new Configuration(), new WordCount(), args);
System.exit(res);
}
}



以上就是 WordCount 程序的全部细节,简单到让人吃惊,您都不敢相信就这么几行代码就可以分布式运行于大规模集群上,并行处理海量数据集。

4. 通过 JobConf 定制计算任务

通过上文所述的 JobConf 对象,程序员可以设定各种参数,定制如何完成一个计算任务。这些参数很多情况下就是一个 java 接口,通过注入这些接口的特定实现,可以定义一个计算任务( job )的全部细节。了解这些参数及其缺省设置,您才能在编写自己的并行计算程序时做到轻车熟路,游刃有余,明白哪些类是需要自己实现的,哪些类用 Hadoop 的缺省实现即可。表一是对 JobConf 对象中可以设置的一些重要参数的总结和说明,表中第一列中的参数在 JobConf 中均会有相应的 get/set 方法,对程序员来说,只有在表中第三列中的缺省值无法满足您的需求时,才需要调用这些 set 方法,设定合适的参数值,实现自己的计算目的。针对表格中第一列中的接口,除了第三列的缺省实现之外,Hadoop 通常还会有一些其它的实现,我在表格第四列中列出了部分,您可以查阅 Hadoop 的 API 文档或源代码获得更详细的信息,在很多的情况下,您都不用实现自己的 Mapper 和 Reducer, 直接使用 Hadoop 自带的一些实现即可。


表一 JobConf 常用可定制参数
参数 作用 缺省值 其它实现
InputFormat 将输入的数据集切割成小数据集 InputSplits, 每一个 InputSplit 将由一个 Mapper 负责处理。此外 InputFormat 中还提供一个 RecordReader 的实现, 将一个 InputSplit 解析成 对提供给 map 函数。 TextInputFormat
(针对文本文件,按行将文本文件切割成 InputSplits, 并用 LineRecordReader 将 InputSplit 解析成 对,key 是行在文件中的位置,value 是文件中的一行) SequenceFileInputFormat
OutputFormat 提供一个 RecordWriter 的实现,负责输出最终结果 TextOutputFormat
(用 LineRecordWriter 将最终结果写成纯文件文件,每个 对一行,key 和 value 之间用 tab 分隔) SequenceFileOutputFormat
OutputKeyClass 输出的最终结果中 key 的类型 LongWritable
OutputValueClass 输出的最终结果中 value 的类型 Text
MapperClass Mapper 类,实现 map 函数,完成输入的 到中间结果的映射 IdentityMapper
(将输入的 原封不动的输出为中间结果) LongSumReducer,
LogRegexMapper,
InverseMapper
CombinerClass 实现 combine 函数,将中间结果中的重复 key 做合并 null
(不对中间结果中的重复 key 做合并)
ReducerClass Reducer 类,实现 reduce 函数,对中间结果做合并,形成最终结果 IdentityReducer
(将中间结果直接输出为最终结果) AccumulatingReducer, LongSumReducer
InputPath 设定 job 的输入目录, job 运行时会处理输入目录下的所有文件 null
OutputPath 设定 job 的输出目录,job 的最终结果会写入输出目录下 null
MapOutputKeyClass 设定 map 函数输出的中间结果中 key 的类型 如果用户没有设定的话,使用 OutputKeyClass
MapOutputValueClass 设定 map 函数输出的中间结果中 value 的类型 如果用户没有设定的话,使用 OutputValuesClass
OutputKeyComparator 对结果中的 key 进行排序时的使用的比较器 WritableComparable
PartitionerClass 对中间结果的 key 排序后,用此 Partition 函数将其划分为R份,每份由一个 Reducer 负责处理。 HashPartitioner
(使用 Hash 函数做 partition) KeyFieldBasedPartitioner PipesPartitioner




回页首




改进的 WordCount 程序

现在你对 Hadoop 并行程序的细节已经有了比较深入的了解,我们来把 WordCount 程序改进一下,目标: (1)原 WordCount 程序仅按空格切分单词,导致各类标点符号与单词混杂在一起,改进后的程序应该能够正确的切出单词,并且单词不要区分大小写。(2)在最终结果中,按单词出现频率的降序进行排序。

1.修改 Mapper 类,实现目标(1)

实现很简单,见代码清单4中的注释。


代码清单 4
public static class MapClass extends MapReduceBase
implements Mapper {

private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
private String pattern="[^\\w]"; //正则表达式,代表不是0-9, a-z, A-Z的所有其它字符

public void map(LongWritable key, Text value,
OutputCollector output,
Reporter reporter) throws IOException {
String line = value.toString().toLowerCase(); //全部转为小写字母
line = line.replaceAll(pattern, " "); //将非0-9, a-z, A-Z的字符替换为空格
StringTokenizer itr = new StringTokenizer(line);
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
output.collect(word, one);
}
}
}



2.实现目标(2)

用一个并行计算任务显然是无法同时完成单词词频统计和排序的,这时我们可以利用 Hadoop 的任务管道能力,用上一个任务(词频统计)的输出做为下一个任务(排序)的输入,顺序执行两个并行计算任务。主要工作是修改代码清单3中的 run 函数,在其中定义一个排序任务并运行之。

在 Hadoop 中要实现排序是很简单的,因为在 MapReduce 的过程中,会把中间结果根据 key 排序并按 key 切成 R 份交给 R 个 Reduce 函数,而 Reduce 函数在处理中间结果之前也会有一个按 key 进行排序的过程,故 MapReduce 输出的最终结果实际上已经按 key 排好序。词频统计任务输出的 key 是单词,value 是词频,为了实现按词频排序,我们指定使用 InverseMapper 类作为排序任务的 Mapper 类( sortJob.setMapperClass(InverseMapper.class );),这个类的 map 函数简单地将输入的 key 和 value 互换后作为中间结果输出,在本例中即是将词频作为 key,单词作为 value 输出, 这样自然就能得到按词频排好序的最终结果。我们无需指定 Reduce 类,Hadoop 会使用缺省的 IdentityReducer 类,将中间结果原样输出。

还有一个问题需要解决: 排序任务中的 Key 的类型是 IntWritable, (sortJob.setOutputKeyClass(IntWritable.class)), Hadoop 默认对 IntWritable 按升序排序,而我们需要的是按降序排列。因此我们实现了一个 IntWritableDecreasingComparator 类, 并指定使用这个自定义的 Comparator 类对输出结果中的 key (词频)进行排序:sortJob.setOutputKeyComparatorClass(IntWritableDecreasingComparator.class)

详见代码清单 5 及其中的注释。


代码清单 5
public int run(String[] args) throws Exception {
Path tempDir = new Path("wordcount-temp-" + Integer.toString(
new Random().nextInt(Integer.MAX_VALUE))); //定义一个临时目录

JobConf conf = new JobConf(getConf(), WordCount.class);
try {
conf.setJobName("wordcount");

conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);

conf.setMapperClass(MapClass.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);

conf.setInputPath(new Path(args[0]));
conf.setOutputPath(tempDir); //先将词频统计任务的输出结果写到临时目
//录中, 下一个排序任务以临时目录为输入目录。

conf.setOutputFormat(SequenceFileOutputFormat.class);

JobClient.runJob(conf);

JobConf sortJob = new JobConf(getConf(), WordCount.class);
sortJob.setJobName("sort");

sortJob.setInputPath(tempDir);
sortJob.setInputFormat(SequenceFileInputFormat.class);

sortJob.setMapperClass(InverseMapper.class);

sortJob.setNumReduceTasks(1); //将 Reducer 的个数限定为1, 最终输出的结果
           //文件就是一个。
sortJob.setOutputPath(new Path(args[1]));
sortJob.setOutputKeyClass(IntWritable.class);
sortJob.setOutputValueClass(Text.class);

sortJob.setOutputKeyComparatorClass(IntWritableDecreasingComparator.class);

JobClient.runJob(sortJob);
} finally {
FileSystem.get(conf).delete(tempDir); //删除临时目录
}
return 0;
}

private static class IntWritableDecreasingComparator extends IntWritable.Comparator {
public int compare(WritableComparable a, WritableComparable b) {
return -super.compare(a, b);
}

public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
return -super.compare(b1, s1, l1, b2, s2, l2);
}
}






回页首




在 Eclipse 环境下进行开发和调试

在 Eclipse 环境下可以方便地进行 Hadoop 并行程序的开发和调试。推荐使用 IBM MapReduce Tools for Eclipse, 使用这个 Eclipse plugin 可以简化开发和部署 Hadoop 并行程序的过程。基于这个 plugin, 可以在 Eclipse 中创建一个 Hadoop MapReduce 应用程序,并且提供了一些基于 MapReduce 框架的类开发的向导,可以打包成 JAR 文件,部署一个 Hadoop MapReduce 应用程序到一个 Hadoop 服务器(本地和远程均可),可以通过一个专门的视图 ( perspective ) 查看 Hadoop 服务器、Hadoop 分布式文件系统( DFS )和当前运行的任务的状态。

可在 IBM alphaWorks 网站下载这个 MapReduce Tool, 或在本文的下载清单中下载。将下载后的压缩包解压到你 Eclipse 安装目录,重新启动 Eclipse 即可使用了。

设置 Hadoop 主目录

点击 Eclipse 主菜单上 Windows->Preferences, 然后在左侧选择 Hadoop Home Directory,设定你的 Hadoop 主目录,如图一所示:


图 1


创立一个 MapReduce Project

点击 Eclipse 主菜单上 File->New->Project, 在弹出的对话框中选择 MapReduce Project, 输入 project name 如 wordcount, 然后点击 Finish 即可。,如图 2 所示:


图 2


此后,你就可以象一个普通的 Eclipse Java project 那样,添加入 Java 类,比如你可以定义一个 WordCount 类,然后将本文代码清单1,2,3中的代码写到此类中,添加入必要的 import 语句 ( Eclipse 快捷键 ctrl+shift+o 可以帮你),即可形成一个完整的 wordcount 程序。

在我们这个简单的 wordcount 程序中,我们把全部的内容都放在一个 WordCount 类中。实际上 IBM MapReduce tools 还提供了几个实用的向导 ( wizard ) 工具,帮你创建单独的 Mapper 类,Reducer 类,MapReduce Driver 类(就是代码清单3中那部分内容),在编写比较复杂的 MapReduce 程序时,将这些类独立出来是非常有必要的,也有利于在不同的计算任务中重用你编写的各种 Mapper 类和 Reducer 类。

在 Eclipse 中运行

如图三所示,设定程序的运行参数:输入目录和输出目录之后,你就可以在 Eclipse 中运行 wordcount 程序了,当然,你也可以设定断点,调试程序。


图 3

主题:Skynet --- ruby的类Google Map/Reduce框架

Skynet是一个很响亮的名字,因为它是阿诺施瓦辛格主演的经典系列电影《终结者》里面的统治人类的超级计算机网络。不过本文的Skynet没这么恐怖,它是一个ruby版本的Google Map/Reduce框架的名字而已。

Google的Map/Reduce框架实在太有名气了,他可以把一个任务切分为很多份,交给n台计算机并行执行,返回的结果再并行的归并,最后得到运算的结果。据说Google一个搜索结果会Map到7000台服务器并行执行,这么多么可怕的分布式运算能力阿!有了Map/Reduce,程序员就可以在无需关注分布式框架的情况下,用简单的代码写出来健壮、并行的分布式应用程序,并且可以充分发挥计算机群集运算的能力。

如今能够实现Map/Reduce算法的框架已经有好几个了,其中最有名气的可能就是Yahoo发起的开源项目Hadoop,不过Hadoop并不是用ruby编写的,但在ruby的世界,Adam Pisoni已经开发出来了ruby版本的Map/Reduce框架,这就是Skynet。

Adam Pisoni开发Skynet的初衷是因为Adam Pisoni的公司Geni.com是一家定位于家族SNS的互联网网站。网站提供的新闻推送功能要求能够从大量的用户产生的信息当中提取特定用户感兴趣的内容,推送给用户。这实际上是一个分布式运算模型,要能够把任务分布到多台服务器上面执行,最后把任务归并回来。Adam Pisoni没有找到合适的框架,最终自己开发了Skynet,运用Map/Reduce算法来提供这个分布式运算平台。

用Skynet开发Map/Reduce的分布式应用程序非常简单,让我们举一个简单的例子看看吧:假设有一个1GB的文本文件,我们的任务是要统计该文件当中每个单词出现的次数统计。传统的做法当然很简单,顺序读入文件内容,进行单词统计就行了,但是毫无疑问,执行速度会很慢。如果我们有一个1000台服务器的运算群集,我们可以如何利用Skeynet来并发执行这个程序,从而缩短统计时间呢?

Map/Reduce算法的过程是:

1、Partition(划分数据)
把数据划分为1000份,这个过程由Skynet自动完成

2、Map
除了划分数据,还需要把运算该数据的代码也Map到每个运算节点上面去并发执行。这1000个节点各自执行自己的任务,执行完毕以后把执行结果返回

3、Partition
这1000分执行结果需要归并,于是我们再次划分数据,比方说划分为10份,这个过程也是Skynet自动完成的

4、Reduce
把Reduce代码和Reduce数据分发到10个节点执行,每个节点执行完毕返回数据。如果需要再次Reduce可以再次执行。最终Reduce为一个总共的结果。

其实Map/Reduce算法的原理是很简单的,好了,看看Skynet下面,我们怎么实现呢?其实我们需要编写的代码只有两个方法:一个map方法,告诉skynet如何执行每份数据,一个reduce方法,告诉skynet如何归并每份数据,所以这个并行算法最终用Skynet来写的话,也非常简单:

Ruby代码
class MapreduceTest
include SkynetDebugger

def self.map(datas)
results = {}
datas.each do |data|
results[data] ||= 0
results[data] += 1
end
[results]
end

def self.reduce(datas)
results = {}
datas.each do |hashes|
hashes.each do |key,value|
results[key] ||= 0
results[key] += value
end
end
results
end
end

class MapreduceTest
include SkynetDebugger

def self.map(datas)
results = {}
datas.each do |data|
results[data] ||= 0
results[data] += 1
end
[results]
end

def self.reduce(datas)
results = {}
datas.each do |hashes|
hashes.each do |key,value|
results[key] ||= 0
results[key] += value
end
end
results
end
end


这个就是一个最简单、但是完整ruby版本的Map/Reduce代码了。我们需要编写一个map方法,告诉skynet去统计每个单词的出现次数,我们还需要编写一个reduce方法告诉skynet去归并每个map的统计结果。好了,剩下所有的工作都归Skeynet接管了,是不是很简单!

当然要让这个Map/Reduce跑起来我们还需要做一些工作,比方说安装skynet,配置skynet的并行节点等等,这些琐碎的工作可以看看skynet自己的文档:http://skynet.rubyforge.org/doc/index.html,就不详述了。

值得一提的是Skynet可以和Rails框架良好的整合起来工作,你可以把Rails当中一些非常耗时、可以Map/Reduce的工作丢给Skynet去异步后台执行,比方说:

Ruby代码
MyModel.distributed_find(:all, :conditions => “created_on < ’#{3.days.ago}’”).each(:some_method)

MyModel.distributed_find(:all, :conditions => “created_on < ’#{3.days.ago}’”).each(:some_method)

把最近3天以来所有的model查询处理以后要执行的耗时操作some_method交给Skynet,让Skynet动用他强大的运算网络去执行。

还可以异步执行:
Ruby代码
model_object.send_later(:method, options, :save)

model_object.send_later(:method, options, :save)
把耗时的任务交给Skynet去异步执行。

对于拥有强大运算网络、并且需要进行大量耗时运算的web2.0网站来说,Skynet真是一个很棒的工具,他可以让程序员很简单的编写处理健壮而高效的分布式应用程序!

主题:基于Hadoop的Map reduce编程(一)

翻译的一篇国外的关于hadoop mapreduce的文章,文章比较长,先翻译第一部分吧

翻译者:pconlin900
博客:http://pconline900.javaeye.com

Hadoop是apache的一个开源的map-reduce框架,MapReduce是一个并行计算模型,用来处理海量数据。模型

思想来源于google的Jeffrey Dean 和 Sanjay Ghemawat,包括map() reduce()两个主要的功能。

这是一个很简单的类似于Hadoop的MapReduce应用例子,应用了mapreduce的基本思想,可以帮助理解hadoop

的处理思想和技术,但注意,它没有使用hadoop框架。
例子的功能是创建一些字符串,然后统计这些字符串里面每个字符出现的次数,最后汇总得到总的字符出现

次数。

Listing 1. 主程序
public class Main
{

public static void main(String[] args)
{

MyMapReduce my = new MyMapReduce();
my.init();

}
}


Listing 2. MyMapReduce.java

import java.util.*;

public class MyMapReduce
{
List buckets = new ArrayList();
List intermediateresults = new ArrayList();
List values = new ArrayList();

public void init()
{
for(int i = 1; i<=30; i++)
{
values.add("http://pconline900.javaeye.com" + new Integer(i).toString());
}



System.out.println("**STEP 1 START**-> Running Conversion into Buckets**");
System.out.println();
List b = step1ConvertIntoBuckets(values,5);
System.out.println("************STEP 1 COMPLETE*************");
System.out.println();
System.out.println();

System.out.println("**STEP 2 START**->Running **Map Function** concurrently for all

Buckets");
System.out.println();
List res = step2RunMapFunctionForAllBuckets(b);
System.out.println("************STEP 2 COMPLETE*************");

System.out.println();
System.out.println();
System.out.println("**STEP 3 START**->Running **Reduce Function** for collating Intermediate

Results and Printing Results");
System.out.println();
step3RunReduceFunctionForAllBuckets(res);
System.out.println("************STEP 3 COMPLETE*************");
System.out.println("************pconline900 翻译*************");
System.out.println("***********博客:

http://pconline900.javaeye.com*************");


}
public List step1ConvertIntoBuckets(List list,int numberofbuckets)
{
int n = list.size();
int m = n / numberofbuckets;
int rem = n% numberofbuckets;

int count = 0;
System.out.println("BUCKETS");
for(int j =1; j<= numberofbuckets; j++)
{
List temp = new ArrayList();
for(int i=1; i<= m; i++)
{

temp.add((String)values.get(count));
count++;


}
buckets.add(temp);
temp = new ArrayList();
}
if(rem != 0)
{
List temp = new ArrayList();
for(int i =1; i<=rem;i++)
{

temp.add((String)values.get(count));
count++;
}
buckets.add(temp);
}
System.out.println();
System.out.println(buckets);
System.out.println();
return buckets;

}

public List step2RunMapFunctionForAllBuckets(List list)
{
for(int i=0; i< list.size(); i++)
{
List elementList = (ArrayList)list.get(i);
new StartThread(elementList).start();
}

try
{
Thread.currentThread().sleep(1000);
}catch(Exception e)
{
}
return intermediateresults;
}

public void step3RunReduceFunctionForAllBuckets(List list)
{
int sum =0;
for(int i=0; i< list.size(); i++)
{
//you can do some processing here, like finding max of all results etc
int t = Integer.parseInt((String)list.get(i));
sum += t;
}


System.out.println();
System.out.println("Total Count is "+ sum);
System.out.println();

}

class StartThread extends Thread
{
private List tempList = new ArrayList();
public StartThread(List list)
{
tempList = list;
}
public void run()
{

for(int i=0; i< tempList.size();i++)
{
String str = (String)tempList.get(i);

synchronized(this)
{
intermediateresults.add(new Integer(str.length()).toString());
}


}
}

}

}


 init()方法创建了一些测试数据,作为测试数据。实际应用中会是海量数据处理。

 step1ConvertIntoBuckets()方法将测试数据拆分到5个 bucket中,每个bucket是一个ArrayList(包含6

个String数据)。bucket可以保存在内存,磁盘,或者集群中的其他节点;

 step2RunMapFunctionForAllBuckets()方法创建了5个线程(每个bucket一个),每个线程StartThread处

理每个bucket并把处理结果放在intermediateresults这个arraylist中。

 如果bucket分配给不同的节点处理,必须有一个master主控节点监控各个节点的计算,汇总各个节点的

处理结果,若有节点失败,master必须能够分配计算任务给其他节点计算。\

 step3RunReduceFunctionForAllBuckets()方法加载intermediateresults中间处理结果,并进行汇总处

理,最后得到最终的计算结果。