微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

spark 笔记4 sparkRDD

目录

spark RDD

关于sparkRDD基本概念

  • RDD:弹性分布式数据集,是spark对数据的核心抽象,也是spark数据处理的基本单位
    spark处理数据之前会首先把数据转换成RDD然后在RDD上对数据进行操作
  • spark对RDD的数据操作,其本身有两种对于RDD的算子:转换(transform)和行动(action),这两个分类下分别由各自对应的数个api函数
    对于数据,spark的操作过程是:创建RDD、对RDD进行转化操作(transform)、用行动操作来求值(action)
  • 该数据操作流程的便捷性:spark底层节点的协商、容错、通信的细节,这样在上层对于数据的操作就变得更容易

学习对于RDD的基本操作

主从节点的启动

首先就像第一次学在笔记1里面记录的一样,启动spark主节点的服务,然后在localhost:8080查看spark主节点的参数并且记录下来


然后就可以使用这个主节点的参数,启动这个主节点的从节点

spark的初始化

在开发程序时,spark的初始化操作首先就是创建一个SparkConf对象,这个对象包含应用的一些信息,然后创建SparkContext,SparkContext可以让 Spark 知道如何访问集群
那么,代码是这个样子的
就是在指定app名和主节点所在的spark集群之后,使用这个conf对象指定给一个sparkcontext方法来创建一个sparkcontext

val conf = new SparkConf().setAppName("Shiyanlou").setMaster("spark://7576cf9c687e:7077")

new SparkContext(conf)
// 在每个JVM中,只有一个SparkContext能够被激活。若需要创建新的SparkContext,你必须调用sc.stop()来关闭当前已激活的那个

在spark-shell里做spark的初始化并不需要新建这两个对象,因为 Spark Shell 相当于一个 Spark 应用,启动时已经用过spark-shell --master spark://7576cf9c687e:7077来指定集群信息,所以 Spark Shell 启动后已经具备了一个 SparkContext 对象sc

RDD创建

Spark 上开发的应用程序都是由一个driver programe构成,这个所谓的驱动程序在 Spark 集群通过跑main函数来执行各种并行操作。集群上的所有节点进行并行计算需要共同访问一个分区元素的集合,这就是 RDD(RDD:resilient distributed dataset)弹性分布式数据集。RDD 可以存储在内存或磁盘中,具有一定的容错性,可以在节点宕机重启后恢复。
RDD 可以从 HDFS 中的文件创建,也可以从 Scala 或 Python 集合中创建。
创建 RDD 有两种方式:一种是调用 SparkContext 的 parallelize() 方法将数据并行化生成 RDD,另外一种方法是从外部存储中的数据集生成 RDD(如 Linux 文件系统,HDFS,HBase 等)

调用parallelize()方法并行化生成RDD

如果要对已有的集合进行并行化,我们可以先创建一个列表,然后调用上面提到的sc的parallelize方法将该集合并行化。集合中的元素会被复制到一个 RDD 中。并行集合创建后可以进行 RDD 的分布式操作,一个很重要的参数是切片数(slices),表示数据集被切分的份数,Spark 会为每个切片运行一个任务并能够根据集群状况动态调整切片数量。使用parallelize方法的参数可以手动设置切片数。
这种并行集合生成RDD的办法会把所有的数据都放在内存里,所以除了开发原型和测试以外,一般不采用这种方式

就这样,把新建的数据列表传给parallelize这个函数,这个函数就会在这个数据集合的基础上为我们创建RDD,并且RDD的切片数同样可以通过parallelize函数来指定

使用外部存储中的数据集生成RDD

在实际开发中最常用的是从外部存储系统中读取数据创建 RDD。Spark 可以从任何 Hadoop 支持的存储上创建 RDD,比如本地的文件系统、HDFS、Cassandra、HBase 等。同时 Spark 还支持文本文件、SequenceFiles 等

注意事项

  • 使用不同的 SparkContext 的函数接口可以在不同的外部存储场景下创建RDD。然后使用 testfile() 方法会返回一个 RDD 对象,然后就可以调用 RDD 中定义的方法
  • 如果使用本地存储上的文本文件,这个文件必须可以被所有节点 worker 访问
  • 支持目录,压缩文件通配
  • 同上一节的并行集合一样,textFile 函数还有另外一个接口控制切片数目
// 从 protocols 文件中创建 RDD
val distFile = sc.textFile("/etc/protocols")

// RDD 操作:计算所有行的长度之和,最后结果为 2868
distFile.map(s => s.length).reduce((a,b) => a + b)

RDD的这个操作也是做的一个mapreduce,用map来把每一行映射成每一行的长度,reduce做的是把数据集合里面的元素相加

正式的、RDD的基础操作

对于RDD的基础操作有两种:transformations和actions

  • 转换(transformations):将已存在的数据集 RDD 转换成新的数据集 RDD,例如 map。转换是惰性的,不会立刻计算结果,仅仅记录转换操作应用的目标数据集,当动作需要一个结果时才计算。
  • 动作(actions) :在数据集 RDD 上进行计算后返回一个结果值给驱动程序,例如 reduce。
// 从 protocols 文件创建RDD
val distFile = sc.textFile("/etc/protocols")

// Map 操作,每行的长度
val lineLengths = distFile.map(s => s.length)

// Reduce 操作,获得所有行长度的和,即文件总长度,这里才会执行 map 运算
val totalLength = lineLengths.reduce((a, b) => a + b)

// 可以将转换后的 RDD 保存到集群内存中
lineLengths.persist()

上面这个例子里面,map操作敲进去的时候,并没有被执行,在敲完reduce求和的时候,map运算才被执行的,也就是说想要的到最后map出来的结果要执行完reduce才行
persist方法是把map完的那个RDD保留到内存里

总结

基本编程步骤总结

所以,课程里面是这样总结的:

RDD 基本编程步骤可以总结为:

  1. 读取内、外部数据集创建 RDD。
  2. 对于 RDD 进行转化生成新的 RDD ,比如 map()、filter() 等。
  3. 对需要重用的数据执行 persist()/cache() 进行缓存。
  4. 执行行动操作获得最终结果,比如 count() 和 first()等。

没有做的实践操作

导入并使用jar包

还没有一个具体的应用场景,让我指定某个具体的jar包

集成编译环境下的配置操作


TO BE CONTINUED

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐