微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

【hadoop权威指南第四版】第四章hadoop的IO【笔记+代码】

4.1数据完整性

检测损坏数据的常用方法是在第一次进入系统时计算数据的校验和,如果传输后新生成的校验和不完全匹配原始的校验和,那么数据就会被认为是损坏了。

  • 注意,校验和可能会错,数据却是正确的,但这种可能性不大,因为校验和远小于数据。

一个常用的数据检测代码是CRC-32(cyclic redundancy check,循环冗余检查),计算一个32位的任何大小输入的整数校验和。

4.4.1 HDFS的数据完整性

客户端写入数据并且将它发送到一个数据节点的管线中,在管线上的最后一个数据节点验证校验和。如果检测到错误,客户端便会受到一个checksum Exception,这是IOException的一个子类。

客户端读取数据节点上的数据时,会验证校验和,将其与数据节点上存储的校验和进行对比。每个数据节点维护一个连续的校验和验证日志,因此他知道每个数据块最后验证的时间。客户端成功验证数据块后,便会告诉数据节点,后者便随之更新日志。保持这种统计,对检测损坏磁盘是很有价值的。

吃了对客户端读取数据进行验证,每个数据节点还会在后台线程运行一个DataBlockScanner(数据块检测程序),定期验证存储在数据节点上的所有块。这是为了防止物理存储介质中位衰减所造成的数据损坏。

HDFS 有着块的副本,可以直接复制过来作为纠正。

  • 其工作方式

如果客户端检测出错误,抛出checksum Exception 前报告该坏块以及它识图从名称节点读取的数据节点。名称节点将这个块标记为损坏的,因此它就不会直接复制给客户端,或者复制次副本到其他地方。他会从其他副本复制一个新的副本,损坏的副本将被删除

但是,有些文件不想被自动删除,或许他可能可以挽救?

我们可以在使用open()方法来读取文件之前,通过false传给filesystem中的setverichecksum()方法来禁用校验和验证。若是在shell命令的话,可以在-get或者其等效的-copyTOLocal命令中使用-ignoreCrc选项。

4.2 压缩

  • 更快的压缩和解压速度通常会耗费更多的空间。(时空的权衡)
#最快的压缩方法, 和ZIP是通用的压缩工具,在时空处理上相对平衡。
gzip -l file
  • LZO使用速度最优算法,但是压缩效率稍低。

4.2.1 编码、解码器

压缩从标准输入读取的数据并将它写到标准输出

public class StreamCompressor {
    public static void main(String[] args) throws Exception {
    String codecclassname = args[0];
    Class<?> codecclass = Class.forName(codecclassname);
    Configuration conf = new Configuration();
    本文档由Linux公社 www.linuxidc.com 收集整理
     CompressionCodec codec = (CompressionCodec)
    ReflectionUtils.newInstance(codecclass, conf);
    CompressionOutputStream out = codec.createOutputStream(System.out);
    IoUtils.copyBytes(system.in, out, 4096, false);
    out.finish();
    }
}

使用以上StreamCompressor程序与GunCodec压缩字符串“text”,然后使用gunzip从标准输入对它进行解压缩操作。

% echo "Text" | hadoop StreamCompressor org.apache.hadoop.io.compress.GzipCodec \
| gunzip -
Text

4.2.3 在MR中使用压缩

根据文件的扩展名,利用编码、译码器对压缩文件进行解压

public class FileDecompressor {
    public static void main(String[] args) throws Exception {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path inputPath = new Path(uri);
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        CompressionCodec codec = factory.getCodec(inputPath);
        if (codec == null) {
            System.err.println("No codec found for " + uri);
            System.exit(1);
        }
        String outputUri = CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());
        InputStream in = null;
        OutputStream out = null;
        try {
            in = codec.createInputStream(fs.open(inputPath));
            out = fs.create(new Path(outputUri));
            IoUtils.copyBytes(in, out, conf);
        } finally {
            IoUtils.closeStream(in);
            IoUtils.closeStream(out);
        }
    }
}
% hadoop FileDecompressor file.gz

使用压缩池程序来压缩从标准输入读入后将其写入标准输出的数据

public class MaxTemperatureWithCompression {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperatureWithCompression <input path> " + "<output path>");
            System.exit(-1);
        }
        Job job = new Job();
        job.setJarByClass(MaxTemperature.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setoutputPath(job, new Path(args[1]));
        job.setoutputKeyClass(Text.class);
        job.setoutputValueClass(IntWritable.class);
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setoutputCompressorClass(job, GzipCodec.class);
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setCombinerClass(MaxTemperatureReducer.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

产生压缩的输出结果

usage

% hadoop MaxTemperatureWithCompression input/ncdc/sample.txt.gz output

result

% gunzip -c output/part-r-00000.gz
1949 111
1950 22

4.3 序列化

特点

  • 紧凑的

    一个紧凑的格式使网络带宽的到充分利用,带宽是数据中心中最稀缺的资源。

  • 快速

    进程间的通信是分布式系统的骨干,因此它必须尽量减少序列化和反序列化开销。

  • 可扩展

    协议随时间而变以满足新的要求,因此它应该直接演变为客户端和服务器端的控制协议。例如他应该可以加入一个新的参数方法调用,并且有新的服务器来接收来自老客户端的旧格式消息(不包括新的参数)。

  • 互操作性

    对于某些系统,最好能够支持用不同的语言编写的客户端被写入服务器端,所以需要为此而精心设计文件格式。

hadoop使用自己的序列化格式writeables,他紧凑、快速(当不容易扩展或java之外的语言)。由于writeables是hadoop的和兴(MR程序使用它来序列化键值对)。

自定义writeable,一对字符串

import java.io.*;
import org.apache.hadoop.io.*;

public class TextPair implements WritableComparable<TextPair> {
    private Text first;
    private Text second;

    public TextPair() {
        set(new Text(), new Text());
    }

    public TextPair(String first, String second) {
        set(new Text(first), new Text(second));
    }

    public TextPair(Text first, Text second) {
        set(first, second);
    }

    public void set(Text first, Text second) {
        this.first = first;
        this.second = second;
    }

    public Text getFirst() {
        return first;
    }

    public Text getSecond() {
        return second;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    @Override
    public int hashCode() {
        return first.hashCode() * 163 + second.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        if (o instanceof TextPair) {
            TextPair tp = (TextPair) o;
            return first.equals(tp.first) && second.equals(tp.second);
        }
        return false;
    }

    @Override
    public String toString() {
        return first + "\t" + second;
    }

    @Override
    public int compareto(TextPair tp) {
        int cmp = first.compareto(tp.first);
        if (cmp != 0) {
            return cmp;
        }
        return second.compareto(tp.second);
    }
}

4.4 基于文件的数据结构

对于某些应用,需要一个特殊的数据结构来存储数据,针对运行基于MR的进程,将每个二进制数据块放入他自己的文件,这样做不易于扩展,所以hadoop开发了一系列高级容器。

编写一个sequenceFile类

public class SequenceFileWriteDemo {
    private static final String[] DATA = { "One, two, buckle my shoe", "Three, four, shut the door",
            "Five, six, pick up sticks", "Seven, eight, lay them straight", "Nine, ten, a big fat hen" };

    public static void main(String[] args) throws IOException {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);
        IntWritable key = new IntWritable();
        Text value = new Text();
        SequenceFile.Writer writer = null;
        try {
            writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
            for (int i = 0; i < 100; i++) {
                key.set(100 - i);
                value.set(DATA[i % DATA.length]);
                System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
                writer.append(key, value);
            }
        } finally {
            IoUtils.closeStream(writer);
        }
    }
}

读取一个序列文件

public class SequenceFileReadDemo {
    public static void main(String[] args) throws IOException {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);
        SequenceFile.Reader reader = null;
        try {
            reader = new SequenceFile.Reader(fs, path, conf);
            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
            long position = reader.getPosition();
            while (reader.next(key, value)) {
                String syncSeen = reader.syncSeen() ? "*" : "";
                System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
                position = reader.getPosition(); // beginning of next record
            }
        } finally {
            IoUtils.closeStream(reader);
        }
    }
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐