
Hadoop: WordCount Example and Source Code Analysis

Without further ado, let's jump straight to the code and how to run it.

package com.yarn.wc;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Mywc {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Read the configuration and create a new Job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Mywc.class);

        // Specify various job-specific parameters
        job.setJobName("myjob");
        // job.setInputPath(new Path("in"));
        // job.setOutputPath(new Path("out"));

        // Input file in HDFS
        Path inPath = new Path("/user/root/test.txt");
        FileInputFormat.addInputPath(job, inPath);

        // Output directory; delete it first if it already exists,
        // otherwise the job fails at submission
        Path outPath = new Path("/output2/wordcount");
        if (outPath.getFileSystem(conf).exists(outPath))
            outPath.getFileSystem(conf).delete(outPath, true);
        FileOutputFormat.setOutputPath(job, outPath);

        // Mapper and the types of its output key/value pairs
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Reducer
        job.setReducerClass(MyReducer.class);

        // Submit the job, then poll for progress until the job is complete
        job.waitForCompletion(true);
    }
}

Implementing MyMapper

package com.yarn.wc;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    // key is the byte offset of the current line, value is the line itself
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            // Emit (word, 1) for every token in the line
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}

Implementing MyReducer

package com.yarn.wc;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    // values holds every count emitted for the same word; sum them up
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

Next, package the code into a jar and run it on the virtual machine.
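If you are not exporting the jar from an IDE, one rough way to compile and package it on the command line is sketched below. This is only a sketch: it assumes the three source files sit under src/com/yarn/wc and that the hadoop command is on the PATH; adjust the paths to your own project layout.

mkdir -p classes
javac -cp "$(hadoop classpath)" -d classes src/com/yarn/wc/*.java
jar cf wc.jar -C classes .

The driver reads /user/root/test.txt, so upload a test file to HDFS first if it is not there yet, for example with hdfs dfs -put test.txt /user/root/test.txt. Then submit the job: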

hadoop jar wc.jar com.yarn.wc.Mywc

Inspect the output file:

hdfs dfs -tail /output2/wordcount/part-r-00000
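The default TextOutputFormat writes one word and its count per line, separated by a tab. To look at the full result rather than just the tail, you can also list the output directory and dump the whole part file:

hdfs dfs -ls /output2/wordcount
hdfs dfs -cat /output2/wordcount/part-r-00000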
