微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Hadoop学习笔记: MapReduce(2)

一. 切片与MapTask并行度决定机制

现有如下的问题: 1G的数据, 启动8个MapTask, 可以提高集群的并发处理能力. 那么1K的数据, 如果也启动8个MapTask, 会提高集群性能吗? MapTask并行任务是否是越多越好呢? 哪些因素影响了MapTask并行度?

MapTask并行度决定机制

首先需要区分两个概念:@H_404_9@ 1. 数据块: 数据块(Blocks)是HDFS物理上把数据分成不同的块. 数据块是HDFS的存储数据单位@H_404_9@ 2. 数据切片: 数据切片只是在逻辑上对输入进行分片, 并不会在磁盘上将其切分进行存储. 数据切片是MapRudece程序计算输入数据的单位, 一个切片会对应启动一个MapTask.@H_404_9@ 需要注意的是, 认情况下, 切片大小=数据块大小. 这是由HDFS的"数据本地化优化"的特性决定的, 也即在存储输入数据的节点上运行map任务, 无需集群带宽资源, 便可获得最佳性能. 如果分片跨越2个数据块,对于任何一个HDFS节点(基本不可能同时存储这2个数据块), 分片中的另外一块数据就需要通过网络传输到map任务节点, 与使用本地数据运行map任务相比, 效率则更低. 同时, 对数据进行切片时不会考虑从数据整体, 而是会逐个针对每一个文件进行单独的切片.

二. Job提交流程源码和切片源码详解

Job提交流程源码

FileInputFormat切片源码解析(input.getSplits(job))

(1) 先找到数据存储的目录@H_404_9@ (2) 遍历处理目录下的每一个文件, 每个文件中:@H_404_9@         a) 获取文件大小fs.sizeOf(ss.txt)@H_404_9@         b) 计算切片大小@H_404_9@         > minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));@H_404_9@         getFormatMinSplitSize()方法返回一个long类型的1@H_404_9@         getMinSplitSiez(JobContext job)方法返回SPLIT_MINSIZE常量. 这个常量参数由mapred-default.xml参数配置文件决定, 认为0.@H_404_9@        因此, minSize返回一个1L@H_404_9@         > maxSize = getMaxSplitSize(job)@H_404_9@         getMaxSplitSize(JobContext context) 方法返回SPLIT_MAXSIZE常量, 这个常量参数同样由mapred-default.xml参数配置文件决定. 但认情况下没有配置此常量的参数, 因此在方法返回参数时, 由getLong()方法提供认值: Long.MAX_VALUE.@H_404_9@         c) 判断这个文件是否可以进行切割. 若可进行切割, 则获取块大小@H_404_9@         long blockSize = file.getBlockSize();@H_404_9@         本地模式下运行时, 块大小认为32M@H_404_9@         d) 计算切片大小@H_404_9@         long splitSize = this.computeSplitSize(blockSIze, minSize, maxSize);@H_404_9@         在computeSplitSize()方法中, 返回值如下:

return Math.max(minSize, Math.min(maxSize, blockSize));

        在认情况下, 切片大小 = 块大小@H_404_9@         e) 进行切片, 每次完成切片后, 都需要判断剩下部分的大小是否大于块的1.1倍, 如果小于1.1倍, 则就将剩下的部分划为一块切片

while(((double) bytesRemaining)/splitSize > SPLIT_SLOP)

        f) 将切片信息写入一个切片规划文件中, 暂时存储在.staging文件

(3) 提交切片规划文件到YARN上, YARN上的MrAppMaster根据切片规划文件计算需要开启MapTask的个数@H_404_9@ **************************************************************************************************************@H_404_9@ 如何调整切片的大小?@H_404_9@ 如果需要调大切片, 则需要将minSize调大; 如果需要更小的切片, 则需要将maxSize调小@H_404_9@ **************************************************************************************************************@H_404_9@ 获取切片信息API

// 获取切片信息API
String name = inputSplit.getPath().getName();
// 根据文件类型获取切片信息
FileSplit inputSplit = (FileSplit) context.getInputSplit();

二. TextInputFormat

在运行MapReduce程序时, 输入的文件格式多种多样, 包括: 基于行的日志文件, 二进制格式文件, 数据库表等等, 那么针对不同的数据类型, MapReduce也同样提供了很多不同种类的数据格式. FileInputFormat常见的接口实现类包括TextInputFormat, keyvalueTextInputFormat, NLineInputFormat, CombineTextInputFormat, 以及自定义InputFormat等.

TextInputFormat

TextInputFormat是FileInputFormat的认实现类, 按行读取每条记录, 其键值对是<LongWritable, Text>@H_404_9@ 示例:

Rich learning form@H_404_9@Intelligent learning engine@H_404_9@Learning more convenient@H_404_9@From the real demand for more close to the enterprise

每条记录表示为以下键/值对:

(0,Rich learning form)@H_404_9@(20,Intelligent learning engine)@H_404_9@(49,Learning more convenient)@H_404_9@(74,From the real demand for more close to the enterprise)

CombineTextInputFormat

 框架认的TextInputFormat切片机制是对每个文件进行单独的切片规划, 不管文件多小, 都会产生一个单独的切片, 且都会交给一个MapTask. 如果一个任务中包含多个小文件, 那么就会产生大量的MapTask, 导致处理效率低下. 因此, 在这种情况下, 需要使用CombineTextInputFormat切片机制进行处理.@H_404_9@ 1) 虚拟存储切片最大值设置@H_404_9@ CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4M@H_404_9@ 注意: 虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置@H_404_9@ 2) 切片机制

三. MapReduce工作流程

> 关于环形缓冲区内的元数据(Meta data), 其中包含以下几类数据:@H_404_9@ index: 记录数据开始的位置@H_404_9@ partition: 记录数据存放于哪个分区@H_404_9@ keystart: 记录数据的key值开始的索引@H_404_9@ valuestart: 记录数据的value值开始的索引

> 环形缓冲区认大小是100M, 如果数据占到磁盘空间总大小的80%时, 则会开始反向写入. 这么做的原因是: 可以在向文件溢写的过程中, 同时开启向环形缓冲区写入的线程. 若新数据写入的速度较快, 则会等待旧数据溢写完成后继续进行写入.  如果直到缓冲区写满才进行溢写, 则需要等待溢写完成之后才可以重新对缓冲区进行写入.@H_404_9@ 缓冲区的大小可以通过参数调整: mapreduce.task.is.sort.mb 认100M

 四. Shuffle机制(混洗)

Map方法之后, Reduce方法之前的数据处理过程称之为Shuffle.

Shuffle机制

 具体Shuffle过程如下:@H_404_9@ 1) MapTask收集map()方法输出的kv对, 放到内存缓冲区中.@H_404_9@ 2) 从内存缓冲区不断溢写到本地磁盘文件, 可能会溢出多个文件@H_404_9@ 3) 多个溢出文件会被合并成大的溢出文件@H_404_9@ 4) 在溢出过程即合并的过程中, 都需要调用Partitioner进行分区和针对key的排序(按字典进行快速排序)@H_404_9@ 5) ReduceTask根据自己的分区号, 从各个MapTask机器上拉取相应的结果分区数据@H_404_9@ 6) ReduceTask会抓取到来自不同MapTask同一分区的结果文件, ReduceTask会将这些文件再进行合并(Merge Sort)@H_404_9@ 7) 合并成大文件后, Shuffle的过程也就结束了, 后面就要进入ReduceTask的逻辑运算过程

Partition分区

 认的Partitioner分区代码如下:

public class HashPartitioner<K, V> extends Partitioner<K, V>{

    public int getPartition(K key, V value, int numReduceTasks){
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

}

认分区是根据keyhashCodeReudceTasks个数取模得到的. 用户无法指定key存储在哪个分区.

自定义Partitioner

1)自定义Partitioner方法需要重写getPartition()方法

public class CustomPartitioner exxtends Partitioner<TExt, FlowBean> {
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions)
        // 控制分区代码逻辑
        ... ...
        return partition;
    }
}

2) 在Job驱动中, 设置自定义Partitioner

job.setPartitionerClass(CustomPartitioner.class);

3) 自定义Partition后, 要根据自定义Partition的逻辑设置相应数量的ReduceTask

job.setNumReduceTasks(5);

分区总结

1) 如果ReduceTask的数量 > getPartition的结果数, 则会多产生几个空的输出文件part-r-000xx;@H_404_9@ 2) 如果1 < ReduceTask的数量 < getPartition的结果数, 则有一部分分区数据无法存储, 会产生Exception;@H_404_9@ 3) 如果ReduceTask的数量 = 1, 则不管MapTask端输出多少个分区文件, 最终结果都会交给这一个ReduceTask, 最终也就只会产生一个结果文件part-r-00000. 因为在源代码中, 当用户把ReduceTask的个数设置为1时, 会进入程序本身已经定义好的getPartitioner匿名内部类;

if (this.partitions > 1) {
                this.partitioner = (Partitioner)ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
            } else {
                this.partitioner = new Partitioner<K, V>() {
                    public int getPartition(K key, V value, int numPartitions) {
                        return NewOutputCollector.this.partitions - 1;
                    }
                };
            }

 4) 分区号必须从零开始, 逐一累加

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐