一 spark简介
Spark是一种快速、通用、可扩展的大数据分析引擎,2009年诞生于加州大学伯克利分校AMPLab,2010年开源,2013年6月成为Apache孵化项目,2014年2月成为Apache的顶级项目,2014年5月发布spark1.0,2016年7月发布spark2.0,2020年6月18日发布spark3.0.0
1.spark特点
- Ease of Use:简洁易用
- Generality:通用、全栈式数据处理
- Runs Everywhere:可以运行在各种资源调度框架和读写多种数据源
- 丰富的数据源支持。Spark除了可以访问操作系统自身的本地文件系统和HDFS之外,还可以访问 Cassandra、HBase、Hive、Alluxio(Tachyon)以及任何 Hadoop兼容的数据源。这极大地方便了已经 的大数据系统进行顺利迁移到Spark。
2.Spark与MapReduce的对比
MapReduce和Spark的本质区别
- MR只能做离线计算,如果实现复杂计算逻辑,一个MR搞不定,就需要将多个MR按照先后顺序连成一串,一个MR计算完成后会将计算结果写入到HDFS中,下一个MR将上一个MR的输出作为输入,这样就要频繁读写HDFS,网络IO和磁盘IO会成为性能瓶颈。从而导致效率低下。
- 既可以做离线计算,有可以做实时计算,提供了抽象的数据集(RDD、Dataset、DataFrame、DStream)有高度封装的API,算子丰富,并且使用了更先进的DAG有向无环图调度思想,可以对执行计划优化后在执行,并且可以数据可以cache到内存中进行复用。
- 注意:MR和Spark在Shuffle时数据都落本地磁盘
3.spark架构体系
-
三种模式
- standalone client模式
- standalone cluster模式
- Spark On YARN cluster模式
-
Spark执行流程简介
-
Job:RDD每一个行动操作都会生成一个或者多个调度阶段 调度阶段(Stage):每个Job都会根据依赖关系,以Shuffle过程作为划分,分为Shuffle Map Stage和Result Stage。每个Stage对应一个TaskSet,一个Task中包含多Task,TaskSet的数量与该阶段最后一个RDD的分区数相同。
-
Task:分发到Executor上的工作任务,是Spark的最小执行单元
-
DAGScheduler:DAGScheduler是将DAG根据宽依赖将切分Stage,负责划分调度阶段并Stage转成TaskSet提交给TaskScheduler
-
TaskScheduler:TaskScheduler是将Task调度到Worker下的Exexcutor进程,然后丢入到Executor的线程池的中进行执行
-
-
Spark中重要角色
- Master :是一个Java进程,接收Worker的注册信息和心跳、移除异常超时的Worker、接收客户端提交的任务、负责资源调度、命令Worker启动Executor。
- Worker :是一个Java进程,负责管理当前节点的资源管理,向Master注册并定期发送心跳,负责启动Executor、并监控Executor的状态。
- SparkSubmit :是一个Java进程,负责向Master提交任务。
- Driver :是很多类的统称,可以认为SparkContext就是Driver,client模式Driver运行在SparkSubmit进程中,cluster模式单独运行在一个进程中,负责将用户编写的代码转成Tasks,然后调度到Executor中执行,并监控Task的状态和执行进度。
- Executor :是一个Java进程,负责执行Driver端生成的Task,将Task放入线程中运行。
-
Spark和Yarn角色对比(Spark StandAlone的Client模式对比YARN)
- Master => ResourceManager
- Worker => NodeManager
- Executor => YarnChild
- SparkSubmit(Driver) => ApplicationMaster
二 集群搭建及启动
1.下载与安装
-
下载spark,通过国内的镜像网站进行下载(如清华镜像站等)
-
传输到集群中的一台并解压
-
export JAVA_HOME=/opt/apps/jdk1.8.0_141
export SPARK_MASTER_HOST=linux01 -
修改conf下的slaves,加入worker的设备
2.spark-shell启动
/opt/apps/spark-3.0.1-bin-hadoop3.2/bin/spark-shell --master spark://linux01:7077 --executor-memory 2g --total-executor-cores 4
三 spark编程入门
1. scala入门程序
使用Scala编写Spark的WorkCount
-
创建一个maven项目
-
导入依赖等信息
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>cn._51doit</groupId> <artifactId>spark-in-action</artifactId> <version>1.0-SNAPSHOT</version> <!-- 定义了一些常量 --> <properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <scala.version>2.12.12</scala.version> <spark.version>3.0.1</spark.version> <encoding>UTF-8</encoding> </properties> <dependencies> <!-- 导入scala的依赖 --> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>${spark.version}</version> </dependency> </dependencies> <build> <pluginManagement> <plugins> <!-- 编译scala的插件 --> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> </plugin> <!-- 编译java的插件 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.5.1</version> </plugin> </plugins> </pluginManagement> <plugins> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <executions> <execution> <id>scala-compile-first</id> <phase>process-resources</phase> <goals> <goal>add-source</goal> <goal>compile</goal> </goals> </execution> <execution> <id>scala-test-compile</id> <phase>process-test-resources</phase> <goals> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <executions> <execution> <phase>compile</phase> <goals> <goal>compile</goal> </goals> </execution> </executions> </plugin> <!-- 打jar插件 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>meta-inf/*.SF</exclude> <exclude>meta-inf/*.DSA</exclude> <exclude>meta-inf/*.RSA</exclude> </excludes> </filter> </filters> </configuration> </execution> </executions> </plugin> </plugins> </build> </project>
-
编写main方法
package cn.doit.day01.demo01 import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object WordCount { def main(args: Array[String]): Unit = { //创建SparkContext,只有使用SparkContext才可以向集群申请资源,才可以创建RDD val conf = new SparkConf().setAppName("WordCount") val sc = new SparkContext(conf) //1 创建RDD:指定[以后]从HDFS中读取数据 val lines: RDD[String] = sc.textFile(args(0)) //2 对数据进行切分压平 val words = lines.flatMap(_.split(" ")) //3 将单词和一组合 val wordAndOne = words.map((_, 1)) //4 单词聚合,先局部聚合,再全局聚合,使用reduceByKey val wordAndNums = wordAndOne.reduceByKey(_ + _) //5 输出结果并保存 wordAndNums.saveAsTextFile(args(1)) } }
-
打包
可以使用provided 指定依赖是否打入包中
<dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> <scope>provided</scope> </dependency>
使用 指定main方法
<!-- 指定maven(main)方法 --> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>cn.doit.day01.demo01.WordCount</mainClass> </transformer> </transformers>
-
将jar包导入集群中执行
-
如果不指定main方法,需要指定class,指定类名
–class cn.doit.day01.demo01.WordCount
./bin/spark-submit --master spark://linux01:7077 【--class cn.doit.day01.demo01.WordCount】 /root/spark_learn.jar hdfs://linux01:8020/anli/wordCount/input hdfs://linux01:8020/anli/wordCount/output02
-
2.使用java编写
java编写,并在本地运行
代码如下:
public class WordCount {
public static void main(String[] args) {
//创建连接,设置进程名()
SparkConf conf = new SparkConf().setAppName("JavaWordCount");
//如果在本地运行,设置Master所调用的线程资源数,一般使用local[*],调用全部资源(不能设置为1)
conf.setMaster("local[*]");
//javaSparkContext是对SparkContext的包装类
JavaSparkContext context = new JavaSparkContext(conf);
//设置文件输入路径
JavaRDD<String> rdd = context.textFile(args[0]);
//切分压平
JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
return Arrays.stream(s.split(" ")).iterator();
}
});
//单词和一结合
JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return Tuple2.apply(s, 1);
}
});
//聚合
JavaPairRDD<String, Integer> wordAndNums = wordAndOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer+integer2;
}
});
//交换顺序
JavaPairRDD<Integer, String> numsAndWord = wordAndNums.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
return Tuple2.apply(stringIntegerTuple2._2, stringIntegerTuple2._1);
}
});
//排序
JavaPairRDD<Integer, String> sortednumsAndWord = numsAndWord.sortByKey(false);
//再交换顺序
JavaPairRDD<String, Integer> sortedWordAndNums = sortednumsAndWord.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<Integer, String> integerStringTuple2) throws Exception {
return Tuple2.apply(integerStringTuple2._2, integerStringTuple2._1);
}
});
//保存
sortedWordAndNums.saveAsTextFile(args[1]);
//关闭资源
context.stop();
}
}
3.使用Lambda表达式
尝试在本地运行,读写hadoop上的文件
在IDEA设置(传参数):
-
Configurations — Program arguments:
hdfs://linux01:8020/anli/wordCount/input hdfs://linux01:8020/anli/wordCount/output03
public class LambdaWordCount {
public static void main(String[] args) {
//在idea上执行,hadoop上的文件,并输出到hadoop中
//因为windows的权限不够,需要修改调用者名称
System.setProperty("HADOOP_USER_NAME","root");
//结果:权限不足,hadoop权限调整为777,可以运行;或者使用相同方式在scala代码上,可以运行
SparkConf conf = new SparkConf().setAppName("LambdaWordCount").setMaster("local[*]");
JavaSparkContext context = new JavaSparkContext(conf);
JavaRDD<String> lines = context.textFile(args[0]);
JavaRDD<String> words = lines.flatMap(e -> Arrays.stream(e.split(" ")).iterator());
JavaPairRDD<String, Integer> wordAndNum = words.mapToPair(e -> Tuple2.apply(e, 1));
JavaPairRDD<String, Integer> wordAndNums = wordAndNum.reduceByKey(Integer::sum);
JavaPairRDD<Integer, String> numsAndWord = wordAndNums.mapToPair(e -> Tuple2.apply(e._2, e._1));
JavaPairRDD<Integer, String> sortednumsAndWord = numsAndWord.sortByKey(false);
JavaPairRDD<String, Integer> sortedWordAndNums = sortednumsAndWord.mapToPair(e -> Tuple2.apply(e._2, e._1));
sortedWordAndNums.saveAsTextFile(args[1]);
context.stop();
}
}
四 RDD详解
RDD:Resilient distributed Dataset—有弹性的分布式的数据集合;里面没有真正的数据,是一个抽象的、不可变得、被分区的集合,集合内的元素可以被并行的操作(多个Task)。
五大特点:
- 有多个分区,分区数量决定任务并行数
- 函数作用在分区的迭代器中,决定了计算逻辑
- 众RDD之间存在依赖关系,可以根据依赖关系恢复失败的任务和划分stage
- 如果要发生shuffle,要使用分区器,默认使用HashPartitioner
- 最优位置,即将Executor调度到数据所在的节点上,要求Worker和Datanode部署在同一节点或OnYarn,通过访问Namenode获取数据块位置信息
深层理解RDD:
- RDD是一个抽象的数据集,保存的是元数据信息,和对应的运算逻辑
- 即RDD记录了:数据从哪里来、所属Task的分区、对数据作何处理
- 一个Task是由多个RDD相连组成的,而多个RDD之间是由各自的迭代器连接形成迭代器链将RDD的对应Task分区连接起来的
- RDD是由,多个各自分区的迭代器(迭代器中记录了数据来源),及其Task的Function(运算逻辑:compute方法),组成的一个大集合
补充知识点
classOf[T]
方法- 获取类型T的Class对象,返回在运行过程中的类型。
- 宽依赖与窄依赖:
- 创建连接
- 分区器类型相同,分区数量相同,不再shuffle,也就没有新的stage
- Partitioner分区器
- 查看Executor的打印台
- 在work目录—job文件夹的stdout中
1.分区
- RDD分区方式
- spark读取hdfs文件时,分区数算法详述
- 在获取RDD时,sc调用textFile等方法,输入path,并可选输入minPartitions
- minPartitions会进行判断,取输入值和2之间的最小值
- textFile类:调用hadoopFile,HadoopFile会返回一个KV类型的RDD(指针位置和行内容),textFile使用map方法抽取value,返回一个String类型的RDD
- 在hadoopFile中,调入了一个类:TextInputFormat,其父类FileInputFormat中,包含了RDD的分区方法:getSplits
- 遍历所有文件,获取总的文件长度totalSize,再用总长度除numSplits(前面的minPartitions,默认为2)(numSplits == 0 ? 1 : numSplits),获取目标大小goalSize
- 遍历所有文件,首先调用computeSplitSize方法,取goalSize和128M的最小值,返回为splitSize
- 进入for循环,判断文件的实际长度是否大于splitSize的1.1倍,如果大于,调用makeSplit进行切分,切掉的大小为splitSize
- 而后该文件剩余部分继续循环,直到小于splitSize的1.1倍为止
2.算子
算子是对RDD进行操作
对RDD中的每个分区的迭代器进行函数处理
2.1 Transformation(不产生shuffle)
2.1.1 map方法详解
- map方法传入自定义的函数返回一个RDD
- 在该方法中调用了一个构造方法,创建了一个RDD对象:MapPartitionsRDD
- MapPartitionsRDD继承RDD,利用多态,构造方法可以返回一个RDD对象
- 在构造方法中,传入了调用map方法的RDD,和一个函数:
(_, _, iter) => iter.map(cleanF)
- 函数中的cleanF是经过处理后的传入的function,iter是一个读取上一个RDD信息的迭代器
- 可以发现,在构造方法的函数中,对迭代器读取的信息,直接调用了map方法进行了处理,并返回了一个新的迭代器,与其他方法组合,成为一个RDD(MapPartitionsRDD)
2.1.2 其他算子(不shuffle)
- mapPartitionsWithIndex
- mapPartitions
- flatMapValues
- union(++)
- 只能相同类型RDD进行union
- union不会去重,没有shuffle
- 会把所有的元素汇总为新的RDD
- 相当于把多个RDD的多个分区sum到一个新的RDD中
- filter
- 过滤,也是一条一条的过滤,返回true的就保留
- flatMap
- 相当于先map在flatten,rdd中没有flatten方法
- keys
- values
- 同上,返回第二个
- mapValues
2.2 Transformation(产生shuffle)
在分布式计算中,将数据按照一定的计算逻辑(分区器)
2.2.1 reduceByKey方法详解
-
调用了combineByKeyWithclasstag方法,传入4个参数:一个reduceByKey的默认函数,二和三都是传入的自定义函数,四是一个分区器
-
在方法中,调用的构造函数是:ShuffledRDD(绝大部分带shuffle的Transformation都是该RDD)
-
构造函数传入调用reduceByKey的RDD,和一个分区器,该构造方法构造出来的RDD只会执行一个shuffle,不会执行shuffle前后的聚合等方法
-
需要通过set:Serializer(序列化方法)、Aggregator(传入上面那3个函数)、MapSideCombine(shuffle前是否聚合)等信息对ShuffledRDD进行完善
-
可以自行创建ShuffledRDD,来替代reduceByKey方法:
//手动编写reduceByKey方法 //函数1:聚合时,首个元素传入时对value的处理方法 val f1 = (e:Int) => e //函数2:聚合value的方法 val f2 = (e1:Int ,e2: Int) => e1+ e2 //在reduceByKey中,shuffle前后的聚合方法是一样的 val f3 = f2 //三个泛型:Key类型,原Value类型,聚合后的Value类型 //分区器创建一个新的HashPartitioner,分区数量设置为源RDD的分区数量 val shuffledRDD: ShuffledRDD[String, Int, Int] = new ShuffledRDD[String, Int, Int](wordAndOne, new HashPartitioner(wordAndOne.partitions.length)) //创建Aggregator,传入函数 var agg = new Aggregator[String, Int, Int](f1,f2,f3) shuffledRDD.setMapSideCombine(true) shuffledRDD.setAggregator(agg)
2.2.2 cogroup
- 协分组、联合分组,是对多个RDD进行分组
- RDD必须是keyvalue类型的,并且两个RDD的key类型一样才可以进行cogroup
- 生成的RDD中,数据格式为最外层是KV类型的元组,value是由两个迭代器组成的元组构成的
- cogroup是多个算子的实际执行方法,如join、leftOuterJoin、RightOuterJoin等
- 其中,创建的RDD是一个独特的RDD:CoGroupedRDD
2.2.3 其他算子(带shuffle)
-
groupBy(分组原则)
-
groupByKey
-
reudceByKey(函数)
-
distinct
-
- 与reduceByKey相同,更为灵活
- 可以将shuffle前后的聚合方法分别编写
- 局部聚合应用初始值,全局聚合不用初始值
-
foldByKey(初始值)(函数)
- 可以指定初始值的reduceByKey
-
partitionBy(分区器)
- 只分区,不聚合、不分组
- 与new shuffledRDD什么也不设置一样
-
join(RDD)
-
手写实现leftOuterJoin(rightOuterJoin同理)
val rdd2: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd11.cogroup(rdd12) val out = rdd2.flatMapValues(pair=> if (pair._2.isEmpty) pair._1.map((_,None)) else for ( x<- pair._1 ; y <- pair._2 ) yield (x, Some(y))) //else pair._1.flatMap(e=> pair._2.map(y=> (e,Some(y)))))
-
手写实现fullOuterJoin
val rdd2: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd11.cogroup(rdd12) val out = rdd2.flatMapValues { case (Seq(), iter) => iter.map(e => (None, Some(e))) case (iter, Seq()) => iter.map(e => (Some(e), None)) case (it1, it2) => it1.flatMap(x => it2.map(y => (Some(x), Some(y)))) }
-
subtract(RDD)
- 求差集,rdd1-rdd2
- 调用了SubtractedRDD
-
repartition(分区数)
- 重新分区(还是HashPartitioner)
- 使用场景:提高并行度、更改分区器
-
coalesce(分区数,是否shuffle)
-
sortBy(排序关键字,是否正序)
- 先分桶排序,再全局聚合
- 会进行采样(sample),collect结果,构建分区规则,划分桶的范围,将数据shuffle到不同范围的桶中
- 对各个桶内进行排序,按照桶的分区号(序号)
-
intersection 交集
-
subtract 差集
2.3 action算子
所有的action算子都会调用:sc.runJob
调用sc.runJob方法,根据最后一个RDD从后往前推,触发Action就会生成DAG,切分Stage,生成TaskSet
算子介绍:
- collect
- 收集数据到一个数组中
- foreach
- 遍历每一条数据
- foreachPartition
- 与foreach相似,处理分区
- foreachPartitionAsync
- 与foreachPartition相似,异步进行
- count
- 先局部计数,再在Driver进行全局计数
- saveAsTextFile
- 保存为Text类型文件
- sum
- fold(初始值)(函数)
- 带初始值执行函数
- reduce(函数)
- 不带初始值的聚合
- take(数量)
- first
- top(n)
- min/max
- 最大值,最小值
- takeOrdered(获取数量)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。