package day02;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author
 * @create 2019-09-17 16:33
 **/
// LongWritable  (map input key)   byte offset of the current line
// Text          (map input value) the line of data to process
// (for plain text input these two types never change)
// Text          (map output key)
// IntWritable   (map output value)
public class WordCount1 {

    // Map side
    public static class MapTask extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // key holds the byte offset, value is the line of data
            // context is used to write the processed data out
            String[] words = value.toString().split(",");
            // emit (word, 1) for each word, e.g. (hadoop, 1)
            for (String word : words) {
                context.write(new Text(word), new IntWritable(1));
            }
        }
    }

    // Reduce side: receives (hadoop, [1, 1, 1, ...]) and emits (hadoop, 35)
    public static class ReduceTask extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // key is the word, e.g. hadoop
            // values is the list of counts, e.g. (1, 1, 1, 1, 1, 1, 1)
            int count = 0;
            for (IntWritable value : values) {
                count += value.get();   // sum the counts (each is 1 from the map side)
            }
            // write the result out
            context.write(key, new IntWritable(count));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        System.setProperty("HADOOP_USER_NAME", "root");

        // Configuration: connect to the Hadoop cluster
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop01:9000");

        Job job = Job.getInstance(conf);
        job.setMapperClass(MapTask.class);
        job.setReducerClass(ReduceTask.class);
        job.setJarByClass(WordCount1.class);

        // tell the job the map and final output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // tell the job the input and output paths
        FileInputFormat.addInputPath(job, new Path("/beida/wc.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/test1"));

        // friendly reminder: b ? x : y below is a ternary expression
        boolean b = job.waitForCompletion(true);
        System.out.println(b ? "Success!" : "Something went wrong");
    }
}
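Optional extension, not part of the original job: because the reducer sums value.get() (an associative, commutative operation), the same ReduceTask class can also be registered as a combiner to pre-aggregate (word, 1) pairs on the map side and shrink the shuffle. A minimal sketch, assuming you add it in main() before waitForCompletion:

        // optional: pre-aggregate counts on the map side before the shuffle
        job.setCombinerClass(ReduceTask.class);

With the combiner enabled, the reducer may receive partial sums greater than 1, which is exactly why the reduce method sums value.get() instead of simply counting how many values arrive.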