微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Spark基础

@H_502_5@什么是Spark

基于内存的,用于大规模数据处理(离线计算、实时计算、快速查询(交互式查询))的统一分析引擎。

@H_502_5@Spark特点

快: Spark计算速度是MapReduce计算速度的10-100倍
易用:(算法多)MR支持1种计算模型,Spsark支持更多的计算模型。
通用: Spark 能够进行离线计算、交互式查询快速查询)、实时计算、机器学习、图计算等
兼容性: Spark支持大数据中的Yarn调度,支持mesos。可以处理hadoop计算的数据。

@H_502_5@Spark运行模式
  1. local本地模式:Spark单机运行,一般用于开发测试。
  2. standalone独立集群模式:开发测试使用
  3. standalone-HA高可用模式:生产环境使用
  4. on yarn集群模式:生产环境使用
  5. on cloud集群模式:中小公司未来会更多的使用云服务
@H_502_5@Spark编写代码
  1. 创建一个 Sparkconf对象,设置app名称
  2. 创建一个SparkContext,
  3. 读取数据,对数据进行计算
  4. 保存数据

SparkCore

@H_502_5@什么是RDD

弹性分布式数据集(数据存储在内存),一个不可变、可分区、里面的元素可并行计算的集合

@H_502_5@RDD的主要属性
  1. 数据集的基本组成单位是一组分片(Partition)或一个分区(Partition)列表
    每个分片都会被一个计算任务处理,分片数决定并行度。
  2. 一个函数会被作用在每一个分区。
  3. 一个RDD会依赖于其他多个RDD,RDD的每次转换都会生成一个新的RDD
@H_502_5@RDD的算子分为两类:
  1. Transformation转换操作:返回一个新的RDD
  2. Action动作操作:返回值不是RDD
    惰性计算,遇到Transformation不计算,遇到Action在真正计算。
@H_502_5@Rdd数据持久化什么作用?
  1. 对多次使用的rdd进行缓存,缓存到内存,当后续频繁使用时直接在内存中读取缓存的数据,不需要重新计算。 (Persist、Cache)

  2. 将RDD结果写入硬盘(容错机制),当RDD丢失数据时,或依赖的RDD丢失数据时,可以使用持久化到硬盘的数据恢复。(MEMORY_ONLY(认)、MEMORY_AND_disK、disK_ONLY)
    SparkContext.setCheckpointDir(“目录”) //HDFS的目录 RDD.checkpoint()

@H_502_5@cache和Checkpoint的区别
  1. 位置:Persist 和 Cache将数据保存在内存;Checkpoint将数据保存在HDFS。

  2. 生命周期:Persist 和 Cache 程序结束后会被清除或手动调用unpersist方法;Checkpoint永久存储不会被删除

  3. RDD依赖关系(血统Lineage):Persist和Cache,不会丢掉RDD间的依赖链/依赖关系;Checkpoint会斩断依赖链。

@H_502_5@什么是宽窄依赖

窄依赖: 父RDD的一个分区只会被子RDD的一个分区依赖
宽依赖: 父RDD的一个分区会被子RDD的多个分区依赖(涉及到shuffle)

@H_502_5@什么是DAG

DAG: 指的是数据转换执行的过程,有方向,无闭环(其实就是RDD执行的流程)

@H_502_5@DAG边界

开始: 通过SparkContext创建的RDD
结束: 触发Action,一旦触发Action就形成了一个完整的DAG
说明:
一个Spark应用中可以有一到多个DAG,取决于触发了多少次Action
一个DAG中会有不同的阶段/stage,划分阶段/stage的依据就是宽依赖
一个阶段/stage中可以有多个Task,一个分区对应一个Task

@H_502_5@Spark概念
  1. Application:指的是用户编写的Spark应用程序/代码,包含了Driver功能代码和分布在集群中多个节点上运行的Executor代码
  2. Driver:Spark中的Driver即运行上述Application的Main()函数并且创建SparkContext,SparkContext负责和ClusterManager通信,进行资源的申请、任务的分配和监控等
  3. Cluster Manager:指的是在集群上获取资源的外部服务,Standalone模式下由Master负责,Yarn模式下ResourceManager负责;
  4. Executor:是运行在工作节点Worker上的进程,负责运行任务,并为应用程序存储数据,是执行分区计算任务的进程;
  5. RDD:Resilient distributed Dataset弹性分布式数据集,是分布式内存的一个抽象概念;
  6. DAG:Directed Acyclic Graph有向无环图,反映RDD之间的依赖关系和执行流程;
  7. Job:作业,按照DAG执行就是一个作业;Job==DAG
  8. Stage:阶段,是作业的基本调度单位,同一个Stage中的Task可以并行执行,多个Task组成TaskSet任务集
  9. Task:任务,运行在Executor上的工作单元,一个Task计算一个分区,包括pipline上的一系列操作
@H_502_5@Spark执行任务的基本流程

1.Spark应用被提交–>SparkContext向资源管理器注册并申请资源 (??) -->启动Executor
2.RDD–>构建DAG–>DAGScheduler划分Stage形成TaskSet–>TaskScheduler提交Task–>Worker上的Executor执行Task

@H_502_5@累加器的作用

累加器accumulators: 累加器支持在所有不同节点之间进行累加计算

@H_502_5@广播变量的作用

在每个机器上缓存一份、不可变的、只读的、相同的变量,该节点每个任务都能访问。起到节省资源的作用,和优化的所用。

Sparksql基本介绍

@H_502_5@什么是Sparksql?

用于处理结构化数据的Spark模块。

@H_502_5@Sparksql底层的数据抽象

DataFrame和DataSet

@H_502_5@Hive和Sparksql的对比
  • Hive是将sql转化成MapReduce进行计算(降低学习成本、提高开发效率)
  • Sparksql是将sql转化成rdd集进行计算(降低学习成本、提高开发效率)
@H_502_5@什么是DataFrame??

DataFrame是以RDD为基础的带有Schema元信息的分布式数据集。

@H_502_5@什么是DataSet??

含有类型信息的DataFrame就是DataSet
(DataSAEt=DataFrame+类型= Schema+RDD*n+类型)

@H_502_5@SparkSQL查询数据的形态
  1. 类似方法调用,领域特定语言(DSL)。
    personDF.select("id","id","id",“name”,"age"+1).filter("age"+1).filter("age"+1).filter(“age”>25).show
  2. SQL语句
    spark.sql(“select * from personDFT where age >25”).show

添加Schema的方式

第1种: 指定列名添加Schema
第2种: 通过StructType指定Schema
第3种: 编写样例类,利用反射机制推断Schema

@H_502_5@指定列名添加Schema代码流程
  1. 创建SparkSession
  2. 创建SparkContext
  3. 读取数据并加工
  4. 设置表结构 ttRDD.toDF(“id”,“name”,“age”)
  5. 注册成表并查询
  6. 关闭sc sparksession
@H_502_5@通过StructType指定Schema代码流程
  1. 创建SparkSession

  2. 创建SparkContext

  3. 读取数据并加工

  4. 设置表结构

    types.StructType(
        //   字段类型  (字段名,字段类型,是否为空)
        List(StructField("id",IntegerType,true) 
        )
      )
    
  5. 创建DS DF
    val ttDF: DataFrame = spark.createDataFrame(RowRDD,structTable)

  6. 注册成表并查询

  7. 关闭SparkContext 、SparkSession

@H_502_5@利用反射机制推断Schema代码流程

准备样例类

  1. 创建Sparkession

  2. 创建SparkContext

  3. 读取数据并加工
    val PersonRDD: RDD[Person] = ttRDD.map(z=>Person(z(0).toInt,z(1),z(2).toInt))

  4. RDD转DF
    val personDF: DataFrame = PersonRDD.toDF()

  5. 注册成表并查询

  6. 关闭SparkContext 、SparkSession

@H_502_5@RDD、DF、DS三者之间的转化
  • 转换成RDD .rdd
  • 转换成DF .toDF()
  • 转换成DS
    RDD->DS .toDS()
    DF->DS .as[Person]
@H_502_5@Spark sql自定义函数
  spark.udf.register("toupperAdd123",(str:String)=>{
    //根据业务逻辑对数据进行处理
    //str.toupperCase()+ " 123"
    //str.length*10
    str.length*10/2/2.todouble
  })
@H_502_5@开窗函数的作用

显示聚集前/排序前的原始的数据,又显示聚集后/排序后的名次的数据。

@H_502_5@开窗函数分类
  1. 聚和开窗函数
    聚合函数(列) OVER(选项),这里的选项可以是PARTITION BY 子句
  2. 排序聚和函数
    排序函数(列) OVER(选项),这里的选项可以是ORDER BY 子句,也可以是 OVER(PARTITION BY 子句 ORDER BY 子句)
@H_502_5@聚和开窗函数

select name, class, score, count(name) over() name_count from scores
select name, class, score, count(name) over(PARTITION BY class) name_count from scores

@H_502_5@排序聚和函数

select name, class, score, row_number() over(order by score) rank from scores
select name, class, score, row_number() over(partition by class order by score) rank from scores

@H_502_5@RANK跳跃排序

select name, class, score, rank() over(order by score) rank from scores
select name, class, score, rank() over(partition by class order by score) rank from scores

@H_502_5@DENSE_RANK连续排序

select name, class, score, dense_rank() over(order by score) rank from scores

@H_502_5@NTILE分组排名

select name, class, score, ntile(6) over(order by score) rank from scores

@H_502_5@什么是Spark Streaming

Spark Streaming是一个基于Spark Core之上的实时计算框架

@H_502_5@什么是DStream

代表持续性的输入的数据流和经过各种Spark算子操作后的输出的结果数据流。
本质上就是按照时间间隔划分成一批一批的连续的RDD

@H_502_5@阐明RDD、DataFrame、DataSet、DStream数据抽象之间的关系。

DStream=RDD1(t1)+ RDD2(t2)+ RDD3(t3)+ RDD4(t4)+….
DataSet = DataFrame+类型 = RDD+结构+类型
DataFrame = RDD+结构

@H_502_5@SparkStreaming代码过程
  1. 创建sparkConf
  2. 创建一个sparkcontext
  3. 创建streamingcontext
  4. 接收数据并根据业务逻辑进行计算
  5. 开启计算任务
  6. 等待关闭
@H_502_5@窗口宽度和滑动距离的关系
  • 窗口宽度=滑动距离:不重复读取数据,也不丢失数据
    数据接入是将数据的切分宽度放大(不使用窗口函数
  • 窗口宽度>滑动距离:有部分数据会被重复计算
  • 窗口宽度<滑动距离:会有部分数据丢失(企业一般不会使用这种方式)
@H_502_5@0.8版本SparkStreaming集成kafka是的差异
  • Receiver接收方式
    1、多个Receiver接受数据效率高,但有丢失数据的风险。
    2、开启日志(WAL)可防止数据丢失,但写两遍数据效率低。
    3、Zookeeper维护offset有重复消费数据可能。
    4、使用高层次的API
  • Direct直连方式
    1、不使用Receiver,直接到kafka分区中读取数据
    2、不使用日志(WAL)机制。
    3、Spark自己维护offset
    4、使用低层次的API
@H_502_5@什么是Structured Streaming

Structured Streaming是一个基于Spark sql引擎的可扩展、容错的流处理引擎

@H_502_5@Structured Streaming模型

一个不断增长的动态表格,新数据被持续不断地添加到表格的末尾
对动态数据源进行实时查询,就是对当前的表格内容执行一次 sql 查询

@H_502_5@Structured Streaming应用场景

将数据源映射为类似于关系数据库中的表,(Sparksql中的DF/DS)
然后将经过计算得到的结果映射为另一张表.

@H_502_5@详细描述下图内容

在这里插入图片描述

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐