MapReduce Shuffle Process Explained
一、Review
- Shuffle functionality
- Partitioning
  - Problem: Reduce in MapReduce is designed for aggregation, so by default only one ReduceTask is started
    - if a single ReduceTask has to process too much data, performance suffers, or the job fails because of insufficient resources
  - Application: compare the amount of data one ReduceTask must process with the resources of a single machine
    - data output by the MapTasks: 10 GB
    - memory of a single machine: 16 GB
    - memory already used by other programs: 10 GB
    - conclusion: the remaining memory cannot comfortably hold the data, so more than one ReduceTask is needed
  - Implementation: set the number of ReduceTasks and, when there is more than one, a Partitioner (a minimal sketch follows this review list)
- Sorting
- Grouping: implemented on top of the sorted data; all V2 values that share the same K2 are built into one iterator
- Custom data types
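A minimal sketch of the configuration point made above: when one reducer cannot hold the Map output, spread the load over several ReduceTasks and let a Partitioner route the records. The class name and the reducer count are illustrative only, not part of the course code.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ReducerCountSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "reducer-count-sketch");

        // With roughly 10 GB of Map output and only ~6 GB of free memory per machine,
        // one ReduceTask is not enough; spread the work over several reducers.
        job.setNumReduceTasks(3);

        // With more than one reducer, a Partitioner decides which reducer receives
        // each (K2, V2) pair; the default is HashPartitioner.
        // job.setPartitionerClass(MyPartitioner.class); // hypothetical custom rule
    }
}
```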
二、Course Objectives
- Mobile phone traffic analysis case [practice]
  - wordcount-style aggregation
  - sorting
  - partitioning
- Shuffle process [important]
  - how the detailed flow implements each of the functions above
- Optimizations inside Shuffle, and grouping [key points]
  - basic Shuffle optimizations: Combiner, Compress
  - implementing grouping: custom grouping comparator
三、Mobile Phone Traffic Analysis Case
1、Data
2、Requirement 1 and Its Analysis
- Requirement: for every phone number, compute the totals of its upstream and downstream information (packets and traffic) over all of its records
- Analysis
  - step 1: the result
      phone number    total up packets    total down packets    total up traffic    total down traffic
    - use the result to decide which input fields are needed
      - phone number: field 1
      - total up packets: field 6
      - total down packets: field 7
      - total up traffic: field 8
      - total down traffic: field 9
  - step 2: is there any sorting or grouping?
    - there is grouping: by phone number
    - K2: phone number
  - step 3: compare with the result — which fields are left?
    - they determine V2: total up packets, total down packets, total up traffic, total down traffic
    - this requires a custom data type
  - step 4: walk through and verify
    - Map
      - map logic (pseudocode):
          split = value.toString().split("\t")
          phone = split[1]
          outputKey.set(phone)
          outputValue.setAll(split[6], split[7], split[8], split[9])
      - K2: phone number
        - Text
      - V2: up packets, down packets, up traffic, down traffic
        - JavaBean
    - Shuffle: sorting and grouping
      - sorting: by phone number
      - grouping: all traffic records of the same phone number are put into one iterator
    - Reduce
      - reduce: simply accumulate the values one by one
      - K3: phone number
      - V3: total up packets, total down packets, total up traffic, total down traffic
3、Requirement 1 Implementation

- Custom data type

package bigdata.itcast.cn.hadoop.mapreduce.flow;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @ClassName FlowBean1
 * @Description Todo custom value type: up/down packets and up/down traffic
 * @Date 2020/11/2 10:37
 * @Create By Frank
 */
public class FlowBean1 implements Writable {

    private long upPack;
    private long downPack;
    private long upFlow;
    private long downFlow;

    public FlowBean1() {
    }

    public void setAll(long upPack, long downPack, long upFlow, long downFlow) {
        this.setUpPack(upPack);
        this.setDownPack(downPack);
        this.setUpFlow(upFlow);
        this.setDownFlow(downFlow);
    }

    public long getUpPack() { return upPack; }
    public void setUpPack(long upPack) { this.upPack = upPack; }
    public long getDownPack() { return downPack; }
    public void setDownPack(long downPack) { this.downPack = downPack; }
    public long getUpFlow() { return upFlow; }
    public void setUpFlow(long upFlow) { this.upFlow = upFlow; }
    public long getDownFlow() { return downFlow; }
    public void setDownFlow(long downFlow) { this.downFlow = downFlow; }

    @Override
    public String toString() {
        return this.upPack + "\t" + this.downPack + "\t" + this.upFlow + "\t" + this.downFlow;
    }

    public void write(DataOutput out) throws IOException {
        out.writeLong(this.upPack);
        out.writeLong(this.downPack);
        out.writeLong(this.upFlow);
        out.writeLong(this.downFlow);
    }

    public void readFields(DataInput in) throws IOException {
        this.upPack = in.readLong();
        this.downPack = in.readLong();
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
    }
}

- Code implementation

package bigdata.itcast.cn.hadoop.mapreduce.flow;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * @ClassName FlowMr1
 * @Description Todo requirement 1 of the phone traffic statistics
 * @Date 2020/11/2 10:30
 * @Create By Frank
 */
public class FlowMr1 extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(this.getConf(), "flow1");
        job.setJarByClass(FlowMr1.class);

        // input
        Path inputPath = new Path("datas/flow/data_flow.dat");
        TextInputFormat.setInputPaths(job, inputPath);

        // map
        job.setMapperClass(FlowMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean1.class);

        // reduce
        job.setReducerClass(FlowReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean1.class);

        // output
        Path outputPath = new Path("datas/output/flow/flow1");
        TextOutputFormat.setOutputPath(job, outputPath);

        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int status = ToolRunner.run(conf, new FlowMr1(), args);
        System.exit(status);
    }

    public static class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean1> {
        // output key: the phone number
        Text outputKey = new Text();
        // output value: the traffic information
        FlowBean1 outputValue = new FlowBean1();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // split and check that the record is valid: it must have at least 10 fields
            String[] split = value.toString().split("\t");
            if (split.length > 9) {
                // assign K2
                this.outputKey.set(split[1]);
                // assign V2
                this.outputValue.setAll(Long.parseLong(split[6]), Long.parseLong(split[7]),
                        Long.parseLong(split[8]), Long.parseLong(split[9]));
                // emit
                context.write(this.outputKey, this.outputValue);
            } else {
                // otherwise discard the record
                return;
            }
        }
    }

    public static class FlowReduce extends Reducer<Text, FlowBean1, Text, FlowBean1> {
        // output value
        FlowBean1 outputValue = new FlowBean1();

        @Override
        protected void reduce(Text key, Iterable<FlowBean1> values, Context context) throws IOException, InterruptedException {
            long sumUpPack = 0;
            long sumDownPack = 0;
            long sumUpFlow = 0;
            long sumDownFlow = 0;
            for (FlowBean1 value : values) {
                sumUpPack += value.getUpPack();
                sumDownPack += value.getDownPack();
                sumUpFlow += value.getUpFlow();
                sumDownFlow += value.getDownFlow();
            }
            // assign the sums to the output value
            this.outputValue.setAll(sumUpPack, sumDownPack, sumUpFlow, sumDownFlow);
            // emit
            context.write(key, this.outputValue);
        }
    }
}
4、Requirement 2 and Its Analysis
- Based on the result of requirement 1: sort the records by total upstream traffic in descending order
- Analysis
  - step 1: the result is the same as requirement 1, only the ordering differs
  - step 2: there is sorting — descending by total upstream traffic
    - K2: the whole row (all five columns) as one key
      - custom comparator: sort by total upstream traffic in descending order
    - V2: NullWritable
5、Requirement 2 Implementation

- Custom data type

package bigdata.itcast.cn.hadoop.mapreduce.flow;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @ClassName FlowBean2
 * @Description Todo wraps the five columns; used to sort by total upstream traffic in descending order
 * @Date 2020/11/2 10:37
 * @Create By Frank
 */
public class FlowBean2 implements WritableComparable<FlowBean2> {

    private String phone;
    private long upPack;
    private long downPack;
    private long upFlow;
    private long downFlow;

    public FlowBean2() {
    }

    public void setAll(String phone, long upPack, long downPack, long upFlow, long downFlow) {
        this.setPhone(phone);
        this.setUpPack(upPack);
        this.setDownPack(downPack);
        this.setUpFlow(upFlow);
        this.setDownFlow(downFlow);
    }

    public String getPhone() { return phone; }
    public void setPhone(String phone) { this.phone = phone; }
    public long getUpPack() { return upPack; }
    public void setUpPack(long upPack) { this.upPack = upPack; }
    public long getDownPack() { return downPack; }
    public void setDownPack(long downPack) { this.downPack = downPack; }
    public long getUpFlow() { return upFlow; }
    public void setUpFlow(long upFlow) { this.upFlow = upFlow; }
    public long getDownFlow() { return downFlow; }
    public void setDownFlow(long downFlow) { this.downFlow = downFlow; }

    @Override
    public String toString() {
        return this.phone + "\t" + this.upPack + "\t" + this.downPack + "\t" + this.upFlow + "\t" + this.downFlow;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.phone);
        out.writeLong(this.upPack);
        out.writeLong(this.downPack);
        out.writeLong(this.upFlow);
        out.writeLong(this.downFlow);
    }

    public void readFields(DataInput in) throws IOException {
        this.phone = in.readUTF();
        this.upPack = in.readLong();
        this.downPack = in.readLong();
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
    }

    /**
     * Called during the Shuffle sort.
     * Sorts by total upstream traffic in descending order.
     */
    @Override
    public int compareTo(FlowBean2 o) {
        return -Long.valueOf(this.getUpFlow()).compareTo(Long.valueOf(o.getUpFlow()));
    }
}

- Code implementation

package bigdata.itcast.cn.hadoop.mapreduce.flow;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * @ClassName FlowMr2
 * @Description Todo requirement 2 of the phone traffic statistics
 * @Date 2020/11/2 10:30
 * @Create By Frank
 */
public class FlowMr2 extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(this.getConf(), "flow2");
        job.setJarByClass(FlowMr2.class);

        // input: the output of requirement 1
        Path inputPath = new Path("datas/output/flow/flow1");
        TextInputFormat.setInputPaths(job, inputPath);

        // map
        job.setMapperClass(FlowMapper.class);
        job.setMapOutputKeyClass(FlowBean2.class);
        job.setMapOutputValueClass(NullWritable.class);

        // reduce
        job.setReducerClass(FlowReduce.class);
        job.setOutputKeyClass(FlowBean2.class);
        job.setOutputValueClass(NullWritable.class);

        // output
        Path outputPath = new Path("datas/output/flow/flow2");
        TextOutputFormat.setOutputPath(job, outputPath);

        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int status = ToolRunner.run(conf, new FlowMr2(), args);
        System.exit(status);
    }

    public static class FlowMapper extends Mapper<LongWritable, Text, FlowBean2, NullWritable> {
        // output key
        FlowBean2 outputKey = new FlowBean2();
        // output value
        NullWritable outputValue = NullWritable.get();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // split the line
            String[] split = value.toString().split("\t");
            // assign all five columns to the key
            this.outputKey.setAll(split[0], Long.parseLong(split[1]), Long.parseLong(split[2]),
                    Long.parseLong(split[3]), Long.parseLong(split[4]));
            // emit
            context.write(this.outputKey, this.outputValue);
        }
    }

    public static class FlowReduce extends Reducer<FlowBean2, NullWritable, FlowBean2, NullWritable> {
        @Override
        protected void reduce(FlowBean2 key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            // no aggregation logic: iterate and emit directly
            for (NullWritable value : values) {
                context.write(key, value);
            }
        }
    }
}
6、Requirement 3 and Its Analysis
- Based on the result of requirement 1: keep the descending sort of requirement 2, but write the results into three files partitioned by the prefix of the phone number (134, 135, everything else), which requires a custom Partitioner and three ReduceTasks
7、Requirement 3 Implementation
- Custom partitioner

package bigdata.itcast.cn.hadoop.mapreduce.flow;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * @ClassName FlowPartition
 * @Description Todo custom partitioner: partition by the prefix of the phone number
 * @Date 2020/11/2 11:32
 * @Create By Frank
 */
public class FlowPartition extends Partitioner<FlowBean2, NullWritable> {

    // partition by the beginning of the phone number
    @Override
    public int getPartition(FlowBean2 key, NullWritable value, int numPartitions) {
        // take the phone number of this record
        String phone = key.getPhone();
        if (phone.startsWith("134")) {
            return 0;
        } else if (phone.startsWith("135")) {
            return 1;
        } else {
            return 2;
        }
    }
}

- Code implementation

package bigdata.itcast.cn.hadoop.mapreduce.flow;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * @ClassName FlowMr3
 * @Description Todo requirement 3 of the phone traffic statistics
 * @Date 2020/11/2 10:30
 * @Create By Frank
 */
public class FlowMr3 extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(this.getConf(), "flow3");
        job.setJarByClass(FlowMr3.class);

        // input: the output of requirement 1
        Path inputPath = new Path("datas/output/flow/flow1");
        TextInputFormat.setInputPaths(job, inputPath);

        // map
        job.setMapperClass(FlowMapper.class);
        job.setMapOutputKeyClass(FlowBean2.class);
        job.setMapOutputValueClass(NullWritable.class);

        // shuffle: custom partitioner
        job.setPartitionerClass(FlowPartition.class);

        // reduce
        job.setReducerClass(FlowReduce.class);
        job.setOutputKeyClass(FlowBean2.class);
        job.setOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(3);

        // output
        Path outputPath = new Path("datas/output/flow/flow3");
        TextOutputFormat.setOutputPath(job, outputPath);

        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int status = ToolRunner.run(conf, new FlowMr3(), args);
        System.exit(status);
    }

    public static class FlowMapper extends Mapper<LongWritable, Text, FlowBean2, NullWritable> {
        // output key
        FlowBean2 outputKey = new FlowBean2();
        // output value
        NullWritable outputValue = NullWritable.get();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // split the line
            String[] split = value.toString().split("\t");
            // assign all five columns to the key
            this.outputKey.setAll(split[0], Long.parseLong(split[1]), Long.parseLong(split[2]),
                    Long.parseLong(split[3]), Long.parseLong(split[4]));
            // emit
            context.write(this.outputKey, this.outputValue);
        }
    }

    public static class FlowReduce extends Reducer<FlowBean2, NullWritable, FlowBean2, NullWritable> {
        @Override
        protected void reduce(FlowBean2 key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            // no aggregation logic: iterate and emit directly
            for (NullWritable value : values) {
                context.write(key, value);
            }
        }
    }
}
四、The Shuffle Process
1、Design Idea
- The distributed idea
  - split one large task into many small tasks and run the small tasks in parallel to obtain the result
- Problem: in a distributed setting, how can a global grouping (or global ordering) still be achieved?
  - example: the numbers 1 4 7 2 5 8
  - think about it: if they are processed distributedly, the numbers are split into two parts handled by two tasks
    - task1: 1 4 7  →  sorted descending: 7 4 1
    - task2: 2 5 8  →  sorted descending: 8 5 2
  - required result: 8 7 5 4 2 1
  - solution: merge — each task produces a sorted partial result, and one merge step over the sorted parts yields the global order
  - i.e. choose an approach that can perform a global operation over large amounts of data while leaving only the final aggregation to a single machine
- Shuffle: designed mainly to achieve global grouping
2、Functions
- Partitioning: when there are multiple reducers, decides which Reduce will process each record output by Map (the default rule is sketched below)
- Sorting: done to speed up grouping
- Grouping: done to enable global aggregation
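A minimal sketch of the default partitioning rule: it mirrors Hadoop's built-in HashPartitioner, which hashes K2 and takes the remainder modulo the number of ReduceTasks, so equal keys always go to the same reducer.

```java
import org.apache.hadoop.mapreduce.Partitioner;

// Sketch of the rule used when no custom Partitioner is configured
// (this is what org.apache.hadoop.mapreduce.lib.partition.HashPartitioner does).
public class DefaultLikePartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        // mask the sign bit so the result is non-negative, then take it modulo the
        // number of reducers: records with equal keys land on the same reducer
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
```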
3、Process
- Processes involved
  - MapTask processes
  - ReduceTask processes
- The five stages (wordcount example with two partitions)
  - Input
    - input data:
        hadoop hive spark
        hbase spark
        hadoop hive spark
        hbase spark
        hadoop hive spark
        hbase spark
        hive hbase hadoop hadoop
    - Split1
        0    hadoop hive spark
        100  hbase spark
        200  hadoop hive spark
    - Split2
        0    hbase spark
        200  hadoop hive spark
        300  hbase spark hive hbase hadoop hadoop
  - Map
    - MapTask1 output
        K2      V2  part
        hadoop  1   0
        hive    1   1
        spark   1   0
        hbase   1   1
        spark   1   0
        hadoop  1   0
        hive    1   1
        spark   1   0
    - MapTask2 output
        K2      V2  part
        hbase   1   1
        spark   1   0
        hadoop  1   0
        hive    1   1
        spark   1   0
        hbase   1   1
        spark   1   0
        hive    1   1
        hbase   1   1
        hadoop  1   0
        hadoop  1   0
  - Shuffle: the bridge that connects the Map stage and the Reduce stage
    - Map-side Shuffle
      - Spill: write the partitioned data produced by each MapTask's map calls from memory to disk as files [many small sorted files]
        - every MapTask puts its already-partitioned output into a circular (ring) buffer in memory [100 MB by default]
        - when the buffer reaches its threshold of 80 %, that 80 % is locked while the MapTask keeps writing new data into the remaining 20 %
        - the locked 80 % is sorted in memory before being spilled to disk
          - records of the same partition are placed together, and each partition is sorted internally
        - sorting algorithm: quicksort
        - taking MapTask1 as an example: its output is spilled into several small files, each sorted by partition and key
        - the buffer size and spill threshold are configurable (a configuration sketch follows this spill block)
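A minimal sketch of the spill-related settings mentioned above, assuming the standard Hadoop 2/3 property names; the values shown are simply the defaults.

```java
import org.apache.hadoop.conf.Configuration;

public class SpillConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // size of the in-memory ring buffer used by each MapTask, in MB (default 100)
        conf.set("mapreduce.task.io.sort.mb", "100");
        // fraction of the buffer that triggers a spill to disk (default 0.80, i.e. 80 %)
        conf.set("mapreduce.map.sort.spill.percent", "0.80");
        // how many sorted files are merged at once during the merge phase (default 10)
        conf.set("mapreduce.task.io.sort.factor", "10");
        System.out.println("spill buffer (MB) = " + conf.get("mapreduce.task.io.sort.mb"));
    }
}
```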
      - Merge: merge the many small spill files of each MapTask into one large sorted file (a sketch of this merge follows the file listings below)
        - MapTask1: merged file
            hadoop  1  0
            hadoop  1  0
            spark   1  0
            spark   1  0
            spark   1  0
            hbase   1  1
            hive    1  1
            hive    1  1
        - MapTask2: merged file
            hadoop  1  0
            hadoop  1  0
            hadoop  1  0
            spark   1  0
            spark   1  0
            spark   1  0
            hbase   1  1
            hbase   1  1
            hbase   1  1
            hive    1  1
            hive    1  1
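Because every spill file is already sorted, the merge step only has to perform a merge sort over the files rather than re-sorting everything. A minimal, self-contained sketch of that idea, using in-memory lists in place of spill files; the class and data are illustrative, not Hadoop internals.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class KWayMergeSketch {

    /** Merge several individually sorted runs into one sorted list (merge sort, no re-sorting). */
    static List<String> merge(List<List<String>> sortedRuns) {
        // the priority queue always exposes the smallest current head element of all runs
        PriorityQueue<int[]> heads = new PriorityQueue<>(
                Comparator.comparing((int[] p) -> sortedRuns.get(p[0]).get(p[1])));
        for (int run = 0; run < sortedRuns.size(); run++) {
            if (!sortedRuns.get(run).isEmpty()) {
                heads.offer(new int[]{run, 0});               // {run index, position inside the run}
            }
        }
        List<String> merged = new ArrayList<>();
        while (!heads.isEmpty()) {
            int[] head = heads.poll();
            merged.add(sortedRuns.get(head[0]).get(head[1]));
            if (head[1] + 1 < sortedRuns.get(head[0]).size()) {
                heads.offer(new int[]{head[0], head[1] + 1}); // advance within the same run
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // two sorted "spill files" of one MapTask, partition 0 only
        List<String> spill1 = Arrays.asList("hadoop", "spark", "spark");
        List<String> spill2 = Arrays.asList("hadoop", "spark");
        System.out.println(merge(Arrays.asList(spill1, spill2)));
        // prints [hadoop, hadoop, spark, spark, spark] — globally sorted without re-sorting
    }
}
```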
    - Reduce-side Shuffle
      - assumption: two ReduceTasks
      - Merge: each ReduceTask pulls the data belonging to it from the large file of every MapTask, merge-sorts it, and then groups it
        - Pull: every ReduceTask fetches its own partition from every MapTask
          - ReduceTask0
            - from MapTask1
                hadoop  1  0
                hadoop  1  0
                spark   1  0
                spark   1  0
                spark   1  0
            - from MapTask2
                hadoop  1  0
                hadoop  1  0
                hadoop  1  0
                spark   1  0
                spark   1  0
                spark   1  0
          - ReduceTask1
            - from MapTask1
                hbase  1  1
                hive   1  1
                hive   1  1
            - from MapTask2
                hbase  1  1
                hbase  1  1
                hbase  1  1
                hive   1  1
                hive   1  1
        - Merge sort: each ReduceTask merge-sorts the data it pulled from the different MapTasks
          - ReduceTask0
              hadoop  1  0   (×5)
              spark   1  0   (×6)
          - ReduceTask1
              hbase  1  1   (×4)
              hive   1  1   (×4)
        - Grouping
          - ReduceTask0
              hadoop  <1,1,1,1,1>
              spark   <1,1,1,1,1,1>
          - ReduceTask1
              hbase  <1,1,1,1>
              hive   <1,1,1,1>
      - the fetch and merge behaviour of the reduce side is configurable (see the sketch below)
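A minimal sketch of the reduce-side shuffle knobs, assuming the standard Hadoop 2/3 property names; the values shown are the defaults and serve only to make the settings concrete.

```java
import org.apache.hadoop.conf.Configuration;

public class ReduceShuffleConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // number of parallel fetcher threads each ReduceTask uses to pull map output (default 5)
        conf.set("mapreduce.reduce.shuffle.parallelcopies", "5");
        // fraction of the reducer's heap used to buffer fetched map output (default 0.70)
        conf.set("mapreduce.reduce.shuffle.input.buffer.percent", "0.70");
        // buffer usage at which an in-memory merge is started (default 0.66)
        conf.set("mapreduce.reduce.shuffle.merge.percent", "0.66");
        System.out.println("fetchers = " + conf.get("mapreduce.reduce.shuffle.parallelcopies"));
    }
}
```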
  - Reduce: the reduce method is called once per group to do the aggregation
  - Output: the results are written to the output directory
- Summary
  - Map-side Shuffle: runs inside the MapTask
    - Spill
      - the partitioned output of the MapTask is continuously written into the buffer
      - every time a chunk (80 % of the buffer) fills up, that chunk is sorted in memory
      - each spill produces a small sorted piece on disk
    - Merge
      - sorting goal: records of the same partition together, and each partition internally ordered
      - the data a MapTask processes may be too large to sort entirely in memory
      - by splitting it into several pieces that are each sorted, a merge sort builds the overall order of the whole MapTask output
  - Reduce-side Shuffle: runs inside the ReduceTask
    - Merge: pull the task's own partition from every MapTask, merge-sort it, then group it
4、Flow Diagram
五、Optimizations in Shuffle
1、Program Design
- During Shuffle, the large intermediate results produced by Map are written to disk and then read back by Reduce
- This makes the Shuffle phase relatively slow and hurts overall performance
- Core principle: if a job can be solved without going through Shuffle, don't write a full five-stage (shuffle-based) program
  - e.g. map join / broadcast join (a rough sketch follows)
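A rough, hypothetical sketch of the map-join idea mentioned above: the small table is loaded into memory in setup() and joined against each record in map(), so no Shuffle or Reduce stage is needed. The file name and field layout are placeholders, not part of the course code.

```java
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Map-side (broadcast) join: the whole join happens in the Mapper, so the job can run with 0 reducers.
public class MapJoinSketchMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    // small table kept in memory: id -> description (hypothetical layout)
    private final Map<String, String> smallTable = new HashMap<>();
    private final Text outputKey = new Text();

    @Override
    protected void setup(Context context) throws IOException {
        // load the small table once per MapTask (e.g. shipped via the distributed cache)
        try (BufferedReader reader = new BufferedReader(new FileReader("small_table.txt"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] fields = line.split("\t");
                smallTable.put(fields[0], fields[1]);
            }
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // big-table record: the join key is assumed to be the first column
        String[] fields = value.toString().split("\t");
        String joined = smallTable.get(fields[0]);
        if (joined != null) {
            outputKey.set(value.toString() + "\t" + joined);
            context.write(outputKey, NullWritable.get());
        }
    }
}
```

With this pattern the driver can call job.setNumReduceTasks(0), so the data never goes through the Shuffle described above.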
2、Combiner
- Combiner: aggregation on the Map side
- Observation
  - our own WordCount
    - Map output
        hadoop 1
        hive   1
        hbase  1
        hadoop 1
        hive   1
        spark  1
        hbase  1
        hadoop 1
    - without a Combiner: the Map output is handed to Reduce as is
    - Reduce then processes
        hadoop <1,1,1>
        hbase  <1,1>
        hive   <1,1>
        spark  <1>
      - 3 + 2 + 2 + 1 = 8 values have to be aggregated in Reduce
  - the WordCount example that ships with Hadoop
    - Map output
        hadoop 1
        hive   1
        hbase  1
        hadoop 1
        hive   1
        spark  1
        hbase  1
        hadoop 1
    - Combiner: group-and-aggregate already inside the Map-side Shuffle
        hadoop <3>
        hbase  <2>
        hive   <2>
        spark  <1>
    - Reduce then processes
        hadoop <3>
        hbase  <2>
        hive   <2>
        spark  <1>
      - only 4 values have to be aggregated in Reduce
- When it runs
  - inside the Map-side Shuffle: after each sort the framework checks whether a Combiner is configured
  - if a Combiner is configured, its class is invoked to group and aggregate the sorted data
- Design rationale: because the number of MapTasks is usually far larger than the number of ReduceTasks, letting every MapTask partially aggregate its own output and leaving only the final aggregation across MapTasks to Reduce lowers the reducers' load and improves their performance
- The Combiner's processing logic is the Reduce logic
  - the Combiner class is simply the Reducer class
- Usage

// shuffle
job.setCombinerClass(WcReducer.class); // enable the Combiner

- Applicability: not every program can use a Combiner
  - the aggregation must tolerate partial pre-aggregation, i.e. it has to be commutative and associative (sums, counts, max/min)
  - counter-example: computing a median
    - input: 1 3 5 6 7 9
    - correct result: 5.5
    - if the data were split as (1 3 5) and (6 7 9), the per-MapTask "medians" 3 and 7 cannot be combined into 5.5, so a Combiner would change the result
3、Compress
- Compression: sacrifice some CPU time on compressing and decompressing in exchange for much better disk and network I/O performance
  - example
    - without compression
      - Map output: 100 GB
      - writing it: 100 s
      - Reduce input: 100 GB
      - reading it: 100 s
    - with compression
      - compressing: 10 s
      - compressed size: 50 GB
      - writing on the Map side: 50 s
      - reading on the Reduce side: 50 s
      - decompressing: 10 s
- Compression in Hadoop
  - check which codecs are available natively:
      hadoop checknative
  - commonly used codecs
    - snappy
    - lzo
    - lz4
    - gzip
    - bzip2
- Using compression in a MapReduce program
  - scenarios
  - configuration
    - compression inside Shuffle (compressing the Map output)
        # enable compression of the map output
        mapreduce.map.output.compress=true
        # choose the codec
        mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec
    - compression of the final MapReduce result (an equivalent driver-code sketch follows)
        mapreduce.output.fileoutputformat.compress=true
        mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.Lz4Codec
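The same settings can also be made from the driver code. A minimal sketch using the FileOutputFormat helper methods; the job name is a placeholder and no input/output paths are shown.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OutputCompressSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // equivalent of mapreduce.map.output.compress(.codec) for the Shuffle
        conf.set("mapreduce.map.output.compress", "true");
        conf.set("mapreduce.map.output.compress.codec",
                 "org.apache.hadoop.io.compress.SnappyCodec");

        Job job = Job.getInstance(conf, "compress-sketch");
        // equivalent of mapreduce.output.fileoutputformat.compress(.codec) for the final result
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, Lz4Codec.class);
    }
}
```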
- Implementation
package bigdata.itcast.cn.hadoop.mapreduce.compress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * @ClassName SougouMr
 * @Description Todo wordcount-style job over the Sougou search log, with map-output compression enabled
 * @Date 2020/10/30 16:36
 * @Create By Frank
 */
public class SougouMr extends Configured implements Tool {

    // build, configure and submit the job
    public int run(String[] args) throws Exception {
        // todo:1 - build
        Job job = Job.getInstance(this.getConf(), "sougou");
        job.setJarByClass(SougouMr.class);

        // todo:2 - configure
        // input
        Path inputPath = new Path(args[0]);
        TextInputFormat.setInputPaths(job, inputPath);
        // map
        job.setMapperClass(WcMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // reduce
        job.setReducerClass(WcReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // output
        Path outputPath = new Path(args[1]);
        TextOutputFormat.setOutputPath(job, outputPath);

        // todo:3 - submit
        return job.waitForCompletion(true) ? 0 : -1;
    }

    // program entry point, calls run
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // configure compression of the map output for the Shuffle
        conf.set("mapreduce.map.output.compress", "true");
        conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.Lz4Codec");
        int status = ToolRunner.run(conf, new SougouMr(), args);
        System.exit(status);
    }

    public static class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        // output key: the search keyword
        Text outputKey = new Text();
        // output value: always 1
        IntWritable outputValue = new IntWritable(1);

        /**
         * Called once per input record.
         * @param key K1, the offset of the line
         * @param value V1, the content of the line
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // split the line and take the search-keyword field
            String[] words = value.toString().split("\t");
            this.outputKey.set(words[2]);
            context.write(this.outputKey, this.outputValue);
        }
    }

    public static class WcReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        // output value
        IntWritable outputValue = new IntWritable();

        /**
         * Called once per group.
         * @param key the word
         * @param values all values belonging to this word
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            // assign the count to the output value
            this.outputValue.set(sum);
            // emit
            context.write(key, this.outputValue);
        }
    }
}
六、Implementing Custom Grouping
1、Grouping Design
- The MapReduce Shuffle performs the global grouping
- Sorting at several points is what makes the grouping fast (a small sketch of grouping over already-sorted data follows)
- Result: all V2 values that share the same K2 are put into one iterator
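A minimal, self-contained sketch of why sorted data makes grouping cheap: once the records are sorted by key, a single linear scan can cut the stream into groups, which is essentially what the reduce-side Shuffle does when it builds one iterator per K2. The types here are plain Java stand-ins, not Hadoop classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupSortedSketch {

    /** Group a key-sorted list of (key, value) pairs in one pass: a new group starts whenever the key changes. */
    static Map<String, List<Integer>> group(List<String[]> sortedPairs) {
        Map<String, List<Integer>> groups = new LinkedHashMap<>();
        String currentKey = null;
        for (String[] pair : sortedPairs) {
            if (!pair[0].equals(currentKey)) {
                currentKey = pair[0];                 // key changed: open a new group
                groups.put(currentKey, new ArrayList<>());
            }
            groups.get(currentKey).add(Integer.parseInt(pair[1]));
        }
        return groups;
    }

    public static void main(String[] args) {
        // already sorted by key, as delivered by the reduce-side merge
        List<String[]> sorted = Arrays.asList(
                new String[]{"hadoop", "1"}, new String[]{"hadoop", "1"},
                new String[]{"spark", "1"}, new String[]{"spark", "1"}, new String[]{"spark", "1"});
        System.out.println(group(sorted));
        // prints {hadoop=[1, 1], spark=[1, 1, 1]} — each key gets one "iterator" of values
    }
}
```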
2、Grouping Rule
- By default the reduce-side Shuffle groups with the same comparator it used for sorting, so two records belong to the same group exactly when their whole K2 compares as equal
- A custom grouping comparator (registered with job.setGroupingComparatorClass) can compare only part of the key, so records whose keys differ but share, for example, the same order id still land in one reduce group — the Top1 example below relies on this
3、Top1 Implementation
- Requirement: for every order, output the record with the highest price
  - fields: order id, product id, price
      Order_0000001	Pdt_01	222.8
      Order_0000001	Pdt_05	25.8
      Order_0000002	Pdt_03	522.8
      Order_0000002	Pdt_04	122.4
      Order_0000002	Pdt_05	722.4
      Order_0000003	Pdt_01	222.8
      Order_0000003	Pdt_02	1000.8
      Order_0000003	Pdt_03	999.8
- Expected result
      Order_0000001	Pdt_01	222.8
      Order_0000002	Pdt_05	722.4
      Order_0000003	Pdt_02	1000.8
- Analysis
  - K2: an OrderBean holding order id, product id and price, sorted by order id and, within the same order, by price descending
  - V2: NullWritable
  - grouping: a custom grouping comparator that compares only the order id, so all records of one order form a single group
  - reduce: thanks to the sort order, the first record of each group is the highest-priced one, so emitting just that record yields the Top 1 per order
- Implementation

  - Custom data type

package bigdata.itcast.cn.hadoop.mapreduce.topN;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @ClassName OrderBean
 * @Description Todo order id + product id + price, used as K2
 * @Date 2020/11/2 17:13
 * @Create By Frank
 */
public class OrderBean implements WritableComparable<OrderBean> {

    private String orderId;
    private String pid;
    private double price;

    public OrderBean() {}

    public String getOrderId() { return orderId; }
    public void setOrderId(String orderId) { this.orderId = orderId; }
    public String getPid() { return pid; }
    public void setPid(String pid) { this.pid = pid; }
    public double getPrice() { return price; }
    public void setPrice(double price) { this.price = price; }

    @Override
    public String toString() {
        return this.orderId + "\t" + this.pid + "\t" + this.price;
    }

    /**
     * Called during the Shuffle sort.
     * Sort by order id; within the same order, by price in descending order.
     */
    @Override
    public int compareTo(OrderBean o) {
        // compare the order id first
        int comp = this.getOrderId().compareTo(o.getOrderId());
        if (comp == 0) {
            // same order: descending by price
            return -Double.valueOf(this.getPrice()).compareTo(Double.valueOf(o.getPrice()));
        }
        return comp;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.orderId);
        out.writeUTF(this.pid);
        out.writeDouble(this.price);
    }

    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.pid = in.readUTF();
        this.price = in.readDouble();
    }
}
  - Custom grouping comparator

package bigdata.itcast.cn.hadoop.mapreduce.topN;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * @ClassName UserGroup
 * @Description Todo group by order id only
 * @Date 2020/11/2 17:19
 * @Create By Frank
 */
public class UserGroup extends WritableComparator {

    // register the key type so the comparator can instantiate OrderBean objects
    public UserGroup() {
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean o1 = (OrderBean) a;
        OrderBean o2 = (OrderBean) b;
        // compare only the order id: it alone decides the reduce group
        return o1.getOrderId().compareTo(o2.getOrderId());
    }
}
  - MapReduce job

package bigdata.itcast.cn.hadoop.mapreduce.topN;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * @ClassName OrderTop
 * @Description Todo MapReduce template: the three classes live in one file; the Map and Reduce classes must be static
 *              rule: extend Configured and implement Tool
 *              methods:
 *                main: program entry point, calls run
 *                run: builds, configures and submits one MapReduce job
 * @Date 2020/10/30 15:34
 * @Create By Frank
 */
public class OrderTop extends Configured implements Tool {

    /**
     * Build, configure and submit the job.
     */
    public int run(String[] args) throws Exception {
        /**
         * todo:1 - build a MapReduce job
         */
        Job job = Job.getInstance(this.getConf(), "mode");   // load the configuration and name the job
        job.setJarByClass(OrderTop.class);                    // allow the class to be run from a jar

        /**
         * todo:2 - configure the five stages of MapReduce
         */
        // Input
        // job.setInputFormatClass(TextInputFormat.class);    // default input format; only set it when changing it
        Path inputPath = new Path("datas/orders/orders.txt");
        TextInputFormat.setInputPaths(job, inputPath);         // file(s) to read

        // Map
        job.setMapperClass(MrMapper.class);                    // Mapper class
        job.setMapOutputKeyClass(OrderBean.class);             // type of K2
        job.setMapOutputValueClass(NullWritable.class);        // type of V2

        // Shuffle
        job.setGroupingComparatorClass(UserGroup.class);       // custom grouping comparator
        // job.setSortComparatorClass(null);
        // job.setPartitionerClass(null);
        // job.setCombinerClass(null);

        // Reduce
        job.setReducerClass(MrReduce.class);                   // Reducer class
        job.setOutputKeyClass(OrderBean.class);                // type of K3
        job.setOutputValueClass(NullWritable.class);           // type of V3
        // job.setNumReduceTasks(1);                           // number of ReduceTasks, default 1

        // Output
        // job.setOutputFormatClass(TextOutputFormat.class);   // default output format
        Path outputPath = new Path("datas/output/order2");
        TextOutputFormat.setOutputPath(job, outputPath);        // where to write the result

        /**
         * todo:3 - submit the job and run it
         */
        // waitForCompletion returns true if the job succeeded, false if it failed
        return job.waitForCompletion(true) ? 0 : -1;
    }

    /**
     * Program entry point: calls run.
     */
    public static void main(String[] args) throws Exception {
        // build a Configuration object
        Configuration conf = new Configuration();
        // use Hadoop's ToolRunner to invoke the run method of this class
        int status = ToolRunner.run(conf, new OrderTop(), args);
        // exit with the job status
        System.exit(status);
    }

    // Mapper
    public static class MrMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
        // output key
        OrderBean outputKey = new OrderBean();
        // output value
        NullWritable outputValue = NullWritable.get();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] split = value.toString().split("\t");
            this.outputKey.setOrderId(split[0]);
            this.outputKey.setPid(split[1]);
            this.outputKey.setPrice(Double.parseDouble(split[2]));
            context.write(this.outputKey, this.outputValue);
        }
    }

    // Reducer
    public static class MrReduce extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
        @Override
        protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            // the group is sorted by price in descending order, so the first record of the
            // group is the highest-priced one of this order: emit it once for the Top 1
            context.write(key, NullWritable.get());
            // iterating over the values and writing the key each time would instead emit
            // every record of the order in price order:
            // for (NullWritable value : values) {
            //     context.write(key, NullWritable.get());
            // }
        }
    }
}