我正在尝试将管道输出到不同的目录,以使每个目录的输出将基于某些ID进行存储.
因此,在普通地图精简代码中,我将使用MultipleOutputs类,并在精简器中执行类似的操作.
protected void reduce(final SomeKey key,
final Iterable<SomeValue> values,
final Context context) {
...
for (SomeValue value: values) {
String bucketId = computeBucketIdFrom(...);
multipleOutputs.write(key, value, folderName + "/" + bucketId);
...
所以我想一个人可以在烫伤中做到这一点
...
val somePipe = Csv(in, separator = "\t",
fields = someSchema,
skipHeader = true)
.read
for (i <- 1 until numberOfBuckets) {
somePipe
.filter('someId) {id: String => (id.hashCode % numberOfBuckets) == i}
.write(Csv(out + "/bucket" + i ,
writeHeader = true,
separator = "\t"))
}
还有其他选择吗?
谢谢
解决方法:
是的,使用TemplatedTsv当然是更好的方法.
因此,您的上述代码可以编写如下,
val somePipe = Tsv(in, fields = someSchema, skipHeader = true)
.read
.write(TemplatedTsv(out, "%s", 'some_id, writeHeader = true))
这会将来自’some_id的所有记录放入out / some_ids文件夹下的单独文件夹中.
但是,您也可以创建整数存储桶.只需更改最后几行,
.map('some_id -> 'bucket) { id: String => id.hashCode % numberOfBuckets }
.write(TemplatedTsv(out, "%02d", 'bucket, writeHeader = true, fields = ('all except 'bucket)))
这将创建两个数字文件夹,如out / dd /.您还可以检查templatedTsv API here.
使用templatedTsv可能会有一个小问题,即reducer会生成很多小文件,这可能会对使用您的结果进行下一个工作不利.因此,最好在写入磁盘之前对模板字段进行排序.我写了一个关于它的博客here.
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。