必备了解
SparkCore
- 离线
- 环境:
SparkContext
- 简称sc
- new SparkConf().setAppName(“wc”).setMaster(“local[*]”) new SparkContext(conf)
数据抽象
RDD--弹性分布式数据集合--RDD里面不存数据,只记录5大属性
RDD五大主要特征
RDD数据源
创建RDD
val fileRDD: RDD[String] =sc.textFile(文件路径,分区数) //外部存储系统的数据集创建(本地文件系统/hadoop支持的数据集)
val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" ")) //已有RDD调用RDD方法(Transformations方法)返回一个新的RDD
- 将
scala集合
转为RDD
val intRDD1 =sc.parallelize(Array(1,2,3,4,5,6,7,8),分区数) //将scala集合转为RDD
val intRDD2 = sc.makeRDD(List(1,2,3,4,5,6,7,8),分区数) //将scala集合转为RDD
RDD算子分类
Transformations:转换操作,返回值为新的RDD,只会记录转换操作和依赖关系,不会立即执行
map
(func)=>RDD- 每一个输入元素经过func转换
filter
(func)=>RDD- 每一个输入元素经过func计算,然后
返回 值为true的输入元素
- 每一个输入元素经过func计算,然后
flatMap
(func)=>RDDmap之后压平,func返回一个序列,不是单一元素
mapPartitions
(func)=>RDD- mapPartitionsWithIndex(func)=>RDD
union
(otherDataset)=>RDD- 两个RDD求并集,不去重
- intersection(otherDataset)=>RDD
- 两个RDD求交集
distinct
([numTasks])=>RDD- 对源RDD
去重
- 对源RDD
keys或values
- 获取所有key或value组成的RDD
mapVaules
- 对RDD所有的value操作,key不变
- groupByKey([numTasks])=>RDD
- reduceByKey(func,[numTasks])
- aggregateByKey(zerovalue)(seqOp, combOp, [numTasks])
- sortByKey([ascending], [numTasks])
sortBy
(func,[ascending], [numTasks])- 类似sortByKey,但更灵活,默认ture升序,字典排序可以x=>x+""
- join(otherDataset, [numTasks])
- leftOuterJoin
- 左外连接
- RightOuterJoin
- 右外连接
cache/persist
- 缓存/持久化,适合该RDD后面经常被用到,默认MEMORY_ONLY,将RDD以非序列化方式存储在JVM中,若内存不够,则某些分区不会缓存会重新计算
checkpoint
- cartesian(otherDataset)
- 笛卡尔积,推荐算法会用
- pipe(command, [envVars])
- 对RDD进行管道操作
coalesce
(numPartitions)repartition
(numPartitions)
Actions:动作操作,无返回值或返回值不是RDD(例collect/saveAsTextFile)
reduce(func)
- 注意:区分
reduceByKey(为transformation算子)
的算子分类
- 注意:区分
collect()
- 以数组形式返回数据集的所有元素,适合小数据量,注意:只计算外层集合
- collectAsMap()
- 将RDD收集为本地集合
count()
- 返回RDD的元素个数
first()
- 返回第一个元素,类似take(1),没有排序
take(n)
- 返回前n个元素,没有排序
top(n)
- 将数据拉回Driver进行排序再取前TopN(最大的前n个),不适合大数据量
- takeOrdered(n,[ordering])
- 返回自然顺序或者自定义顺序的前n个元素
- saveAsTextFile(path)
- saveAsSequenceFile(path)
- saveAsObjectFile(path)
- 将数据集的元素,以 Java 序列化的方式保存到指定的目录下
- countByKey()
foreach
(func)foreachPartition
(func)
特别提出来:统计操作
count
- 个数
- mean
- 均值
- sum
- 求和,比如求平均,可以很方便的先groupByKey,然后mapValues(v=>{v.sum/v.size})
max
- 最大值
min
- 最小值
- variance
- 方差
- sampleVariance
- 从采样中计算方差
- stdev
- 标准差:衡量数据的离散程度
- sampleStdev
- 采样的标准差
- stats
- 查看统计结果
RDD宽窄依赖
宽依赖
父RDD
的一个分区
会被子RDD
的多个分区所依赖
,与划分Stage阶段有关
窄依赖
- 本质就是有没有shuffle(因为它会涉及到数据交互跨网络传输)
Sparksql
- 离线
- 环境:
SparkSession
数据交互
- 普通文本
- json
- parquet
- csv
- MysqL
数据抽象
DataFrame
rdd.toDF/ds.todf
spark.read.text("文本路径")
spark.read.json("文本路径") //有完整约束
spark.read.parquet("文本路径") //有完整约束
- 读取
show()
Dataset
rdd.tods/df.as[泛型]
spark.read.textFile("文本路径")
....
- 读取
show()
- 底层都是RDD的封装,可以理解成
分布式的表
,所以要转换的话需要考虑加Schema或者泛型
怎么用
自定义函数
SparkStreaming
- 实时
- 环境:StreamingContext
- new StreamingContext(sc,Seconds(5)) //指定微批处理的划分间隔
数据抽象
- DStream(discretized Stream,离散数据流)
本质是一系列时间上连续的RDD(微批处理)
API分类
Transformations
无状态转换:每个批次的处理不依赖之前批次
map
(func)flatMap
(func)filter
(func)union
(otherStream)- 将源DStream和输入参数为otherDStream的元素合并,并返回一个新的DStream.
reduceByKey
(func, [numTasks])- 利用func函数对源DStream中的key进行聚合操作,然后返回新的(K,V)对构成的DStream
join
(otherStream, [numTasks])- 输入为(K,V)、(K,W)类型的DStream,返回一个新的(K,(V,W)类型的DStream
- transform(func)
有状态转换:当前批次的处理需要使用之前批次的数据或者中间结果。
UpdateStateByKey
(func)- 可以和历史数据累加,聚合
reduceByKeyAndWindow
(方法,窗口长度,滑动间隔)- 可以实现窗口(窗口长度,滑动间隔)
Output/Action
- print() --打印到控制台
- saveAsTextFiles(prefix, [suffix])
- saveAsObjectFiles(prefix,[suffix])
- saveAsHadoopFiles(prefix,[suffix])
- foreachRDD(func)
- 对Dstream里面的每个RDD执行func
StructuredStreaming
- 实时
- 环境:SparkSession
- 因为数据抽象还是DataFrame/Dataset,调用spark.readStream做流处理
数据抽象
- DataFrame/Dataset
底层原理是一个无界表
- 图解
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。