微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Hadoop天气数据分析案例

需求:

找出每个月气温最高的2天(案例测试)

数据源(每行的日期时间与温度之间以制表符 \t 分隔,Mapper 按 \t 切分):

1949-10-01 14:21:02    34c

1949-10-01 19:21:02    38c

1949-10-02 14:01:02    36c

1950-01-01 11:21:02    32c

1950-10-01 12:21:02    37c

1951-12-01 12:21:02    23c

1950-10-02 12:21:02    41c

1950-10-03 12:21:02    27c

1951-07-01 12:21:02    45c

1951-07-02 12:21:02    46c

1951-07-03 12:21:03    47c

项目结构:

TQtest.java

package com.tq.test;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TQtest {

       public static void main(String[] args) throws IOException,  ClassNotFoundException, InterruptedException {

             // Todo Auto-generated method stub

             // 1配置

             Configuration conf = new Configuration();

             Job job = Job.getInstance(conf);

             job.setJarByClass(TQtest.class);

             job.setJobName("tq");

             // 2设置输入路径和输出路径

             Path inpath = new Path("/tq/input");

             FileInputFormat.addInputPath(job, inpath);

             Path outpath = new Path("/tq/output");

             if (outpath.getFileSystem(conf).exists(outpath))

                    outpath.getFileSystem(conf).delete(outpath, true);

             FileOutputFormat.setoutputPath(job, outpath);

             // 3设置Mapper

             job.setMapperClass(Tmapper.class);

             job.setMapOutputKeyClass(Tq.class);

             job.setMapOutputValueClass(IntWritable.class);

             // 4 自定义比较器

             job.setSortComparatorClass(TqSortComparator.class);

             // 5自定义分区器

             job.setPartitionerClass(TPartitioner.class);

             // 6 自定义组排序

             job.setGroupingComparatorClass(TGroupCmparator.class);

             // 7 设置reducetask数量

             job.setNumReduceTasks(2);

             // 8 设置reducer

             job.setReducerClass(Treducer.class);

             // 9

             job.waitForCompletion(true);

       }

}

Tmapper.java

package com.tq.test;

 

 

import java.io.IOException;

import java.text.ParseException;

import java.text.SimpleDateFormat;

import java.util.Calendar;

import java.util.Date;

 

 

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import org.jboss.netty.util.internal.StringUtil;

 

 

public class Tmapper extends Mapper<LongWritable, Text, Tq, IntWritable> {

 

 

    Tq tkey = new Tq();

    IntWritable tvalue = new IntWritable();

 

 

    @Override

    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 获得时间 温度数组

        String[] words = StringUtil.split(value.toString(), '\t');

        String pattern = "yyyy-MM-dd";

        SimpleDateFormat sdf = new SimpleDateFormat(pattern);

        try {

            // 处理日期

            Date date = sdf.parse(words[0]);

            Calendar cal = Calendar.getInstance();

            cal.setTime(date);

            tkey.setYear(cal.get(Calendar.YEAR));

            tkey.setMonth(cal.get(Calendar.MONTH) + 1);

            tkey.setDay(cal.get(Calendar.DAY_OF_MONTH));

            // 处理温度

            int temp = Integer.parseInt(words[1].substring(0, words[1].lastIndexOf("c")));

            tkey.setTemp(temp);

            tvalue.set(temp);

            context.write(tkey, tvalue);

        } catch (ParseException e) {

            // Todo Auto-generated catch block

            e.printstacktrace();

        }

 

 

    }

}

 

 

Tq.java

package com.tq.test;

 

 

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

 

 

import org.apache.hadoop.io.WritableComparable;

 

 

public class Tq implements WritableComparable<Tq> {

 

 

    private int year;

    private int month;

    private int day;

    private int temp;

 

 

    public int getYear() {

        return year;

    }

 

 

    public void setYear(int year) {

        this.year = year;

    }

 

 

    public int getMonth() {

        return month;

    }

 

 

    public void setMonth(int month) {

        this.month = month;

    }

 

 

    public int getDay() {

        return day;

    }

 

 

    public void setDay(int day) {

        this.day = day;

    }

 

 

    public int getTemp() {

        return temp;

    }

 

 

    public void setTemp(int temp) {

        this.temp = temp;

    }

 

 

    public void write(DataOutput out) throws IOException {

        out.writeInt(this.getYear());

        out.writeInt(this.getMonth());

        out.writeInt(this.getDay());

        out.writeInt(this.getTemp());

 

 

    }

 

 

    public void readFields(DataInput in) throws IOException {

        this.setYear(in.readInt());

        this.setMonth(in.readInt());

        this.setDay(in.readInt());

        this.setTemp(in.readInt());

    }

 

 

    public int compareto(Tq o) {

        int c1 = Integer.compare(this.getYear(), o.getYear());

        if (c1 == 0) {

            int c2 = Integer.compare(this.getMonth(), o.getMonth());

            if (c2 == 0) {

                return Integer.compare(this.getDay(), o.getDay());

            }

            return c2;

        }

        return c1;

    }

 

 

    @Override

    public String toString() {

        return year + "-" + month + "-" + day;

    }

 

 

}

TqSortComparator.java

package com.tq.test;

 

 

import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.io.WritableComparator;

 

 

/**

* 实现天气 年月正序,温度倒序

*/

public class TqSortComparator extends WritableComparator {

    Tq t1 = null;

    Tq t2 = null;

 

 

    public TqSortComparator() {

        super(Tq.class, true);

    }

 

 

    public int compare(WritableComparable a, WritableComparable b) {

        t1 = (Tq) a;

        t2 = (Tq) b;

        int c1 = Integer.compare(t1.getYear(), t2.getYear());

        if (c1 == 0) {

            int c2 = Integer.compare(t1.getMonth(), t2.getMonth());

            if (c2 == 0) {

                return -Integer.compare(t1.getTemp(), t2.getTemp());

            }

            return c2;

        }

        return c1;

    }

}

 

TPartitioner.java

package com.tq.test;

 

 

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.mapreduce.Partitioner;

 

 

public class TPartitioner extends Partitioner<Tq, IntWritable> {

 

 

    @Override

    public int getPartition(Tq key, IntWritable value, int numPartitions) {

        // Todo Auto-generated method stub

        return key.getYear() % numPartitions;

    }

 

 

}

 

TGroupCmparator.java

package com.tq.test;

 

 

import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.io.WritableComparator;

 

 

public class TGroupCmparator extends WritableComparator {

    Tq t1 = null;

    Tq t2 = null;

 

 

    public TGroupCmparator() {

        super(Tq.class, true);

    }

 

 

    public int compare(WritableComparable a, WritableComparable b) {

        t1 = (Tq) a;

        t2 = (Tq) b;

        int c1 = Integer.compare(t1.getYear(), t2.getYear());

        if (c1 == 0) {

            return Integer.compare(t1.getMonth(), t2.getMonth());

        }

        return c1;

    }

}

 

Treducer.java

package com.tq.test;

 

 

import java.io.IOException;

 

 

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

 

public class Treducer extends Reducer<Tq, IntWritable, Text, IntWritable> {

    Text tkey = new Text();

    IntWritable tval = new IntWritable();

 

 

    @Override

    protected void reduce(Tq key, Iterable<IntWritable> vals, Context context)

            throws IOException, InterruptedException {

        int flag = 0;

        int day = 0;

        for (IntWritable val : vals) {

            if (flag == 0) {

                tkey.set(key.toString());

                tval.set(val.get());

                context.write(tkey, tval);

                flag++;

                day = key.getDay();

            }

            if (flag != 0 && day != key.getDay()) {

                tkey.set(key.toString());

                tval.set(val.get());

                context.write(tkey, tval);

                return;

            }

        }

    }

}

 

 

打包成 jar,上传到 Linux 虚拟机上执行(需要先启动 HDFS、YARN、ZooKeeper 以及 DFSZKFailoverController):

hadoop jar tq.jar com.tq.test.TQtest

执行结果:

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐