微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

hdfs普通文本文件合并lzo压缩

环境准备

安装lzop

sudo apt-get install lzop

shell 主脚本

#! /bin/bash
localpath=/home/impdatahd/testData/hdfs_test
outpath=/home/impdatahd/testData/merged
hdfspath=/user/hive/warehouse/test/origin/behavior_log_andr_test
dt='2022-01-04'

function checkFileExist(){
`hadoop fs -test -e $1`
#echo "in func"+$1
if [ $? -eq 0 ] ;then
  return 0;
else
  return 1;
fi
}
function init(){
#init
rm  $localpath/*
rm  $outpath/*.log
rm  $outpath/*.log.lzo
}


function do_main(){
hdfs dfs -get $hdfspath/dt=$1/* $localpath
#merge
python $outpath/merge.py
#compression
lzop -Uv $outpath/*.log 

#delete and mv hfds old file
hdfs dfs -rm -r -f $hdfspath/dt=$1/*
hdfs dfs -put  $outpath/*.log.lzo $hdfspath/dt=$1

}
#index
function create_index(){
hadoop jar /home/impdatahd/hadoop-3.2.0/share/hadoop/common/hadoop-lzo-0.4.21.jar  com.hadoop.compression.lzo.distributedLzoIndexer $hdfspath/dt=$1
}

for ((i=0;i<1;i++));
do
   cdt=`date -d "$dt $i days" +%Y-%m-%d`
   echo $cdt
   checkFileExist $hdfspath/dt=$cdt/*.lzo
   if [ $? == 0 ];then
   echo "$cdt 已完成压缩,跳过操作"
   continue;
   else
      checkFileExist $hdfspath/dt=$cdt/test-*
      if [ $? == 0 ];then
      init
      do_main $cdt
      create_index $cdt
      else
        echo "$cdt 没有目标文件,跳过操作!"
	continue;
      fi
   fi
done

python合并文件脚本

压缩文件不能直接合并,可以考虑,现将文件解压缩,然后再合并。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# -------------------------------------------------------------------------------
# File Name   : merge.py
# Description :
# Author      : zhengkw
# Email       : [email protected]
# Date        : 2022/1/04 14:18
# -------------------------------------------------------------------------------
import os
input_path = "/home/impdatahd/testData/hdfs_test/" #此处填好自己的路径,注意最后的"/"

#判断文件类型,如果是一般log文件,直接合并,如果是lzo文件,只合并lzo文件,不能合并index文件。



#使用os.listdir函数获取路径下的所有的文件名,并存在一个list中
#使用os.path.join函数,将文件名和路径拼成绝对路径
whole_file = [os.path.join(input_path,file) for file in os.listdir(input_path)]
content = []
#对于每一个路径,将其打开之后,使用readlines获取全部内容
for w in whole_file:
    with open(w,'rb') as f:
         content = content+f.readlines()
#构造输出的路径,和输入路径在同一个文件夹下,如果该文件夹内没有这个文件自动创建
output_path = os.path.join('/home/impdatahd/testData/merged/','merged_andr.log')
 #将内容写入文件
with open(output_path,'wb') as f:
    f.writelines(content)

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐