微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

MapReduce整合HBase

目录

概述

    - MapReduce是运算框架

    - HBase是数据存储系统

    - MapReduce读写各类数据系统是由相应的InputFormat和OutputFormat来支持

    - HBase为MapReduce开发了TableInputFormat和TableOutputFormat

读取HBase

UserRatetopn.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;
import java.util.ArrayList;

public class UserRatetopn {

    // 继承HBase提供的TableMapper,唯一用处是确定KEYIN,VALUEIN的类型:ImmutableBytesWritable,Result
    public static class TopnMapper extends TableMapper<Text, RateBean> {

        final byte[] F = "f1".getBytes();
        RateBean b = new RateBean();
        Text k = new Text();

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {

            Counter counter = context.getCounter("DitryRead", "error_line");

            try {
                byte[] a = value.getValue(F, "a".getBytes());
                byte[] g = value.getValue(F, "g".getBytes());
                byte[] n = value.getValue(F, "n".getBytes());
                byte[] s = value.getValue(F, "s".getBytes());
                byte[] r = value.getValue(F, "r".getBytes());

                byte[] rowkeyBytes = key.get();
                String rk = new String(rowkeyBytes);

                String movieId = rk.substring(0, 6);
                String uid = rk.substring(rk.length() - 6);

                k.set(uid);
                b.set(uid, Integer.parseInt(new String(a)), new String(g), movieId, new String(n), new String(s), Integer.parseInt(new String(r)));

                context.write(k, b);
            }catch (Exception e) {
                counter.increment(1);
            }
        }
    }

    public static class TopnReducer extends Reducer<Text, RateBean, RateBean, NullWritable> {

        @Override
        protected void reduce(Text key, Iterable<RateBean> values, Context context) throws IOException, InterruptedException {

            ArrayList<RateBean> lst = new ArrayList<>();

            // 迭代数据放入list缓存
            for (RateBean bean : values) {
                RateBean newBean = new RateBean();
                newBean.set(bean.getUid(), bean.getAge(), bean.getGender(), bean.getMovieId(), bean.getMovieName(), bean.getStyle(), bean.getRate());
                lst.add(newBean);
            }

            // 排序
            lst.sort((o1, o2) -> o2.getRate() - o1.getRate());

            int topn = 5;
            for(int i = 0; i < Math.min(topn, lst.size()); i++) {
                context.write(lst.get(i), NullWritable.get());
            }
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
//        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "linux01:2181,linux02:2181,linux03:2181");
        conf.set("mapred.jar", "hbasemr-1.0-SNAPSHOT.jar");

        Job job = Job.getInstance(conf);

        job.setJarByClass(UserRatetopn.class);

        // 设置了Mapper为HBase提供的mapper,以及InputFormat为hbase提供的TableInputFormat
        Scan scan = new Scan();
        TableMapReduceUtil.initTableMapperJob("movie", scan, TopnMapper.class, Text.class, RateBean.class, job);

        job.setReducerClass(TopnReducer.class);
        job.setoutputKeyClass(RateBean.class);
        job.setoutputValueClass(NullWritable.class);

        job.setoutputFormatClass(textoutputFormat.class);// 认,可不写
        FileOutputFormat.setoutputPath(job, new Path("d:/out"));

        job.waitForCompletion(true);
    }
}

RateBean.java

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * One user's rating of one movie; a Hadoop {@link Writable} so it can travel
 * between map and reduce tasks.
 */
public class RateBean implements Writable {

    // Viewer attributes.
    private String uid;
    private int age;
    private String gender;

    // Movie attributes.
    private String movieId;
    private String movieName;
    private String style;

    // The score the viewer gave this movie.
    private int rate;

    /** Populates every field in one call, so a single instance can be reused per record. */
    public void set(String uid, int age, String gender, String movieId, String movieName, String style, int rate) {
        this.uid = uid;
        this.age = age;
        this.gender = gender;
        this.movieId = movieId;
        this.movieName = movieName;
        this.style = style;
        this.rate = rate;
    }

    public String getUid() {
        return uid;
    }

    public void setUid(String uid) {
        this.uid = uid;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getGender() {
        return gender;
    }

    public void setGender(String gender) {
        this.gender = gender;
    }

    public String getMovieId() {
        return movieId;
    }

    public void setMovieId(String movieId) {
        this.movieId = movieId;
    }

    public String getMovieName() {
        return movieName;
    }

    public void setMovieName(String movieName) {
        this.movieName = movieName;
    }

    public String getStyle() {
        return style;
    }

    public void setStyle(String style) {
        this.style = style;
    }

    public int getRate() {
        return rate;
    }

    public void setRate(int rate) {
        this.rate = rate;
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("RateBean{");
        sb.append("uid='").append(uid).append('\'');
        sb.append(", age=").append(age);
        sb.append(", gender='").append(gender).append('\'');
        sb.append(", movieId='").append(movieId).append('\'');
        sb.append(", movieName='").append(movieName).append('\'');
        sb.append(", style='").append(style).append('\'');
        sb.append(", rate=").append(rate);
        sb.append('}');
        return sb.toString();
    }

    /** Serialization order must match {@link #readFields} exactly. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(uid);
        out.writeInt(age);
        out.writeUTF(gender);
        out.writeUTF(movieId);
        out.writeUTF(movieName);
        out.writeUTF(style);
        out.writeInt(rate);
    }

    /** Deserializes fields in the same order {@link #write} emitted them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        uid = in.readUTF();
        age = in.readInt();
        gender = in.readUTF();
        movieId = in.readUTF();
        movieName = in.readUTF();
        style = in.readUTF();
        rate = in.readInt();
    }
}

写入HBase

WordCount.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

/**
 * Classic word count that reads text files from HDFS/local disk and writes the
 * per-word totals into the HBase table "wordcount" via TableOutputFormat.
 */
public class WordCount {

    /** Tokenizes each input line and emits (word, 1) pairs. */
    public static class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        // Reused across map() calls to avoid allocating two objects per word.
        private final Text word = new Text();
        private static final IntWritable ONE = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            // Split on runs of whitespace: the previous split(" ") produced
            // empty tokens for consecutive spaces, which were counted as words.
            for (String w : value.toString().split("\\s+")) {
                if (!w.isEmpty()) {
                    word.set(w);
                    context.write(word, ONE);
                }
            }
        }
    }

    /** Sums the counts for one word and writes a Put into the target table. */
    public static class WcReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

            int count = 0;
            for (IntWritable value : values) {
                count += value.get();
            }

            // Rowkey = the word itself; column f:cnt holds the 4-byte count.
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("cnt"), Bytes.toBytes(count));

            // TableOutputFormat ignores the key; only the Put is written.
            context.write(null, put);
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "linux01:2181,linux02:2181,linux03:2181");

        Job job = Job.getInstance(conf);

        job.setJarByClass(WordCount.class);

        job.setMapperClass(WcMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // HBase utility: sets the OutputFormat to TableOutputFormat and the target table name.
        TableMapReduceUtil.initTableReducerJob("wordcount", WcReducer.class, job);

        // Input path for the plain-text source files.
        FileInputFormat.setInputPaths(job, new Path("data/input/"));

        // Propagate job success/failure to the process exit code.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

读写HBase

Hbasetopn.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;
import java.util.ArrayList;

/**
 * Reads movie-rating rows from the HBase table "movie" and writes each user's
 * top-5 highest-rated movies back into the HBase table "topn".
 */
public class Hbasetopn {

    /** Parses one HBase row into a RateBean keyed by user id. */
    public static class TopnMapper extends TableMapper<Text, RateBean> {

        final byte[] F = "f1".getBytes();
        // Reused across map() calls to avoid per-record allocation.
        Text k = new Text();
        RateBean b = new RateBean();

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            // Counts malformed rows instead of failing the whole job.
            Counter counter = context.getCounter("DirtyRead", "error_line");
            try {
                byte[] a = value.getValue(F, "a".getBytes()); // age
                byte[] g = value.getValue(F, "g".getBytes()); // gender
                byte[] n = value.getValue(F, "n".getBytes()); // movie name
                byte[] r = value.getValue(F, "r".getBytes()); // rate
                byte[] s = value.getValue(F, "s".getBytes()); // style/genre

                // Rowkey layout: first 6 chars = movieId, last 6 chars = uid
                // (assumed from the substring logic — confirm against the table design).
                byte[] rkBytes = key.get();
                String rk = new String(rkBytes);

                String movieId = rk.substring(0, 6);
                String uid = rk.substring(rk.length() - 6);

                k.set(uid);
                b.set(uid, Integer.parseInt(new String(a)), new String(g), movieId, new String(n), new String(s), Integer.parseInt(new String(r)));

                context.write(k, b);
            } catch (Exception e) {
                // Missing column, short rowkey, or non-numeric field: count and skip.
                counter.increment(1);
            }
        }
    }

    /** Emits each user's top-5 ratings as Puts into the output table. */
    public static class TopnReducer extends TableReducer<Text, RateBean, ImmutableBytesWritable> {

        @Override
        protected void reduce(Text key, Iterable<RateBean> values, Context context) throws IOException, InterruptedException {

            ArrayList<RateBean> lst = new ArrayList<>();

            // Hadoop reuses the same bean instance while iterating, so each
            // element must be deep-copied before being cached in the list.
            for (RateBean b : values) {
                RateBean newBean = new RateBean();
                newBean.set(b.getUid(), b.getAge(), b.getGender(), b.getMovieId(), b.getMovieName(), b.getStyle(), b.getRate());

                lst.add(newBean);
            }

            // Highest rate first. Integer.compare avoids the overflow risk of
            // plain subtraction in a comparator.
            lst.sort((o1, o2) -> Integer.compare(o2.getRate(), o1.getRate()));

            int topn = 5;
            for (int i = 0; i < Math.min(topn, lst.size()); i++) {

                // Rowkey = "uid movieId"; column f:c holds the bean's toString().
                RateBean rateBean = lst.get(i);
                String uid = rateBean.getUid();
                String movieId = rateBean.getMovieId();

                Put put = new Put((uid + " " + movieId).getBytes());
                put.addColumn("f".getBytes(), "c".getBytes(), rateBean.toString().getBytes());

                // TableOutputFormat ignores the key; only the Put is written.
                context.write(null, put);
            }
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "linux01:2181,linux02:2181,linux03:2181");
        // NOTE(review): with TableInputFormat the map task count is determined
        // by the table's region count; this setting is only a hint and is
        // typically ignored.
        conf.set("mapreduce.map.tasks", "5");

        Job job = Job.getInstance(conf);

        job.setJarByClass(Hbasetopn.class);

        // Sets the mapper and the InputFormat to HBase's TableInputFormat.
        Scan scan = new Scan();
        TableMapReduceUtil.initTableMapperJob("movie", scan, TopnMapper.class, Text.class, RateBean.class, job);

        job.setNumReduceTasks(3);
        // Sets the reducer and the OutputFormat to TableOutputFormat, targeting table "topn".
        TableMapReduceUtil.initTableReducerJob("topn", TopnReducer.class, job);

        // Propagate job success/failure to the process exit code.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐