MapReduce Cross 示例
package com.bsr.cross; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 第一次mr--目的是获取某一人是哪些人的好友 * * */ public class Cross { //输入:A:B,C,D,F,E,O //输出:B->A C->A D->A F->A E->A O->A public static class Map extends Mapper<LongWritable, Text, Text, Text>{ @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String[] value1=value.toString().split(":"); String[] value2=value1[1].split(","); for (String string : value2) { context.write(new Text(string), new Text(value1[0])); } } } public static class Reduce extends Reducer<Text, Text, Text, Text>{ // 输入<B->A><B->E><B->F>.... // 输出 B A,E,F,J @Override protected void reduce(Text key, Iterable<Text> value,Context context) throws IOException, InterruptedException { StringBuffer sb=new StringBuffer(); for (Text text : value) { sb.append(text+","); } context.write(key, new Text(sb.toString())); } } public static void main(String[] args) throws Exception { //读取classpath下的所有xxx-site.xml配置文件,并进行解析 Configuration conf=new Configuration(); FileSystem fs = FileSystem.get(configuration); String s = "/wc/output3"; Path path = new Path(s); fs.delete(path, true); Job job=Job.getInstance(conf); //通过主类的类加载器机制获取到本job的所有代码所在的jar包 job.setJarByClass(Cross.class); //指定本job使用的mapper类 job.setMapperClass(Map.class); //指定本job使用的reducer类 job.setReducerClass(Reduce.class); //指定mapper输出的kv数据类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //指定reducer输出的kv数据类型 job.setoutputKeyClass(Text.class); job.setoutputValueClass(Text.class); //指定本job要处理的文件所在的路径 FileInputFormat.setInputPaths(job, new Path("/wc/data/")); FileOutputFormat.setoutputPath(job, new Path("/wc/output3")); //将本job向hadoop集群提交执行 boolean flag=job.waitForCompletion(true); System.exit(flag?0:1); } }
进行了逻辑的转换;
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。