OutputFormat is the base class for MapReduce output: every class that produces MapReduce output implements the OutputFormat interface.
The most commonly used implementations are TextOutputFormat and SequenceFileOutputFormat.
1. TextOutputFormat (text output)
The default output format is TextOutputFormat, which writes each record as a line of text. Keys and values may be of any type, because TextOutputFormat calls toString() to convert them to strings.
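As a minimal sketch, the separator that TextOutputFormat places between key and value (a tab by default) can be changed through configuration. The driver class name here is hypothetical, and the property key below is the one used by Hadoop 2.x and later (older versions use mapred.textoutputformat.separator):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class TextOutputDriverSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Replace the default tab separator between key and value with a comma
        conf.set("mapreduce.output.textoutputformat.separator", ",");
        Job job = Job.getInstance(conf);
        // TextOutputFormat is already the default; setting it explicitly documents the intent
        job.setOutputFormatClass(TextOutputFormat.class);
        // ... mapper, reducer, and input/output paths would be configured here as in any driver
    }
}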
2. SequenceFileOutputFormat
SequenceFileOutputFormat writes the output as binary SequenceFiles. Because the format is compact and compresses well, it is a good choice when the output will be fed into another MapReduce job.
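A minimal sketch of enabling compressed SequenceFile output in a driver; the class name is hypothetical, and the choice of GzipCodec with BLOCK compression is only an illustrative assumption:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SequenceOutputDriverSketch {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        // Turn on output compression for the job
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        // BLOCK compression compresses batches of records together, usually giving the best ratio
        SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);
        // ... mapper, reducer, and input/output paths would be configured here as in any driver
    }
}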
3. Custom OutputFormat
(1) Use cases
Implement a custom OutputFormat when you need to control the output paths and output format of the final files.
For example, when different kinds of records must be written to different directories, a custom OutputFormat is the way to do it.
(2) Steps to implement a custom OutputFormat
1) Define a class that extends FileOutputFormat.
import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable> {

    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        // Hand each task a RecordWriter that routes records to different files
        return new FilterRecordWriter(context);
    }
}
2) Write a custom RecordWriter, overriding write(), the method that actually emits the output data.
import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class FilterRecordWriter extends RecordWriter<Text, NullWritable> {

    private FSDataOutputStream hadoopOutputStream = null;
    private FSDataOutputStream otherOutputStream = null;

    public FilterRecordWriter(TaskAttemptContext context) {
        try {
            // Get the file system from the job configuration
            FileSystem fileSystem = FileSystem.get(context.getConfiguration());
            // Create one output file per category
            Path hadoopPath = new Path("/mapreduce/outputFormat/output/123.log");
            Path otherPath = new Path("/mapreduce/outputFormat/output/other.log");
            hadoopOutputStream = fileSystem.create(hadoopPath);
            otherOutputStream = fileSystem.create(otherPath);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void write(Text text, NullWritable writable) throws IOException, InterruptedException {
        // Route records containing "www.123.com" to 123.log, everything else to other.log
        if (text.toString().contains("www.123.com")) {
            hadoopOutputStream.write(text.toString().getBytes());
        } else {
            otherOutputStream.write(text.toString().getBytes());
        }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        // Close both streams when the task finishes
        IOUtils.closeStream(hadoopOutputStream);
        IOUtils.closeStream(otherOutputStream);
    }
}
3) The Mapper and Reducer are written as usual; the driver then registers the custom OutputFormat.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Emit each input line as the key; no value is needed
        String line = value.toString();
        context.write(new Text(line), NullWritable.get());
    }
}
import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FilterReduce extends Reducer<Text, NullWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text text, Iterable<NullWritable> iterable, Context context)
            throws IOException, InterruptedException {
        // Iterate over the values so duplicate lines are not collapsed into a single output line
        for (NullWritable nullWritable : iterable) {
            context.write(new Text(text.toString() + "\r\n"), NullWritable.get());
        }
    }
}
public static void main(String[] args) throws Exception {
    System.setProperty("HADOOP_USER_NAME", "root");
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration);

    // Register the custom OutputFormat
    job.setOutputFormatClass(FilterOutputFormat.class);

    job.setMapperClass(FilterMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(NullWritable.class);

    job.setReducerClass(FilterReduce.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);

    FileInputFormat.setInputPaths(job, new Path("/mapreduce/outputFormat/log"));
    FileOutputFormat.setOutputPath(job, new Path("/mapreduce/outputFormat/output"));

    boolean waitForCompletion = job.waitForCompletion(true);
    System.exit(waitForCompletion ? 0 : 1);
}
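For a hypothetical input file under /mapreduce/outputFormat/log, every line containing www.123.com ends up in 123.log and every other line in other.log. Note that FileOutputFormat.setOutputPath() is still required even though FilterRecordWriter writes to its own paths: FileOutputFormat refuses to run without an output directory, and the framework uses it for bookkeeping files such as the _SUCCESS marker.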