
Hadoop Sorting and Partitioning

MapReduce is the computation framework of the Hadoop big-data stack. Its results come out sorted by default: map output keys are ordered in dictionary (lexicographic) order, implemented as a quick sort on the map side followed by a merge sort on the Reduce Task side. Several kinds of sorting are available: partial sort, total (global) sort, auxiliary (grouping) sort, secondary sort, and custom sort.
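
As a quick illustration of that default ordering (a minimal sketch, not part of the original project; it only assumes hadoop-common on the classpath), keys are ordered by whatever the key type's compareTo defines, so Text keys compare as strings rather than as numbers:

import org.apache.hadoop.io.Text;

public class DefaultKeyOrderDemo {
    public static void main(String[] args) {
        // Map output keys are ordered by the key type's compareTo.
        // For Text that is dictionary order, so "100" sorts before "9".
        Text a = new Text("100");
        Text b = new Text("9");
        System.out.println(a.compareTo(b) < 0); // true: "100" < "9" lexicographically
    }
}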

WritableComparable

Quick sort


package wmy.hadoop.mapreduce.quickSort;

/*
 *@description: Quick sort implementation
 *@author: 情深@骚明
 *@time: 2021/2/5 8:07
 *@Version 1.0
 */

/**
 * Quick sort using divide and conquer plus recursion.
 * Quick sort is fast because, compared with bubble sort, each swap moves elements over a
 * long distance. Every pass picks a pivot, puts everything less than or equal to the pivot
 * on its left and everything greater than or equal to it on its right. Swaps are therefore
 * not limited to adjacent elements as in bubble sort, so the total number of comparisons
 * and swaps is smaller and the sort is faster. In the worst case only adjacent elements get
 * swapped, so the worst-case complexity is the same as bubble sort's O(N^2); the average
 * complexity is O(N log N).
 */
public class quickSort {
    public static void sort(int[] arr, int low, int high) {
        if (low >= high) {
            return; // nothing left to sort; this is also the base case of the recursion
        }

        int i = low; // left sentinel
        int j = high; // right sentinel
        int tmp = arr[low]; // pivot value

        while (i < j) { // partition until the sentinels meet
            while (tmp <= arr[j] && i < j) { // move the right sentinel left while values are >= the pivot
                j--;
            }
            while (tmp >= arr[i] && i < j) { // move the left sentinel right while values are <= the pivot
                i++;
            }
            if (i < j) {
                int temp = arr[j];
                arr[j] = arr[i];
                arr[i] = temp;
            }
        }
        // Put the pivot into its final position
        arr[low] = arr[i];
        arr[i] = tmp;

        // Recurse on both halves
        sort(arr, low, j - 1);
        sort(arr, j + 1, high);
    }
    }

    public static void main(String[] args) throws Exception {
        // Test data
        int[] arr = {6, 1, 2, 7, 9, 3, 4, 5, 10, 8};
        sort(arr, 0, arr.length - 1);
        for (int i = 0; i < arr.length; i++) {
            System.out.println(arr[i]);
        }
    }
}

Implementing Comparable

Sorting

@Override
public int compareTo(flowBean o) {
    return Long.compare(o.sumFlow,this.sumFlow);
}

Source of Long.compare:

/**
* Compares two {@code long} values numerically.
* The value returned is identical to what would be returned by:
* <pre>
*    Long.valueOf(x).compareTo(Long.valueOf(y))
* </pre>
*
* @param  x the first {@code long} to compare
* @param  y the second {@code long} to compare
* @return the value {@code 0} if {@code x == y};
*         a value less than {@code 0} if {@code x < y}; and
*         a value greater than {@code 0} if {@code x > y}
* @since 1.7
*/
public static int compare(long x, long y) {
   return (x < y) ? -1 : ((x == y) ? 0 : 1);
}

Output of the earlier flow-serialization job, which serves as the input for this sort:

13470253144	180	180	360
13509468723	7335	110349	117684
13560439638	918	4938	5856
13568436656	3597	25635	29232
13590439668	1116	954	2070
13630577991	6960	690	7650
13682846555	1938	2910	4848
13729199489	240	0	240
13736230513	2481	24681	27162
13768778790	120	120	240
13846544121	264	0	264
13956435636	132	1512	1644
13966251146	240	0	240
13975057813	11058	48243	59301
13992314666	3008	3720	6728
15043685818	3659	3538	7197
15910133277	3156	2936	6092
15959002129	1938	180	2118
18271575951	1527	2106	3633
18390173782	9531	2412	11943
84188413	4116	1432	5548

WritableComparable

The framework refers to the map output key polymorphically, so the key class must implement this interface:

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface WritableComparable<T> extends Writable, Comparable<T> {
}

flowBean

package wmy.hadoop.mapreduce.writeTableComparable;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/*
 *@description: Custom key that implements sorting
 *@author: 情深@骚明
 *@time: 2021/2/5 7:56
 *@Version 1.0
 */
public class flowBean implements WritableComparable<flowBean> {
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public flowBean() {
    }

    /**
     * Convenience method for filling the bean later on.
     * The bean travels through the Hadoop framework, so it has to implement Hadoop's serialization.
     * @param upFlow
     * @param downFlow
     */
    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }


    @Override
    public int compareTo(flowBean o) {
        return Long.compare(o.sumFlow, this.sumFlow); // descending by total flow
    }

    /**
     * Note: deserialize in exactly the same order as you serialize; otherwise the fields may
     * come back with the wrong values.
     * Serialization ---> write the fields out to the Hadoop framework
     * @param out the data sink provided by the framework
     * @throws IOException
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /**
     * Deserialization
     * @param in the data source provided by the framework
     * @throws IOException
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }
}

sortMapper

package wmy.hadoop.mapreduce.writeTableComparable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/*
 *@description:
 *@author: 情深@骚明
 *@time: 2021/2/5 8:25
 *@Version 1.0
 */
public class sortMapper extends Mapper<LongWritable, Text, flowBean, Text> {
    // Map output key and value
    private flowBean flow = new flowBean();
    private Text phone = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Input line, e.g.: 13470253144	180	180	360
        // Parse the fields
        String[] fields = value.toString().split("\t");
        phone.set(fields[0]);
        flow.setUpFlow(Long.parseLong(fields[1]));
        flow.setDownFlow(Long.parseLong(fields[2]));
        flow.setSumFlow(Long.parseLong(fields[3]));
        context.write(flow,phone);
    }
}

sortReducer

package wmy.hadoop.mapreduce.writeTableComparable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/*
 *@description:
 *@author: 情深@骚明
 *@time: 2021/2/5 8:36
 *@Version 1.0
 */
public class sortReducer extends Reducer<flowBean, Text, Text, flowBean> {
    @Override
    protected void reduce(flowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
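        // flowBean keys with equal sumFlow compare as equal and are grouped into one reduce()
        // call, so iterate the values to emit every phone number rather than just one.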
        for (Text value : values) {
            context.write(value,key);
        }
    }
}

sortDriver

package wmy.hadoop.mapreduce.writeTableComparable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/*
 *@description:
 *@author: 情深@骚明
 *@time: 2021/2/5 8:38
 *@Version 1.0
 */
public class sortDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(sortDriver.class);
        job.setMapperClass(sortMapper.class);
        job.setReducerClass(sortReducer.class);
        job.setMapOutputKeyClass(flowBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(flowBean.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

Results


Note: if you want the framework to sort on a field, put that field in the map output key (here flowBean is the map output key; the Reducer swaps key and value back when it writes the result).
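
If you wanted ascending order instead, the comparison simply flips its arguments (a sketch, not part of the original code):

@Override
public int compareTo(flowBean o) {
    return Long.compare(this.sumFlow, o.sumFlow); // ascending by total flow
}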

Sorting with Partitioning

MyPartitionSort

package wmy.hadoop.mapreduce.writableComarableAndPartition;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
import wmy.hadoop.mapreduce.writeTableComparable.flowBean;

/*
 *@description: Partitioning happens after the Mapper; the number of reduce tasks determines the number of partitions (this partitioner produces 5, so the driver sets 5 reduce tasks)
 *@author: 情深@骚明
 *@time: 2021/2/5 8:46
 *@Version 1.0
 */
public class MyPartitionSort extends Partitioner<flowBean, Text> {
    @Override
    public int getPartition(flowBean flowBean, Text text, int numPartitions) {
        switch (text.toString().substring(0, 3)) {
            case "136": return 0;
            case "137": return 1;
            case "138": return 2;
            case "139": return 3;
            default:return 4;
        }
    }
}

sortPartitionDriver

package wmy.hadoop.mapreduce.writableComarableAndPartition;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import wmy.hadoop.mapreduce.writeTableComparable.flowBean;
import wmy.hadoop.mapreduce.writeTableComparable.sortMapper;
import wmy.hadoop.mapreduce.writeTableComparable.sortReducer;

import java.io.IOException;

/*
 *@description:
 *@author: 情深@骚明
 *@time: 2021/2/5 8:38
 *@Version 1.0
 */
public class sortPartitionDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(sortPartitionDriver.class);
        job.setMapperClass(sortMapper.class);
        job.setReducerClass(sortReducer.class);
        job.setMapOutputKeyClass(flowBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(flowBean.class);

        job.setPartitionerClass(MyPartitionSort.class);
        job.setNumReduceTasks(5);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

Results



The MapReduce code itself is not hard; once you understand the programming model you can gradually work out the logic your own job needs.

Combiner

The Combiner's parent class is Reducer. It runs on the node of each individual MapTask, whereas the Reducer receives the output of all Mappers and performs the global aggregation; the Combiner's job is to cut down the amount of data shuffled across the network. The precondition for using a Combiner is that it must not change the final business result, and its output key/value types must match the Reducer's input key/value types.

WordCount example
abc abc abc def
def def

abc abc
def def

The Combiner aggregates each Map Task's local output right after the Mapper. The point is I/O: without it a lot of redundant key/value pairs are written and shuffled; with it the map-side output shrinks and the I/O pressure drops. It is not enabled by default, and its use is restricted: it may only be applied when local aggregation does not change the business result (pre-aggregating an average, for example, gives a wrong answer). The Combiner's input and output types must be identical, because the Combiner is not allowed to change the Mapper's final output types.
The Combiner's input must already be sorted data, because only sorted data can be merged, and duplicate keys may still appear across spill files. In total the map side sorts three times: one quick sort and two merge sorts. After the first Combiner pass a spill no longer contains duplicate keys, and the following merge passes keep the data ordered.
The Combiner can be applied at any point before the third (final) merge sort.
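
To make the "must not change the business result" rule concrete, here is a minimal plain-Java sketch (not from the original post; the numbers are purely hypothetical) showing why pre-aggregating an average in a Combiner is wrong while pre-aggregating a sum is safe:

public class CombinerPitfallDemo {
    public static void main(String[] args) {
        // Hypothetical values for the same key, seen by two different map tasks.
        int[] mapTask1 = {1, 2, 3};
        int[] mapTask2 = {10, 20};

        // Summing per map task and then summing the partial sums equals the global sum,
        // which is why the WordCount Reducer can double as a Combiner.
        int sumCombined = sum(mapTask1) + sum(mapTask2);                    // 36 == global sum

        // Averaging per map task and then averaging the averages is NOT the global average.
        double globalAvg = (sum(mapTask1) + sum(mapTask2)) / 5.0;           // 7.2
        double avgOfAvgs = (sum(mapTask1) / 3.0 + sum(mapTask2) / 2.0) / 2; // 8.5 -- wrong

        System.out.println(sumCombined + ", " + globalAvg + " vs " + avgOfAvgs);
    }

    private static int sum(int[] values) {
        int s = 0;
        for (int v : values) s += v;
        return s;
    }
}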

package wmy.hadoop.mapreduce.combiner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import wmy.hadoop.mapreduce.Wordcount.WcMapper;
import wmy.hadoop.mapreduce.Wordcount.WcReduce;

import java.io.IOException;

/*
 *@description:
 *  Configure the Mapper and Reducer so the job fits the MapReduce framework.
 *  Acts as the YARN client: it submits the whole program, wrapped in a job object, to the YARN cluster.
 *@author: 情深@骚明
 *@time: 2021/2/3 17:06
 *@Version 1.0
 */
public class WcCombinerDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Set the client identity so we have permission to operate on HDFS
        // System.setProperty("HADOOP_USER_NAME","root");

        // Get a Job instance
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // Set the jar (classpath) of the driver
        job.setJarByClass(WcCombinerDriver.class);

        // Set the Mapper and Reducer
        job.setMapperClass(WcMapper.class);
        job.setReducerClass(WcReduce.class);

        // Set the Mapper and Reducer output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //job.setNumReduceTasks(5);
        job.setCombinerClass(WcReduce.class);

        // Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job
        boolean result = job.waitForCompletion(true);
        System.out.println(result ? 0 : 1);

    }
}

Comparing the results


GroupingComparator

Secondary sort

Sorting on two conditions, a classic secondary sort: order by orderId first, then by price (descending) within each order.

package wmy.hadoop.mapreduce.groupingComparator;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/*
 *@description: Secondary sort key (orderId, then price descending)
 *@author: 情深@骚明
 *@time: 2021/2/6 8:18
 *@Version 1.0
 */
public class OrderBean implements WritableComparable<OrderBean> {
    private String orderId;
    private String productId;
    private double price;

    @Override
    public String toString() {
        return orderId + '\t' + productId + '\t' + price;
    }

    public String getorderId() {
        return orderId;
    }

    public void setorderId(String orderId) {
        this.orderId = orderId;
    }

    public String getProductId() {
        return productId;
    }

    public void setProductId(String productId) {
        this.productId = productId;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }

    @Override
    public int compareTo(OrderBean o) {
        // Compare by orderId first, then by price
        int compare = this.orderId.compareTo(o.orderId);
        if (compare == 0) {
            return Double.compare(o.price, this.price); // price descending
        }else {
            return compare;
        }
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeUTF(productId);
        out.writeDouble(price);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.productId = in.readUTF();
        this.price = in.readDouble();
    }
}

OrderMapper

NullWritable

NullWritable is a singleton, built with the eager-initialization ("hungry") pattern:


/** Returns the single instance of this class. */
public static NullWritable get() { return THIS; }
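
For comparison, here is a minimal sketch of the same eager-initialization pattern (illustrative only, not Hadoop source):

public class EagerSingleton {
    // The single instance is created as soon as the class is loaded ("hungry" style),
    // just like NullWritable's internal instance.
    private static final EagerSingleton INSTANCE = new EagerSingleton();

    private EagerSingleton() { }            // no outside instantiation

    public static EagerSingleton get() {    // mirrors NullWritable.get()
        return INSTANCE;
    }
}

The OrderMapper code:
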
package wmy.hadoop.mapreduce.groupingComparator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/*
 *@description:
 *@author: 情深@骚明
 *@time: 2021/2/6 8:30
 *@Version 1.0
 */
public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
    private OrderBean orderBean = new OrderBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Parse the input line, e.g.: 0000001	Pdt_01	222.8
        String[] fields = value.toString().split("\t");
        orderBean.setorderId(fields[0]);
        orderBean.setProductId(fields[1]);
        orderBean.setPrice(Double.parseDouble(fields[2]));
        context.write(orderBean, NullWritable.get());
    }
}

OrderComparator

MapReduce does not pass data between tasks as live Java objects; keys and values travel as serialized bytes (assuming objects are handed around leads straight to NullPointerExceptions). In a big-data setting you cannot afford to turn every record into a long-lived object, which would waste heap; when memory runs short the data is spilled to disk, and serialization/deserialization keeps the pressure down. The serialized form is very compact. This is also why a WritableComparator must be told to create key instances (createInstances below): otherwise key1, key2, and the buffer stay null and comparing deserialized keys throws a NullPointerException.

  protected WritableComparator(Class<? extends WritableComparable> keyClass,
                               Configuration conf,
                               boolean createInstances) {
    this.keyClass = keyClass;
    this.conf = (conf != null) ? conf : new Configuration();
    if (createInstances) {
      key1 = newKey();
      key2 = newKey();
      buffer = new DataInputBuffer();
    } else {
      key1 = key2 = null;
      buffer = null;
    }
  }
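
As a rough sketch of the serialization point above (not from the original post; it assumes the OrderBean class shown earlier is on the classpath), a key can be round-tripped through a compact byte buffer, which is essentially what the shuffle moves around instead of Java objects:

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

import java.io.IOException;

public class WritableRoundTripDemo {
    public static void main(String[] args) throws IOException {
        OrderBean original = new OrderBean();
        original.setorderId("0000001");
        original.setProductId("Pdt_01");
        original.setPrice(222.8);

        // Serialize into a compact byte buffer -- this is what actually travels through the shuffle.
        DataOutputBuffer out = new DataOutputBuffer();
        original.write(out);

        // Deserialize into a fresh instance on the "other side".
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        OrderBean copy = new OrderBean();
        copy.readFields(in);

        System.out.println(copy); // 0000001	Pdt_01	222.8
    }
}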


package wmy.hadoop.mapreduce.groupingComparator;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/*
 *@description: Grouping comparator: send all records of one orderId to a single reduce() call
 *@author: 情深@骚明
 *@time: 2021/2/6 8:31
 *@Version 1.0
 */
public class OrderComparator extends WritableComparator {

    // Ask the parent to create key instances (createInstances = true); without this,
    // key1/key2/buffer stay null and comparing deserialized keys throws a NullPointerException.
    protected OrderComparator() {
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean oa = (OrderBean) a;
        OrderBean ob = (OrderBean) b;

        // Group only by orderId; ordering by price was already handled in OrderBean.compareTo
        return oa.getorderId().compareTo(ob.getorderId());
    }
}

OrderReducer

package wmy.hadoop.mapreduce.groupingComparator;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/*
 *@description: NullWritable ---> just a singleton
 *@author: 情深@骚明
 *@time: 2021/2/6 8:31
 *@Version 1.0
 */
public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Within one group (one orderId) the keys arrive sorted by price descending,
        // so writing the key once yields the highest-priced record of that order.
        context.write(key, NullWritable.get());
    }
}

OrderDriver

package wmy.hadoop.mapreduce.groupingComparator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/*
 *@description:
 *@author: 情深@骚明
 *@time: 2021/2/6 8:31
 *@Version 1.0
 */
public class OrderDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(OrderDriver.class);
        job.setMapperClass(OrderMapper.class);
        job.setReducerClass(OrderReducer.class);

        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);
        
        job.setGroupingComparatorClass(OrderComparator.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

Results

