- Overview
- Reading from HBase: UserRateTopn.java, RateBean.java
- Writing to HBase: WordCount.java
- Reading from and Writing to HBase: HbaseTopn.java
Overview
MapReduce is a computation framework, while HBase is a data storage system.
MapReduce reads and writes different data systems through the corresponding InputFormat and OutputFormat implementations.
For MapReduce, HBase provides TableInputFormat and TableOutputFormat (wired up through the TableMapReduceUtil helper and the TableMapper/TableReducer base classes).
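All of the read examples below assume a source table named movie with column family f1, qualifiers a (age), g (gender), n (movie name), s (style) and r (rate), and a rowkey that starts with a 6-character movieId and ends with a 6-character uid. The following is a minimal preparation sketch, assuming the HBase 2.x client API; the class name and sample values are placeholders, not part of the original examples:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;
// Hypothetical helper: creates the "movie" table and loads one sample row in the
// layout the mappers below expect. Not part of the original examples.
public class MovieTablePrepare {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "linux01:2181,linux02:2181,linux03:2181");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            TableName tn = TableName.valueOf("movie");
            if (!admin.tableExists(tn)) {
                admin.createTable(TableDescriptorBuilder.newBuilder(tn)
                        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f1"))
                        .build());
            }
            try (Table table = conn.getTable(tn)) {
                // rowkey: 6-char movieId followed by a 6-char uid (sample values)
                Put put = new Put(Bytes.toBytes("000001" + "100001"));
                put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("a"), Bytes.toBytes("25"));
                put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("g"), Bytes.toBytes("M"));
                put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("n"), Bytes.toBytes("Toy Story"));
                put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("s"), Bytes.toBytes("Animation"));
                put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("r"), Bytes.toBytes("5"));
                table.put(put);
            }
        }
    }
}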
Reading from HBase
UserRateTopn.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
import java.util.ArrayList;
public class UserRateTopn {
// Extends the TableMapper provided by HBase; its only purpose is to fix the KEYIN/VALUEIN types: ImmutableBytesWritable and Result
public static class TopnMapper extends TableMapper<Text, RateBean> {
final byte[] F = "f1".getBytes();
RateBean b = new RateBean();
Text k = new Text();
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
Counter counter = context.getCounter("DirtyRead", "error_line");
try {
byte[] a = value.getValue(F, "a".getBytes());
byte[] g = value.getValue(F, "g".getBytes());
byte[] n = value.getValue(F, "n".getBytes());
byte[] s = value.getValue(F, "s".getBytes());
byte[] r = value.getValue(F, "r".getBytes());
byte[] rowkeyBytes = key.get();
String rk = new String(rowkeyBytes);
String movieId = rk.substring(0, 6);
String uid = rk.substring(rk.length() - 6);
k.set(uid);
b.set(uid, Integer.parseInt(new String(a)), new String(g), movieId, new String(n), new String(s), Integer.parseInt(new String(r)));
context.write(k, b);
}catch (Exception e) {
counter.increment(1);
}
}
}
public static class TopnReducer extends Reducer<Text, RateBean, RateBean, NullWritable> {
@Override
protected void reduce(Text key, Iterable<RateBean> values, Context context) throws IOException, InterruptedException {
ArrayList<RateBean> lst = new ArrayList<>();
// Iterate over the values and copy each one into a list buffer (Hadoop reuses the bean object during iteration, so a copy is required)
for (RateBean bean : values) {
RateBean newBean = new RateBean();
newBean.set(bean.getUid(), bean.getAge(), bean.getGender(), bean.getMovieId(), bean.getMovieName(), bean.getStyle(), bean.getRate());
lst.add(newBean);
}
// Sort descending by rate
lst.sort((o1, o2) -> o2.getRate() - o1.getRate());
int topn = 5;
for(int i = 0; i < Math.min(topn, lst.size()); i++) {
context.write(lst.get(i), NullWritable.get());
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "linux01:2181,linux02:2181,linux03:2181");
conf.set("mapred.jar", "hbasemr-1.0-SNAPSHOT.jar");
Job job = Job.getInstance(conf);
job.setJarByClass(UserRateTopn.class);
// The call below sets the mapper, and configures the InputFormat to the TableInputFormat provided by HBase
Scan scan = new Scan();
TableMapReduceUtil.initTableMapperJob("movie", scan, TopnMapper.class, Text.class, RateBean.class, job);
job.setReducerClass(TopnReducer.class);
job.setOutputKeyClass(RateBean.class);
job.setOutputValueClass(NullWritable.class);
job.setOutputFormatClass(TextOutputFormat.class); // TextOutputFormat is the default, so this line can be omitted
FileOutputFormat.setOutputPath(job, new Path("d:/out"));
job.waitForCompletion(true);
}
}
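The Scan passed to initTableMapperJob above controls what the mappers actually read. As an optional tuning sketch (a fragment meant to replace the plain new Scan() call in main; the caching value is only an example), the scan can be narrowed to the f1 family and batched:
Scan scan = new Scan();
scan.addFamily("f1".getBytes()); // read only the family the mapper uses
scan.setCaching(500);            // rows fetched per RPC; 500 is an assumed value
scan.setCacheBlocks(false);      // avoid polluting the region server block cache during a full table scan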
RateBean.java
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class RateBean implements Writable {
private String uid;
private int age;
private String gender;
private String movieId;
private String movieName;
private String style;
private int rate;
public void set(String uid, int age, String gender, String movieId, String movieName, String style, int rate) {
this.uid = uid;
this.age = age;
this.gender = gender;
this.movieId = movieId;
this.movieName = movieName;
this.style = style;
this.rate = rate;
}
public String getUid() {
return uid;
}
public void setUid(String uid) {
this.uid = uid;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public String getGender() {
return gender;
}
public void setGender(String gender) {
this.gender = gender;
}
public String getMovieId() {
return movieId;
}
public void setMovieId(String movieId) {
this.movieId = movieId;
}
public String getMovieName() {
return movieName;
}
public void setMovieName(String movieName) {
this.movieName = movieName;
}
public String getStyle() {
return style;
}
public void setStyle(String style) {
this.style = style;
}
public int getRate() {
return rate;
}
public void setRate(int rate) {
this.rate = rate;
}
@Override
public String toString() {
return "RateBean{" +
"uid='" + uid + '\'' +
", age=" + age +
", gender='" + gender + '\'' +
", movieId='" + movieId + '\'' +
", movieName='" + movieName + '\'' +
", style='" + style + '\'' +
", rate=" + rate +
'}';
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(this.uid);
out.writeInt(this.age);
out.writeUTF(this.gender);
out.writeUTF(this.movieId);
out.writeUTF(this.movieName);
out.writeUTF(this.style);
out.writeInt(this.rate);
}
@Override
public void readFields(DataInput in) throws IOException {
this.uid = in.readUTF();
this.age = in.readInt();
this.gender = in.readUTF();
this.movieId = in.readUTF();
this.movieName = in.readUTF();
this.style = in.readUTF();
this.rate = in.readInt();
}
}
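RateBean travels through the shuffle as a Hadoop Writable, so write() and readFields() must handle the fields in exactly the same order, and none of the String fields may be null when write() runs. A small local round-trip check (hypothetical class and sample values, not part of the original code):
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
public class RateBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        RateBean in = new RateBean();
        in.set("100001", 25, "M", "000001", "Toy Story", "Animation", 5);
        // serialize with write(), then deserialize into a fresh bean with readFields()
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bos));
        RateBean out = new RateBean();
        out.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
        // the printed fields should match the values set above
        System.out.println(out);
    }
}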
Writing to HBase
WordCount.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
public class WordCount {
public static class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] words = value.toString().split(" ");
for (String word : words) {
context.write(new Text(word), new IntWritable(1));
}
}
}
public static class WcReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable value : values) {
count += value.get();
}
Put put = new Put(key.toString().getBytes());
put.addColumn("f".getBytes(), "cnt".getBytes(), Bytes.toBytes(count));
context.write(null, put); // TableOutputFormat ignores the key; the Put itself carries the rowkey
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("hbase.zookeeper.quorum", "linux01:2181,linux02:2181,linux03:2181");
Job job = Job.getInstance(conf);
job.setJarByClass(WordCount.class);
job.setMapperClass(WcMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// Use the HBase helper to set the OutputFormat to TableOutputFormat and to specify the target table name
TableMapReduceUtil.initTableReducerJob("wordcount", WcReducer.class, job);
// Set the file input path
FileInputFormat.setInputPaths(job, new Path("data/input/"));
job.waitForCompletion(true);
}
}
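TableOutputFormat writes its Puts into an existing table, so the wordcount table (with column family f) has to be created before the job runs. Also note that the count is stored as the 4-byte binary form produced by Bytes.toBytes(int), so it reads back with Bytes.toInt. A sketch under the same assumptions as above (HBase 2.x client API; class name and the sample word are hypothetical):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
public class WordCountTableCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "linux01:2181,linux02:2181,linux03:2181");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            TableName tn = TableName.valueOf("wordcount");
            if (!admin.tableExists(tn)) {
                // the output table must exist before the job runs
                admin.createTable(TableDescriptorBuilder.newBuilder(tn)
                        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f"))
                        .build());
            }
            // after the job: read one word's count back; it was written with Bytes.toBytes(int)
            try (Table table = conn.getTable(tn)) {
                Result r = table.get(new Get(Bytes.toBytes("hello"))); // "hello" is a sample word
                byte[] cnt = r.getValue(Bytes.toBytes("f"), Bytes.toBytes("cnt"));
                if (cnt != null) {
                    System.out.println("hello -> " + Bytes.toInt(cnt));
                }
            }
        }
    }
}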
Reading from and Writing to HBase
HbaseTopn.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import java.io.IOException;
import java.util.ArrayList;
public class HbaseTopn {
public static class TopnMapper extends TableMapper<Text, RateBean> {
final byte[] F = "f1".getBytes();
Text k = new Text();
RateBean b = new RateBean();
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
Counter counter = context.getCounter("DirtyRead", "error_line");
try {
byte[] a = value.getValue(F, "a".getBytes());
byte[] g = value.getValue(F, "g".getBytes());
byte[] n = value.getValue(F, "n".getBytes());
byte[] r = value.getValue(F, "r".getBytes());
byte[] s = value.getValue(F, "s".getBytes());
byte[] rkBytes = key.get();
String rk = new String(rkBytes);
String movieId = rk.substring(0, 6);
String uid = rk.substring(rk.length() - 6);
k.set(uid);
b.set(uid, Integer.parseInt(new String(a)), new String(g), movieId, new String(n), new String(s), Integer.parseInt(new String(r)));
context.write(k, b);
} catch (Exception e) {
counter.increment(1);
}
}
}
public static class TopnReducer extends TableReducer<Text, RateBean, ImmutableBytesWritable> {
@Override
protected void reduce(Text key, Iterable<RateBean> values, Context context) throws IOException, InterruptedException {
ArrayList<RateBean> lst = new ArrayList<>();
for (RateBean b : values) {
RateBean newBean = new RateBean();
newBean.set(b.getUid(), b.getAge(), b.getGender(), b.getMovieId(), b.getMovieName(),b.getStyle(),b.getRate());
lst.add(newBean);
}
lst.sort((o1, o2) -> o2.getRate() - o1.getRate());
int topn = 5;
for (int i = 0; i < Math.min(topn, lst.size()); i++) {
// Use uid + movieId as the rowkey and the bean's toString() as the value
RateBean rateBean = lst.get(i);
String uid = rateBean.getUid();
String movieId = rateBean.getMovieId();
Put put = new Put((uid + " " + movieId).getBytes());
put.addColumn("f".getBytes(), "c".getBytes(), rateBean.toString().getBytes());
context.write(null, put);
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("hbase.zookeeper.quorum", "linux01:2181,linux02:2181,linux03:2181");
conf.set("mapreduce.map.tasks", "5");
Job job = Job.getInstance(conf);
job.setJarByClass(HbaseTopn.class);
Scan scan = new Scan();
TableMapReduceUtil.initTableMapperJob("movie", scan, TopnMapper.class, Text.class, RateBean.class, job);
job.setNumReduceTasks(3);
TableMapReduceUtil.initTableReducerJob("topn", TopnReducer.class, job);
job.waitForCompletion(true);
}
}
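As with the word-count job, the topn target table (column family f) must exist before HbaseTopn runs; afterwards the results can be inspected with a plain scan. A sketch under the same assumptions as above (HBase 2.x client API, hypothetical class name):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
public class TopnResultScan {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "linux01:2181,linux02:2181,linux03:2181");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            TableName tn = TableName.valueOf("topn");
            if (!admin.tableExists(tn)) {
                // the target table must exist before HbaseTopn runs
                admin.createTable(TableDescriptorBuilder.newBuilder(tn)
                        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f"))
                        .build());
            }
            // after the job: print every "uid movieId" rowkey and the bean string stored in f:c
            try (Table table = conn.getTable(tn);
                 ResultScanner scanner = table.getScanner(new Scan())) {
                for (Result r : scanner) {
                    String rowkey = Bytes.toString(r.getRow());
                    String bean = Bytes.toString(r.getValue(Bytes.toBytes("f"), Bytes.toBytes("c")));
                    System.out.println(rowkey + " => " + bean);
                }
            }
        }
    }
}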