在pyspark 中保存rdd的内存到文件的时候,会遇到文件夹已经存在而失败,所以如果文件夹已经存在,需要先删除。
搜索了下资料,发现pyspark并没有提供直接管理hdfs文件系统的功能。寻找到一个删除的方法,是通过调用shell命令 hadoop fs -rm -f 来删除,这个方法感觉不怎么好,所以继续找。
后来通过查找hadoop hdfs 的源代码发现 hdfs是通过java 的包 org.appache.hadoop.fs 中的几个类来完成的,也有用java创建 hdfs对象再调用对象的删除方法可删除。
所以下一步思路就是尝试是不是可以把 java的类在 python 中引用,最后得到的答案是肯定的,代码如下:
def jPath(sc, filepath):
jPathClass = sc._gateway.jvm.org.apache.hadoop.fs.Path
return jPathClass(filepath)
def jFileSystem(sc):
jFileSystemClass = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
hadoop_configuration = sc._jsc.hadoopConfiguration()
return jFileSystemClass.get(hadoop_configuration)
def write(sc, filepath, content, overwite=True):
try:
fs = jFileSystem(sc)
out = fs.create(jPath(sc, filepath), overwrite)
out.write(bytearray(content, "utf-8"))
out.flush()
out.close()
except Exception as e:
print(e)
spark = spark_create()
sc = spark._sc
sc.setLogLevel("ERROR")
sqc = sqlContext(sc)
#
# 删除HDFS目录主程序
#
def __name__ == "__main__":
del_hdfs = "hdfs://host:port/del_filepath"
fs = jFileSystem(sc)
del_path_obj = jPath(sc, del_hdfs)
if fs.exists(del_path_obj):
fs.delete(del_path_obj)
print(" del ok. hdfs=%s"%del_hdfs)
else:
print(" path not exists. hdfs=%s"%del_hdfs)
可顺利在 pyspark 中使用 java 类,核心是 sc._gateway.jvm 来实现 ,代码中用到的
org.apache.hadoop.fs.FileSystem
org.apache.hadoop.fs.Path
是在 hadoop 的 jar包 hadoop-common-2.10.1.jar中,所以SPARK在提交的时候要加上这个jar包
spark-submit --jar= xxxx/hadoop-common-2.10.1.jar
即可,如果不加会报 java类找不到的ERROR
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。