微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

5.RDD操作综合实例

一、词频统计

A. 分步骤实现 

1、准备文件

下载小说或长篇新闻稿

上传到hdfs上

start-all.sh
hdfs dfs -put 666.txt
hdfs dfs -ls

2、读文件创建RDD

>>> lines = sc.textFile("/home/hadoop/666.txt")
>>> lines.foreach(print)

3、分词

>>> words =lines.flatMap(lambda line :line.split())                            
>>> words.collect()

4、排除大小写lower()map()

>>> words2 = words.map(lambda word:word.lower())
>>> words2.collect()

标点符号re.split(pattern,str)flatMap(),

>>> import re
>>> words3 = words2.flatMap(lambda line:re.split('\W+',line))
>>> words3.collect()

 

停用词,stopwords.txt,filter()

hdfs dfs -put /home/hadoop/virtual_share/stopwords.txt
hdfs dfs -ls

>>> words4 = words3.flatMap(lambda a:a.split())
>>> words4.collect()

>>> stopwords = sc.textFile("file:///home/hadoop/stopwords.txt").flatMap(lambda a:a.split()).collect()
>>> stopwords

>>> words5 = words4.filter(lambda a:a not in stopwords)
>>> words5.collect()

长度小于2的词filter()

>>> words6 = words5.filter(lambda a:len(a)>2)
>>> words6.collect()

 

5统计词频

>>> words7 = words6.map(lambda a:(a,1))
>>> words7.collect()

 

>>> words7 = words7.reduceByKey(lambda a,b:a+b)
>>> words7.collect()

6、按词频排序

>>> words8 = words7.sortBy(lambda x:x[1], False)
>>> words8.collect()

 

7输出文件

>>> words8.savaAsTextFile('ad_RDD')
>>> words8.saveAsTextFile('file:///home/hadoop/ad_RDD')

8、查看结果

cd ad_RDD/
cat part-00000 | head -5

 

B. 一句话实现:文件文件

 

C. 和作业2的“二、Python编程练习:英文文本的词频统计 ”进行比较,理解Spark编程的特点

Spark的特性主要有以下四点:

快速

  • 与 Hadoop 的 MapReduce 相比, Spark 基于内存的运算是 MapReduce 的 100 倍.基于硬盘的运算也要快 10 倍以上.
  • Spark 实现了高效的 DAG 执行引擎, 可以通过基于内存来高效处理数据流

易用

  • Spark 支持 Scala, Java, Python, R 和 sql 脚本, 并提供了超过 80 种高性能的算法, 非常容易创建并行 App
  • 而且 Spark 支持交互式的 Python 和 Scala 的 shell, 这意味着可以非常方便地在这些 shell 中使用 Spark 集群来验证解决问题的方法, 而不是像以前一样 需要打包, 上传集群, 验证等. 这对于原型开发非常重要.

通用

  • Spark 结合了sql, Streaming和复杂分析.
  • Spark 提供了大量的类库, 包括 sql 和 DataFrames, 机器学习(MLlib), 图计算(GraphicX), 实时流处理(Spark Streaming) .
  • 可以把这些类库无缝的柔和在一个 App 中.
  • 减少了开发和维护的人力成本以及部署平台的物力成本.

易融合性

  • Spark 可以非常方便的与其他开源产品进行融合,比如, Spark 可以使用 Hadoop 的 YARN 和 Appache Mesos 作为它的资源管理和调度器, 并且可以处理所有 Hadoop 支持的数据, 包括 HDFS, HBase等.

二、求Top值

上传文件到hdfs

 

1、丢弃不合规范的行:

分词

处理空行、少数据项

处理缺失数据

>>> import re
>>> account = sc.textFile('payment.txt').flatMap(lambda a:re.split('\W+',a)).flatMap(lambda a:a.split())
>>> account.collect()

2、支付金额转换为数值型,按支付金额排序

>>> account1 = account.map(lambda a:(a,int(a)))
>>> account1.collect()

3、取出Top3

>>> account1.saveAsTextFile('top')

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐