4.1数据完整性
检测损坏数据的常用方法是在第一次进入系统时计算数据的校验和,如果传输后新生成的校验和不完全匹配原始的校验和,那么数据就会被认为是损坏了。
- 注意,校验和可能会错,数据却是正确的,但这种可能性不大,因为校验和远小于数据。
一个常用的数据检测代码是CRC-32(cyclic redundancy check,循环冗余检查),计算一个32位的任何大小输入的整数校验和。
4.4.1 HDFS的数据完整性
客户端写入数据并且将它发送到一个数据节点的管线中,在管线上的最后一个数据节点验证校验和。如果检测到错误,客户端便会受到一个checksum Exception,这是IOException的一个子类。
客户端读取数据节点上的数据时,会验证校验和,将其与数据节点上存储的校验和进行对比。每个数据节点维护一个连续的校验和验证日志,因此他知道每个数据块最后验证的时间。客户端成功验证数据块后,便会告诉数据节点,后者便随之更新日志。保持这种统计,对检测损坏磁盘是很有价值的。
吃了对客户端读取数据进行验证,每个数据节点还会在后台线程运行一个DataBlockScanner(数据块检测程序),定期验证存储在数据节点上的所有块。这是为了防止物理存储介质中位衰减所造成的数据损坏。
HDFS 有着块的副本,可以直接复制过来作为纠正。
- 其工作方式
如果客户端检测出错误,抛出checksum Exception 前报告该坏块以及它识图从名称节点读取的数据节点。名称节点将这个块标记为损坏的,因此它就不会直接复制给客户端,或者复制次副本到其他地方。他会从其他副本复制一个新的副本,损坏的副本将被删除。
我们可以在使用open()方法来读取文件之前,通过false传给filesystem中的setverichecksum()方法来禁用校验和验证。若是在shell命令的话,可以在-get或者其等效的-copyTOLocal命令中使用-ignoreCrc选项。
4.2 压缩
- 更快的压缩和解压速度通常会耗费更多的空间。(时空的权衡)
#最快的压缩方法, 和ZIP是通用的压缩工具,在时空处理上相对平衡。
gzip -l file
- LZO使用速度最优算法,但是压缩效率稍低。
4.2.1 编码、解码器
压缩从标准输入读取的数据并将它写到标准输出
public class StreamCompressor {
public static void main(String[] args) throws Exception {
String codecclassname = args[0];
Class<?> codecclass = Class.forName(codecclassname);
Configuration conf = new Configuration();
本文档由Linux公社 www.linuxidc.com 收集整理
CompressionCodec codec = (CompressionCodec)
ReflectionUtils.newInstance(codecclass, conf);
CompressionOutputStream out = codec.createOutputStream(System.out);
IoUtils.copyBytes(system.in, out, 4096, false);
out.finish();
}
}
使用以上StreamCompressor程序与GunCodec压缩字符串“text”,然后使用gunzip从标准输入对它进行解压缩操作。
% echo "Text" | hadoop StreamCompressor org.apache.hadoop.io.compress.GzipCodec \
| gunzip -
Text
4.2.3 在MR中使用压缩
public class FileDecompressor {
public static void main(String[] args) throws Exception {
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path inputPath = new Path(uri);
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
CompressionCodec codec = factory.getCodec(inputPath);
if (codec == null) {
System.err.println("No codec found for " + uri);
System.exit(1);
}
String outputUri = CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());
InputStream in = null;
OutputStream out = null;
try {
in = codec.createInputStream(fs.open(inputPath));
out = fs.create(new Path(outputUri));
IoUtils.copyBytes(in, out, conf);
} finally {
IoUtils.closeStream(in);
IoUtils.closeStream(out);
}
}
}
% hadoop FileDecompressor file.gz
使用压缩池程序来压缩从标准输入读入后将其写入标准输出的数据
public class MaxTemperatureWithCompression {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: MaxTemperatureWithCompression <input path> " + "<output path>");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(MaxTemperature.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setoutputPath(job, new Path(args[1]));
job.setoutputKeyClass(Text.class);
job.setoutputValueClass(IntWritable.class);
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setoutputCompressorClass(job, GzipCodec.class);
job.setMapperClass(MaxTemperatureMapper.class);
job.setCombinerClass(MaxTemperatureReducer.class);
job.setReducerClass(MaxTemperatureReducer.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
产生压缩的输出结果
usage
% hadoop MaxTemperatureWithCompression input/ncdc/sample.txt.gz output
result
% gunzip -c output/part-r-00000.gz
1949 111
1950 22
4.3 序列化
特点
-
紧凑的
一个紧凑的格式使网络带宽的到充分利用,带宽是数据中心中最稀缺的资源。
-
进程间的通信是分布式系统的骨干,因此它必须尽量减少序列化和反序列化开销。
-
可扩展
协议随时间而变以满足新的要求,因此它应该直接演变为客户端和服务器端的控制协议。例如他应该可以加入一个新的参数方法调用,并且有新的服务器来接收来自老客户端的旧格式消息(不包括新的参数)。
-
互操作性
hadoop使用自己的序列化格式writeables,他紧凑、快速(当不容易扩展或java之外的语言)。由于writeables是hadoop的和兴(MR程序使用它来序列化键值对)。
自定义writeable,一对字符串
import java.io.*;
import org.apache.hadoop.io.*;
public class TextPair implements WritableComparable<TextPair> {
private Text first;
private Text second;
public TextPair() {
set(new Text(), new Text());
}
public TextPair(String first, String second) {
set(new Text(first), new Text(second));
}
public TextPair(Text first, Text second) {
set(first, second);
}
public void set(Text first, Text second) {
this.first = first;
this.second = second;
}
public Text getFirst() {
return first;
}
public Text getSecond() {
return second;
}
@Override
public void write(DataOutput out) throws IOException {
first.write(out);
second.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
first.readFields(in);
second.readFields(in);
}
@Override
public int hashCode() {
return first.hashCode() * 163 + second.hashCode();
}
@Override
public boolean equals(Object o) {
if (o instanceof TextPair) {
TextPair tp = (TextPair) o;
return first.equals(tp.first) && second.equals(tp.second);
}
return false;
}
@Override
public String toString() {
return first + "\t" + second;
}
@Override
public int compareto(TextPair tp) {
int cmp = first.compareto(tp.first);
if (cmp != 0) {
return cmp;
}
return second.compareto(tp.second);
}
}
4.4 基于文件的数据结构
对于某些应用,需要一个特殊的数据结构来存储数据,针对运行基于MR的进程,将每个二进制数据块放入他自己的文件,这样做不易于扩展,所以hadoop开发了一系列高级容器。
编写一个sequenceFile类
public class SequenceFileWriteDemo {
private static final String[] DATA = { "One, two, buckle my shoe", "Three, four, shut the door",
"Five, six, pick up sticks", "Seven, eight, lay them straight", "Nine, ten, a big fat hen" };
public static void main(String[] args) throws IOException {
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path path = new Path(uri);
IntWritable key = new IntWritable();
Text value = new Text();
SequenceFile.Writer writer = null;
try {
writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
for (int i = 0; i < 100; i++) {
key.set(100 - i);
value.set(DATA[i % DATA.length]);
System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
writer.append(key, value);
}
} finally {
IoUtils.closeStream(writer);
}
}
}
public class SequenceFileReadDemo {
public static void main(String[] args) throws IOException {
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path path = new Path(uri);
SequenceFile.Reader reader = null;
try {
reader = new SequenceFile.Reader(fs, path, conf);
Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
long position = reader.getPosition();
while (reader.next(key, value)) {
String syncSeen = reader.syncSeen() ? "*" : "";
System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
position = reader.getPosition(); // beginning of next record
}
} finally {
IoUtils.closeStream(reader);
}
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。