微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Spark-Core学习笔记

Spark-Core学习笔记

RDD(Resilient distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象。

0 三大数据结构

RDD:弹性式分布数据集
累加器:分布式共享只写变量
广播变量: 分布式共享只读变量

0.1 模拟搭建分布式计算数据结构

在这里插入图片描述

模拟diver

  def main(args: Array[String]): Unit = {
    import java.io.ObjectOutputStream
    import java.net.socket
    val client = new Socket("localhost",9999)
    val client1 = new Socket("localhost",8888)
    val out =new  ObjectOutputStream(client.getoutputStream)
    val out1 =new  ObjectOutputStream(client1.getoutputStream)
    /*
    把task任务和函数封装封装到两个小结构里,发送到Executor实现了基本的分布式计算
     */
    val task = new Task
    val task1 = new SubTask()
    val task2 = new SubTask()
    task1.list=task.list.take(2)
    task1.compu=task.compu
    task2.compu=task.compu
    task2.list=task.list.takeRight(2)

    out.writeObject(task1)
    out.flush()
    out.close()
    client.close()
    out1.writeObject(task2)
    out1.flush()
    out1.close()
    client.close()
    println("数据输出完成")
  }

模拟Executor

object Executor1 {
  def main(args: Array[String]): Unit = {
    import java.io.ObjectInputStream
    import java.net.ServerSocket
    val socket = new ServerSocket(9999)
    val server = socket.accept()
    val in = new ObjectInputStream(server.getInputStream)
    val readobject = in.readobject
    val result=readobject.asInstanceOf[SubTask].Compute_unit()

    println("收到数据为")
    println(result)
    in.close()
    server.close()
    socket.close()
    /*
    收到数据为
List(6, 8)
     */
  }
}

分布式数据

//模拟一个数据结构
case class Task() {
  val list: List[Int] =List(1,2,3,4)
  val compu:Int=>Int=_*2

  def Compute_unit()={
    list.map(compu(_))
  }
}
case class SubTask(){
  var list:List[Int]=_
  var compu:Int=>Int=_

  def Compute_unit()={
    list.map(compu(_))
  }
}

1.概述:流程与特性

1.1 工作流程 ————同类比MR流程

在这里插入图片描述

1.2 五大特性(特点)————

* Internally, each RDD is characterized by five main properties:
*
*  - A list of partitions//分区的列表
*  - A function for computing each split//计算切片的方法
*  - A list of dependencies on other RDDs//依赖其他RDD的列表
*  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)//可选:K-V RDD可以有分区器
*  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
*    an HDFS file)//可选:计算节点的位置偏好列表,用于分析最佳的计算节点————类比于Hadoop的机架感知

1.3 RDD的创建方式——三种

//集合中创建算子
new SparkContext(conf).makeRDD(list1)
new SparkContext(conf).parallilize(list1)
//文件创建————文件 jdbc hbase
new SparkContext(conf).textFile(PATH)//按行读取文件内容
new SpaarkContext(conf).wholeTextFile(DirPATH)//按文件读取内容生成KV元组文件路径,文件内容)
//转换算子
groupby...

1.4 切片源码

注意:getSplits文件返回的是切片规划,真正读取是在compute方法中创建LineRecordReader读取的,有两个关键变量:
start = split.getStart() 
end = start + split.getLength

在这里插入图片描述

集合切片源码

def parallelize[T: classtag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  assertNotStopped()
  new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())//创建集合RDD
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐