Distributed Computing: MapReduce Programming I
Lab objectives:
1. Understand the principles of distributed computing on a cluster.
2. Become familiar with writing the Mapper and Reducer functions of an MR program.
Build an inverted index, i.e. count how many times each word appears in each of the files.
Lab requirements
- There are three files, a.txt, b.txt, and c.txt. Each file contains several lines of words, with the words separated by spaces.
- Write a program that produces an inverted index of the words.
- Words starting with the letters A-M (including lowercase) go to partition 0; words starting with N-Z go to partition 1; words starting with anything else go to partition 2.
- Output format: hadoop a.txt->2,b.txt->1
Here hadoop is the word (which also serves as the output key), and "a.txt->2,b.txt->1" is the output value, meaning that the word hadoop appears 2 times in a.txt and 1 time in b.txt (a small made-up example follows).
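For instance, suppose (hypothetically; the assignment does not give the actual file contents) the three files contain
a.txt: hadoop spark hadoop
b.txt: hadoop storm
c.txt: spark storm
Then the expected index lines would be
hadoop	a.txt->2,b.txt->1
spark	a.txt->1,c.txt->1
storm	b.txt->1,c.txt->1
with hadoop landing in the partition-0 output file and spark/storm in the partition-1 file.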
Approach
Before writing any code, we need to work out the overall MapReduce logic:
- A single MR job clearly cannot accomplish the task on its own, so this lab chains two MR jobs.
- The first MR job follows the standard wordcount pattern: process every file and count how many times each word occurs in it. The only twist is that the k2 value the mapper writes with context.write is not word but filename->word.
- The reducer then outputs k2 and v3 (the occurrence count) as usual.
- The second MR job takes the output files of the first job as its input, so each whole line filename->word(\t)count arrives at the mapper as v1.
- In map, v1 is split on "->" and "\t", and the pieces are recombined into k2 = word and v2 = filename->count, which are emitted to the reducer.
- Since the final output format is word a.txt->count,b.txt->count, and v2s is exactly the collection of filename->count values, the reducer only has to join the elements of v2s (a record-level sketch of this hand-off follows the list).
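Concretely, using the made-up example above, a single record looks like this at each stage (the actual words depend on the real input):
first MR, map output:      (a.txt->hadoop, 1)           one pair per occurrence
first MR, reduce output:   a.txt->hadoop \t 2           one line of its part-r-00000 file
second MR, map output:     (hadoop, a.txt->2)
second MR, reduce output:  hadoop \t a.txt->2,b.txt->1
The \t comes from TextOutputFormat, which separates key and value with a tab by default; that is exactly why the second mapper splits on "->" first and then on "\t".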
First, upload the three files to the input/DP directory on HDFS (the distributed file system), for example:
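Assuming a.txt, b.txt and c.txt sit in the current local directory (the HDFS target input/DP is taken from the write-up; adjust as needed), the upload can be done from the shell:
hdfs dfs -mkdir -p input/DP
hdfs dfs -put a.txt b.txt c.txt input/DP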
- Writing the Mapper (first MR)
public class FirstMapper extends Mapper<Object, Text, Text, IntWritable> {
    @Override
    protected void map(Object k1, Text v1, Context context)
            throws IOException, InterruptedException {
        String line = v1.toString().trim();      // the line content
        String[] words = line.split("\\s+");     // split into words (handles repeated spaces)
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        Path path = inputSplit.getPath();
        String filename = path.getName();        // name of the file this word comes from
        for (String word : words) {
            if (word.isEmpty()) continue;        // skip blank lines
            context.write(new Text(filename + "->" + word), new IntWritable(1));
        }
    }
}
- Writing the Reducer (first MR)
public class FirstReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text k2, Iterable<IntWritable> v2s, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable val : v2s) {
            count += val.get();   // sum v2s to get the word's total number of occurrences in this file
        }
        context.write(k2, new IntWritable(count));
    }
}
- Writing the Mapper (second MR)
public class SecondMapper extends Mapper<Object, Text, Text, Text> {
    @Override
    protected void map(Object k1, Text v1, Context context)
            throws IOException, InterruptedException {
        // each input line from the first job has the form: filename->word<TAB>count
        String line = v1.toString().trim();
        String[] data = line.split("->");
        String filename = data[0];
        String[] wordcount = data[1].split("\t");
        String word = wordcount[0];
        String count = wordcount[1];
        context.write(new Text(word), new Text(filename + "->" + count));
    }
}
- Writing the Reducer (second MR)
public class SecondReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text k2, Iterable<Text> v2s, Context context)
            throws IOException, InterruptedException {
        // join all filename->count values for this word with commas
        StringBuilder sb = new StringBuilder();
        for (Text val : v2s) {
            if (sb.length() > 0) sb.append(",");
            sb.append(val.toString());
        }
        context.write(k2, new Text(sb.toString()));
    }
}
- Writing the Partitioner (second MR)
public class SecondPartitioner extends Partitioner<Text, Text> {
    @Override
    public int getPartition(Text k2, Text v2, int numPartitions) {
        String word = k2.toString().trim();
        if (word.isEmpty()) return 2;                        // no leading letter: treat as "other"
        char first = Character.toUpperCase(word.charAt(0));  // case-insensitive comparison
        if (first >= 'A' && first <= 'M') return 0;          // A-M (and a-m) -> partition 0
        if (first >= 'N' && first <= 'Z') return 1;          // N-Z (and n-z) -> partition 1
        return 2;                                            // everything else -> partition 2
    }
}
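As a quick sanity check of the partitioning rule, getPartition can be called directly. The PartitionerCheck class below is only for illustration (it is not part of the submitted job) and assumes it sits in the same package as SecondPartitioner:
import org.apache.hadoop.io.Text;

public class PartitionerCheck {
    public static void main(String[] args) {
        SecondPartitioner p = new SecondPartitioner();
        // h -> A-M -> 0, S -> N-Z -> 1, '2' -> other -> 2
        System.out.println(p.getPartition(new Text("hadoop"), new Text("a.txt->2"), 3)); // 0
        System.out.println(p.getPartition(new Text("Spark"), new Text("c.txt->1"), 3));  // 1
        System.out.println(p.getPartition(new Text("2021"), new Text("a.txt->1"), 3));   // 2
    }
}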
Complete code (note: each public class below must be placed in its own .java file with a matching name)
First MR job
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FirstMapper extends Mapper<Object, Text, Text, IntWritable> {
    @Override
    protected void map(Object k1, Text v1, Context context)
            throws IOException, InterruptedException {
        String line = v1.toString().trim();
        String[] words = line.split("\\s+");
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        Path path = inputSplit.getPath();
        String filename = path.getName();
        for (String word : words) {
            if (word.isEmpty()) continue;
            context.write(new Text(filename + "->" + word), new IntWritable(1));
        }
    }
}
public class FirstReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text k2, Iterable<IntWritable> v2s, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable val : v2s) {
            count += val.get();
        }
        context.write(k2, new IntWritable(count));
    }
}
public class FirstMain {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: FirstMain <in> <out>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "FirstDP");
        job.setJarByClass(FirstMain.class);
        job.setMapperClass(FirstMapper.class);
        job.setReducerClass(FirstReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
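One possible way to build and run the first job from the command line; the jar name firstdp.jar and the output directory output/DP1 are arbitrary names chosen here, not prescribed by the lab:
javac -cp $(hadoop classpath) FirstMapper.java FirstReducer.java FirstMain.java
jar cf firstdp.jar *.class
hadoop jar firstdp.jar FirstMain input/DP output/DP1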
Second MR job
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Partitioner;
public class SecondMapper extends Mapper<Object, Text, Text, Text> {
    @Override
    protected void map(Object k1, Text v1, Context context)
            throws IOException, InterruptedException {
        // each input line from the first job has the form: filename->word<TAB>count
        String line = v1.toString().trim();
        String[] data = line.split("->");
        String filename = data[0];
        String[] wordcount = data[1].split("\t");
        String word = wordcount[0];
        String count = wordcount[1];
        context.write(new Text(word), new Text(filename + "->" + count));
    }
}
public class SecondReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text k2, Iterable<Text> v2s, Context context)
            throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();
        for (Text val : v2s) {
            if (sb.length() > 0) sb.append(",");
            sb.append(val.toString());
        }
        context.write(k2, new Text(sb.toString()));
    }
}
public class SecondPartitioner extends Partitioner<Text, Text> {
    @Override
    public int getPartition(Text k2, Text v2, int numPartitions) {
        String word = k2.toString().trim();
        if (word.isEmpty()) return 2;
        char first = Character.toUpperCase(word.charAt(0));
        if (first >= 'A' && first <= 'M') return 0;
        if (first >= 'N' && first <= 'Z') return 1;
        return 2;
    }
}
public class SecondMain {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: SecondMain <in> <out>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "SecondDP");
        job.setJarByClass(SecondMain.class);
        job.setMapperClass(SecondMapper.class);
        job.setReducerClass(SecondReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setPartitionerClass(SecondPartitioner.class);
        job.setNumReduceTasks(3);   // one reducer per partition (0, 1, 2)
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
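The second job is run the same way, with the first job's output directory as its input (again, the jar and directory names below are just placeholders). Because the job combines the custom partitioner with setNumReduceTasks(3), the final index is split across three part files, one per letter range:
javac -cp $(hadoop classpath) SecondMapper.java SecondReducer.java SecondPartitioner.java SecondMain.java
jar cf seconddp.jar *.class
hadoop jar seconddp.jar SecondMain output/DP1 output/DP2
hdfs dfs -cat output/DP2/part-r-00000   # words starting with A-M
hdfs dfs -cat output/DP2/part-r-00001   # words starting with N-Z
hdfs dfs -cat output/DP2/part-r-00002   # everything else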