微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

java – 在Hadoop中处理reduce步骤中的大输出值

在我的MapReduce程序的Reduce阶段,我正在执行的唯一操作是将所提供的Iterator中的每个值连接起来,如下所示:

public void reduce(Text key, Iterator<text> values,
                    OutputCollector<Text, Text> output, Reporter reporter) {
    Text next;
    Text outKey = new Text()
    Text outVal = new Text();
    StringBuilder sb = new StringBuilder();
    while(values.hasNext()) {
        next = values.next();
        sb.append(next.toString());
        if (values.hasNext())
            sb.append(',');
    }
    outKey.set(key.toString());
    outVal.set(sb.toSTring());
    output.collect(outKey,outVal);
}

我的问题是一些减少输出值是巨大的文本行;如此大,即使初始大小非常大,字符串缓冲区也必须增加(加倍)它的大小几次,以适应迭代器的所有上下文,从而导致内存问题.

在传统的Java应用程序中,这表明对文件的缓冲写入将是写入输出的首选方法.如何在Hadoop中处理极大的输出键值对?我应该将结果直接流式传输到HDFS上的文件(每个reduce调用一个文件)吗?有没有办法缓冲输出,而不是output.collect方法

注意:我已经尽可能地增加了内存/堆.此外,有几个消息来源表示增加减速器的数量可以帮助解决内存/堆问题,但是这里的问题直接追溯到SringBuilder在扩展其容量时的使用.

谢谢

解决方法:

不是我理解为什么你想要有巨大的价值,但有一种方法可以做到这一点.

如果编写自己的OutputFormat,则可以修复RecordWriter.write(Key,Value)方法的行为,以根据Key值是否为null来处理值连接.

这样,在reducer中,您可以按如下方式编写代码(键的第一个输出是实际键,之后的所有内容都是null键:

public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, Text> output, Reporter reporter) {
  boolean firstKey = true;
  for (Text value : values) {
    output.collect(firstKey ? key : null, value);
    firstKey = false;
  }
}

然后,实际的RecordWriter.write()具有以下逻辑来处理空键/值连接逻辑:

    public synchronized void write(K key, V value) throws IOException {

        boolean nullKey = key == null || key instanceof NullWritable;
        boolean nullValue = value == null || value instanceof NullWritable;
        if (nullKey && nullValue) {
            return;
        }

        if (!nullKey) {
            // if we've written data before, append a new line
            if (dataWritten) {
                out.write(newline);
            }

            // write out the key and separator
            writeObject(key);
            out.write(keyvalueSeparator);
        } else if (!nullValue) {
            // write out the value delimiter
            out.write(valueDelimiter);
        }

        // write out the value
        writeObject(value);

        // track that we've written some data
        dataWritten = true;
    }

    public synchronized void close(Reporter reporter) throws IOException {
        // if we've written out any data, append a closing newline
        if (dataWritten) {
            out.write(newline);
        }

        out.close();
    }

您会注意到close方法也被修改为在写出的最后一条记录上写一个尾随换行符

完整的代码清单可以在pastebin找到,这是测试输出

key1    value1
key2    value1,value2,value3
key3    value1,value2

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐