微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

HBase自定义MapReduce-06

案例操作:把HBase中一张表的数据导入到另一张表中

源数据

插入到这张表中

 

主要是看这个HBase写MapReduce和Hadoop的MapReduce的不同,Mapper里面是把HBase里的数据拿出来,这个过程可以自己加一些清洗数据,或者是过滤一些字段的功能,主要是看这个用法.

Mapper

package com.buba.mapper;

import java.io.IOException;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

/*
TableMapper继承了Mapper,TableMapper里封装了专门用来写HBase的东西
输入参数类型就是ImmutableBytesWritable 固定的,读Hbase和hadoop不一样
 */
public class ReadFruitMapper extends TableMapper<ImmutableBytesWritable, Put> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        //将fruit的name和color提取出来,相当于将每一行数据读取出来放入到Put对象中。
        Put put = new Put(key.get());
        //遍历添加column行
        for(Cell cell: value.rawCells()){
            //添加/克隆列族:info
            if("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))){
                //添加/克隆列:name
                if("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
                    //将该列cell加入到put对象中
                    put.add(cell);
                    //添加/克隆列:color
                }else if("color".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
                    //向该列cell加入到put对象中
                    put.add(cell);
                }
            }
        }
        //将从fruit读取到的每行数据写入到context中作为map的输出
        context.write(key, put);
    }
}

Reducer 

package com.buba.reducer;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

public class ReadFruitReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {
        //读出来的每一行数据写入到fruit_mr表中
        for(Put put: values){
            context.write(NullWritable.get(), put);
        }
    }


}

Driver

package com.buba.driver;

import com.buba.mapper.ReadFruitMapper;
import com.buba.reducer.ReadFruitReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReadFruitJob  extends Configured implements Tool {

    public static void main(String[] args) throws Exception{
        Configuration configuration = HBaseConfiguration.create();

        configuration.set("hbase.zookeeper.quorum","192.168.1.20,192.168.1.21,192.168.1.22");  //zookeeper 服务地址

        configuration.set("hbase.zookeeper.property.clientPort","2181"); //端口号

        ReadFruitJob readFruitJob = new ReadFruitJob();

        int run = ToolRunner.run(configuration, readFruitJob, args);

        System.exit(run);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        Job job = Job.getInstance(conf);

        Scan scan = new Scan();

        //扫描时是否进行缓存  这两个属性不设置也行
        scan.setCacheBlocks(false);

        //缓存行数
        scan.setCaching(500);

        //组装mapper
        TableMapReduceUtil.initTableMapperJob(
                "fruit",//Mapper操作的表名
                scan,//扫描表的对象
                ReadFruitMapper.class,//Mapper输入key
                ImmutableBytesWritable.class,//mapper输出value
                Put.class,
                job
        );

        //组装reducer
        TableMapReduceUtil.initTableReducerJob(
                "fruit_mr",
                ReadFruitReducer.class,
                job
        );

        //设置mapreducer个数
        job.setNumReduceTasks(1);

        boolean b = job.waitForCompletion(true);

        return b ? 0 : 1;
    }
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐