
Java Hadoop (5): Hadoop MapReduce Sorting and Serialization Example

What are serialization and deserialization?

Serialization is the process of converting a structured object into a byte stream.

Deserialization is the reverse process: converting a byte stream back into a structured object.

Serialization in MapReduce:

Hadoop uses its own Writable interface for this; implementing it is all that is needed.

One of its methods, write(), performs serialization.

The other, readFields(), performs deserialization.
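For reference, the Writable interface in org.apache.hadoop.io has exactly these two methods (comments added here for explanation):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public interface Writable {
    // Serialization: write this object's fields out to the byte stream
    void write(DataOutput out) throws IOException;

    // Deserialization: read the fields back in from the byte stream
    void readFields(DataInput in) throws IOException;
}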

Sorting in MapReduce:

The WritableComparable interface covers both sorting and serialization. If an object needs to be serialized and also used as a sort key, implement this interface instead.
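WritableComparable simply combines Writable with Java's Comparable, so a key class implementing it provides write(), readFields(), and compareTo():

public interface WritableComparable<T> extends Writable, Comparable<T> {
}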

 

Requirement: the input is a text file where each line holds a word and a count separated by a tab (word\tcount). Sort the records first by the word in dictionary order, and, when the words are equal, by the count in ascending order.
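As a purely hypothetical illustration of that format (not data from the original post): an input file containing the tab-separated lines "hello 5", "hadoop 3", "hello 2" should come out of the job ordered hadoop/3, hello/2, hello/5, with each output line rendered by SortBean.toString().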

Code implementation:

Directory structure: all classes sit in one package, sortserializedemo, which contains SortBean, MyMapper, MyReducer, and ReduceMain.

Wrapper class: gets sorting and serialization by implementing WritableComparable.

package sortserializedemo;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class SortBean implements WritableComparable<SortBean> {
    private String word;
    private int num;

    public SortBean() {
    }

    public SortBean(String word, int num) {
        this.word = word;
        this.num = num;
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public int getNum() {
        return num;
    }

    public void setNum(int num) {
        this.num = num;
    }

    // Sorting: first by word in dictionary order, then by num when the words are equal
    @Override
    public int compareTo(SortBean o) {
        int res = this.word.compareTo(o.word);
        if (res == 0) {
            return this.num - o.num;
        }
        return res;
    }

    @Override
    public String toString() {
        return "SortBean{" +
                "word='" + word + '\'' +
                ", num=" + num +
                '}';
    }

    // Serialization: write the fields to the byte stream
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);
        out.writeInt(num);
    }

    // Deserialization: read the fields back in the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        this.word=in.readUTF();
        this.num=in.readInt();
    }
}
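A minimal local sketch, not part of the original post, to sanity-check that write() and readFields() round-trip a SortBean correctly using plain java.io streams (the class name SortBeanRoundTrip is made up for this illustration):

package sortserializedemo;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class SortBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        SortBean original = new SortBean("hello", 5);

        // Serialize the bean into an in-memory byte array
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize those bytes into a fresh instance
        SortBean copy = new SortBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        // Expect: SortBean{word='hello', num=5}
        System.out.println(copy);
    }
}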

Mapper:

package sortserializedemo;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MyMapper extends Mapper<LongWritable, Text,SortBean, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the line to pull out the fields for K2
        String[] split = value.toString().split("\t");
        // Wrap them in the SortBean key
        SortBean sortBean = new SortBean(split[0], Integer.parseInt(split[1]));
        // K2 is the sortBean, V2 is empty; the shuffle sorts on K2
        context.write(sortBean, NullWritable.get());
    }
}

Reducer:

package sortserializedemo;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MyReducer extends Reducer<SortBean, NullWritable,SortBean,NullWritable> {

    @Override
    protected void reduce(SortBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
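        // Keys arrive already sorted by SortBean.compareTo(); just write each one out with an empty value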
        context.write(key, NullWritable.get());
    }
}

Main:

package sortserializedemo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class ReduceMain {

    public static void main(String[] args) throws Exception{
        // Create the job
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, "sort-serialize");
        // Specify the jar that contains the job
        job.setJarByClass(ReduceMain.class);
        // Specify the input format and input path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://192.168.40.150:9000/test.txt"));
        // Specify the mapper; its output key is SortBean, its output value is NullWritable
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(SortBean.class);
        job.setMapOutputValueClass(NullWritable.class);


        // Specify the reducer; its output key is SortBean, its output value is NullWritable
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(SortBean.class);
        job.setOutputValueClass(NullWritable.class);

        // Specify the output format and output path (the output directory must not already exist)
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://192.168.40.150:9000/lgy_test/res"));

        // Submit the job to the YARN cluster
        boolean bl = job.waitForCompletion(true);
        System.exit(bl ? 0 : 1);
    }
}
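To run the job, package the project into a jar and submit it with the hadoop jar command, for example (the jar name here is hypothetical): hadoop jar sortserializedemo.jar sortserializedemo.ReduceMain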

