微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

大数据开发Hadoop-----MapReduce

1.常用得数据序列化类型

在这里插入图片描述

2.MapReduce编程规范
1)Mapper阶段
(1)用户自定义的Mapper要继承自己的父类
(2)Mapper的输入数据是KV对的形式(KV的类型可自定义
(3)Mapper中的业务逻辑写在map()方法
(4)Mapper的输出数据是KV对的形式(KV的类型可自定义
(5)map()方法(MapTask进程)对每一个<K,V>调用一次
2)Reducer阶段
(1)用户自定义的Reducer要继承自己的父类
(2)Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
(3)Reducer的业务逻辑写在reduce()方法
(4)ReduceTask进程对每一组相同k的<k,v>组调用一次reduce()方法
3)Driver阶段
相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是
封装了MapReduce程序相关运行参数的job对象

3.需求:统计单词出现的次数
(1)引入依赖

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.30</version>
        </dependency>
    </dependencies>

(2)自定义Mapper类

package zjc.hadoop;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
/**
 *统计文件中单词出现的次数
 * */
public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {

    Text k=new Text();
    IntWritable v=new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line=value.toString();
        String[] words = line.split(" ");
        for(String word:words){
            k.set(word);
            context.write(k,v);
        }
    }
}

(3)自定义Reducer类

package zjc.hadoop;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;


public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
    int sum;
    IntWritable v = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        sum=0;
        for (IntWritable count:values) {
            sum += count.get();
        }
        v.set(sum);
        context.write(key,v);
    }
}

(4)自定义Driver类

package zjc.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountDriver {
    public static void  main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // 1 获取配置信息以及获取 job 对象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
// 2 关联本 Driver 程序的 jar
        job.setJarByClass(WordCountDriver.class);
// 3 关联 Mapper 和 Reducer 的 jar
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
// 4 设置 Mapper 输出的 kv 类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
// 5 设置最终输出 kv 类型
        job.setoutputKeyClass(Text.class);
        job.setoutputValueClass(IntWritable.class);
// 6 设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setoutputPath(job, new Path(args[1]));
// 7 提交 job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

4.自定义Bean对象实现序列化接口
(1)必须实现 Writable 接口
(2)构造函数
(3)重写序列化write与反序列化readFields的方法
(4)toString方法
5.需求:统计一个手机号耗费的总上行流量、总下行流量、总流量

在这里插入图片描述

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐