MapReduce过程详解
数据运行的底层目前还是以hadoop为主,我们主要接触的还是上层抽象出来的比较方便的数据工具,例如hive,spark,impala等等,那么hadoop底层的核心原理又是什么呢?
hadoop的底层核心由HDFS,MapReduce和YARN组成,HDFS是大数据的存储引擎,分布式文件系统,YARN是资源调度系统,而MapReduce就是它的计算框架,但同时,MapReduce也是一个编程模型,因为MapReduce抽象出来的框架,开发人员只需要按照其规则编写代码,就可以直接放到hadoop上去运行,也就是说开发人员可以不用关心hadoop的实现原理,调用原理,只需要关注业务逻辑即可,大大降低了大数据开发的门槛。
一、MapReduce是如何完成数据计算的呢?其原理如下:
MapReduce模型只包含两个过程,其一是Map,其二是Reduce,
Map 的主要输入是一对 <Key, Value> 值,经过 map 计算后输出一对 <Key, Value> 值,然后Shuffle将相同 Key 合并,形成 <Key, Value 集合 >,然后再将这个 <Key, Value 集合 > 输入Reduce,经过计算输出零个或多个 <Key, Value> 对。
大数据领域的关系代数运算,矩阵运算,都可通过MapReduce进行。
我们来拆解一下MapReduce的过程,以WordCount为例:
WordCount解决的是把一个文本经过处理,统计出文本中每个词出现的频率问题。
那么我们正常Python代码实现如下:
string='''hello abc
hello hadoop
hello 123
123'''
String_list=string.replace("\n"," ").lower().split(" ")
count_dict = {}
for string in String_list:
if string in count_dict.keys():
count_dict[string] = count_dict[string]+1
else:
count_dict[string]=1
print(count_dict)
输出结果为:{'hello': 3, 'abc': 1, 'hadoop': 1, '123': 2}
编程框架里,Map过程是一个Map函数完成的,Reduce过程是一个Reduce函数完成。Map函数和Reduce函数的输入都是一个Key,Value对,输出也都是一个Key,Value对。
WordCount的MapReduce实现如下:
public class WordCount {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoretokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
}
详解过程如下:
- 一般情况下,Map过程输入的Value就是我们的输入的一行数据,而Key在计算中是用不到的,所以实现的时候可以随便传值,而wordCount的实现是用偏移量给它传值的。即行首在文档中的索引位置。
- 传入keyvalue对以后,Map程序进行处理,处理过程是把每一行数据拆成单个的词,并且每个词作为输出的Key,而输出的Value,都是1,即此时让每个词出现的频率都是1。输出很多组Key,Value对。上述例子中的字符串经过Map以后,就会变成如下样式:{"hello":1} {"abc":1} {"hello":1} {"hadoop":1} {"hello":1} {"123":1} {"123":1}
- 然后经过一个叫做Shuffle的过程,Shuffle本质是进行数据分区,放到WordCount里面表现出来的就是相似的值合并,Shuffle是Reduce进程处理的一部分,所以可能有些地方不会把这个过程单独拉出来,而业务代码中也不会有体现。
分布式计算需要将不同服务器上的相关数据合并到一起进行下一步计算,这就是 shuffle。
Map 输出的 <Key, Value>shuffle 到哪个 Reduce 进程是这里的关键,它是由 Partitioner 来实现的,MapReduce 框架默认的 Partitioner 用 Key 的哈希值对 Reduce 任务数量取模,相同的 Key 一定会落在相同的 Reduce 任务 ID 上。从实现上来看的话,这样的 Partitioner 代码只需要一行。
public int getPartition(K2 key, V2 value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
经过Shuffle以后,Map的输出就会变成如下样式:{"hello":[1,1,1]} {"abc":[1]} {"hadoop":[1]} {"123":[1,1]}
4.然后把这些键值对传给reduce,作为reduce的入参,经过reduce处理,reduce就对value值进行sum,然后把输入的key继续作为key,而sum(Value)的值作为输出的Value,就输出了最终结果:{'hello': 3, 'abc': 1, 'hadoop': 1, '123': 2}
处理过程用图表示,如下图:
二、MapReduce是怎么运行的呢?
运行过程如下:
- 应用进程 JobClient 将用户作业 JAR 包存储在 HDFS 中,将来这些 JAR 包会分发给 Hadoop 集群中的服务器执行 MapReduce 计算。
- 应用程序提交 job 作业给 JobTracker。
- JobTracker 根据作业调度策略创建 JobInProcess 树,每个作业都会有一个自己的 JobInProcess 树。
- JobInProcess 根据输入数据分片数目(通常情况就是数据块的数目)和设置的 Reduce 数目创建相应数量的 TaskInProcess。
- TaskTracker 进程和 JobTracker 进程进行定时通信。
- 如果 TaskTracker 有空闲的计算资源(有空闲 cpu 核心),JobTracker 就会给它分配任务。分配任务的时候会根据 TaskTracker 的服务器名字匹配在同一台机器上的数据块计算任务给它,使启动的计算任务正好处理本机上的数据。
- TaskTracker 收到任务后根据任务类型(是 Map 还是 Reduce)和任务参数(作业 JAR 包路径、输入数据文件路径、要处理的数据在文件中的起始位置和偏移量、数据块多个备份的 Datanode 主机名等),启动相应的 Map 或者 Reduce 进程。
- Map 或者 Reduce 进程启动后,检查本地是否有要执行任务的 JAR 包文件,如果没有,就去 HDFS 上下载,然后加载 Map 或者 Reduce 代码开始执行。
- 如果是 Map 进程,从 HDFS 读取数据(通常要读取的数据块正好存储在本机);如果是 Reduce 进程,将结果数据写出到 HDFS。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。