Case study: importing the data of one HBase table into another
Source data: the fruit table.
Target: the data is inserted into the fruit_mr table.
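If you want to reproduce the setup yourself, both tables can be created and a sample row inserted with the plain HBase client API. The class below is a minimal sketch of my own, not part of the original post: the table and column names come from the job code further down, the sample row key and values are made up, and it uses the HBase 2.x descriptor builders (on older versions HTableDescriptor/HColumnDescriptor would play the same role).

package com.buba.setup;  // hypothetical package, for illustration only

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;

public class FruitTableSetup {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            // create "fruit" and "fruit_mr", each with a single column family "info"
            for (String name : new String[]{"fruit", "fruit_mr"}) {
                TableName tn = TableName.valueOf(name);
                if (!admin.tableExists(tn)) {
                    admin.createTable(TableDescriptorBuilder.newBuilder(tn)
                            .setColumnFamily(ColumnFamilyDescriptorBuilder.of("info"))
                            .build());
                }
            }
            // one sample source row; row key and values are made up for illustration
            try (Table fruit = conn.getTable(TableName.valueOf("fruit"))) {
                Put put = new Put(Bytes.toBytes("1001"));
                put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("apple"));
                put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes("red"));
                fruit.put(put);
            }
        }
    }
}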
The main point is to see how a MapReduce job written against HBase differs from a plain Hadoop MapReduce job. The Mapper reads the rows out of HBase, and at that step you can add your own data-cleaning logic or filter out certain columns. Focus on the usage pattern.
Mapper
package com.buba.mapper;
import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
/*
 * TableMapper extends Mapper and encapsulates what is needed to read from HBase in a
 * MapReduce job. The input key type is fixed as ImmutableBytesWritable (the row key) and
 * the input value is a Result, which is different from reading plain Hadoop input.
 */
public class ReadFruitMapper extends TableMapper<ImmutableBytesWritable, Put> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // Extract the name and color of each fruit row: read the cells of the row and
        // copy the ones we want into a Put object.
        Put put = new Put(key.get());
        // iterate over the cells of the row
        for (Cell cell : value.rawCells()) {
            // only keep cells from the column family "info"
            if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                // keep the "name" column
                if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                    // add the cell to the Put object
                    put.add(cell);
                // keep the "color" column
                } else if ("color".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                    // add the cell to the Put object
                    put.add(cell);
                }
            }
        }
        // emit each row read from fruit as the map output; skip rows that produced an
        // empty Put, since writing an empty Put to the target table would fail later
        if (!put.isEmpty()) {
            context.write(key, put);
        }
    }
}
Reducer
package com.buba.reducer;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
/*
 * TableReducer writes its output to an HBase table; the output value is a Mutation
 * (here a Put), so the output key can simply be NullWritable.
 */
public class ReadFruitReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {
        // write every Put read for this row key into the fruit_mr table
        for (Put put : values) {
            context.write(NullWritable.get(), put);
        }
    }
}
Driver
package com.buba.driver;
import com.buba.mapper.ReadFruitMapper;
import com.buba.reducer.ReadFruitReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class ReadFruitJob extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        // ZooKeeper quorum and client port of the HBase cluster
        configuration.set("hbase.zookeeper.quorum", "192.168.1.20,192.168.1.21,192.168.1.22");
        configuration.set("hbase.zookeeper.property.clientPort", "2181");
        ReadFruitJob readFruitJob = new ReadFruitJob();
        int status = ToolRunner.run(configuration, readFruitJob, args);
        System.exit(status);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf);
        job.setJarByClass(ReadFruitJob.class);
        Scan scan = new Scan();
        // scan tuning; the job also works without setting these two properties
        scan.setCacheBlocks(false);   // do not fill the block cache while scanning
        scan.setCaching(500);         // number of rows fetched per RPC
        // wire up the mapper
        TableMapReduceUtil.initTableMapperJob(
                "fruit",                        // source table the Mapper reads from
                scan,                           // Scan used to read the table
                ReadFruitMapper.class,          // Mapper class
                ImmutableBytesWritable.class,   // Mapper output key class
                Put.class,                      // Mapper output value class
                job
        );
        // wire up the reducer
        TableMapReduceUtil.initTableReducerJob(
                "fruit_mr",                     // target table the Reducer writes to
                ReadFruitReducer.class,         // Reducer class
                job
        );
        // number of reduce tasks
        job.setNumReduceTasks(1);
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }
}
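Two practical notes the write-up does not spell out: the client that submits the job needs the HBase jars on its classpath (for example by exporting HADOOP_CLASSPATH=$(hbase classpath) before running hadoop jar), and once the job finishes you can verify the copy with a plain client-side scan of fruit_mr. The verification class below is a sketch of my own, not part of the original post:

package com.buba.verify;  // hypothetical package, for illustration only

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanFruitMr {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("fruit_mr"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            // print every cell that the MapReduce job copied into fruit_mr
            for (Result result : scanner) {
                for (Cell cell : result.rawCells()) {
                    System.out.println(Bytes.toString(CellUtil.cloneRow(cell)) + " "
                            + Bytes.toString(CellUtil.cloneFamily(cell)) + ":"
                            + Bytes.toString(CellUtil.cloneQualifier(cell)) + " = "
                            + Bytes.toString(CellUtil.cloneValue(cell)));
                }
            }
        }
    }
}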