微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

spark入门

一 spark简介

Spark是一种快速、通用、可扩展的大数据分析引擎,2009年诞生于加州大学伯克利分校AMPLab,2010年开源,2013年6月成为Apache孵化项目,2014年2月成为Apache的顶级项目,2014年5月发布spark1.0,2016年7月发布spark2.0,2020年6月18日发布spark3.0.0

1.spark特点

  • Ease of Use:简洁易用
    • Spark支持 Java、Scala、Python和R等编程语言编写应用程序,大大降低了使用者的门槛。自带了80多个高等级操作算子,并且允许在Scala,Python,R 的使用命令进行交互式运行,可以非常方便的在Spark Shell中地编写spark程序。
  • Generality:通用、全栈式数据处理
    • Spark提供了统一的大数据处理解决方案,非常具有吸引力,毕竟任何公司都想用统一的平台去处理遇到的问题,减少开发和维护的人力成本和部署平台的物力成本。 同时Spark还支持sql,大大降低了大数据开发者的使用门槛,同时提供了SparkStream和Structed Streaming可以处理实时流数据;MLlib机器学习库,提供机器学习相关的统计分类、回归等领域的多种算法实现。其高度封装的API 接口大大降低了用户的学习成本;Spark GraghX提供分布式图计算处理能力;PySpark支持Python编写Spark程序;SparkR支持R语言编写Spark程序。
  • Runs Everywhere:可以运行在各种资源调度框架和读写多种数据源
    • Spark支持的多种部署方案:Standalone是Spark自带的资源调度模式;Spark可以运行在Hadoop的YARN上面;Spark 可以运行在Mesos上(Mesos是一个类似于YARN的资源调度框架);Spark还可以Kubernetes实现容器化的资源调度
  • 丰富的数据源支持。Spark除了可以访问操作系统自身的本地文件系统和HDFS之外,还可以访问 Cassandra、HBase、Hive、Alluxio(Tachyon)以及任何 Hadoop兼容的数据源。这极大地方便了已经 的大数据系统进行顺利迁移到Spark。

2.Spark与MapReduce的对比

MapReduce和Spark的本质区别

  • MR只能做离线计算,如果实现复杂计算逻辑,一个MR搞不定,就需要将多个MR按照先后顺序连成一串,一个MR计算完成后会将计算结果写入到HDFS中,下一个MR将上一个MR的输出作为输入,这样就要频繁读写HDFS,网络IO和磁盘IO会成为性能瓶颈。从而导致效率低下。
  • 既可以做离线计算,有可以做实时计算,提供了抽象的数据集(RDD、Dataset、DataFrame、DStream)有高度封装的API,算子丰富,并且使用了更先进的DAG有向无环图调度思想,可以对执行计划优化后在执行,并且可以数据可以cache到内存中进行复用。
  • 注意:MR和Spark在Shuffle时数据都落本地磁盘

3.spark架构体系

  • 三种模式

    • standalone client模式
    • standalone cluster模式
    • Spark On YARN cluster模式
  • Spark执行流程简介

    • Job:RDD每一个行动操作都会生成一个或者多个调度阶段 调度阶段(Stage):每个Job都会根据依赖关系,以Shuffle过程作为划分,分为Shuffle Map Stage和Result Stage。每个Stage对应一个TaskSet,一个Task中包含多Task,TaskSet的数量与该阶段最后一个RDD的分区数相同。

    • Task:分发到Executor上的工作任务,是Spark的最小执行单元

    • DAGScheduler:DAGScheduler是将DAG根据宽依赖将切分Stage,负责划分调度阶段并Stage转成TaskSet提交给TaskScheduler

    • TaskScheduler:TaskScheduler是将Task调度到Worker下的Exexcutor进程,然后丢入到Executor的线程池的中进行执行

      image-20201221172758045

  • Spark中重要角色

    • Master :是一个Java进程,接收Worker的注册信息和心跳、移除异常超时的Worker、接收客户端提交的任务、负责资源调度、命令Worker启动Executor。
    • Worker :是一个Java进程,负责管理当前节点的资源管理,向Master注册并定期发送心跳,负责启动Executor、并监控Executor的状态。
    • SparkSubmit :是一个Java进程,负责向Master提交任务。
    • Driver :是很多类的统称,可以认为SparkContext就是Driver,client模式Driver运行在SparkSubmit进程中,cluster模式单独运行在一个进程中,负责将用户编写的代码转成Tasks,然后调度到Executor中执行,并监控Task的状态和执行进度。
    • Executor :是一个Java进程,负责执行Driver端生成的Task,将Task放入线程中运行。
  • Spark和Yarn角色对比(Spark StandAlone的Client模式对比YARN)

    • Master => ResourceManager
    • Worker => NodeManager
    • Executor => YarnChild
    • SparkSubmit(Driver) => ApplicationMaster

二 集群搭建及启动

1.下载与安装

  1. 下载spark,通过国内的镜像网站进行下载(如清华镜像站等)

  2. 传输到集群中的一台并解压

  3. 修改conf下的spark-env.sh文件

    export JAVA_HOME=/opt/apps/jdk1.8.0_141
    export SPARK_MASTER_HOST=linux01

  4. 修改conf下的slaves,加入worker的设备

2.spark-shell启动

/opt/apps/spark-3.0.1-bin-hadoop3.2/bin/spark-shell --master spark://linux01:7077 --executor-memory 2g --total-executor-cores 4

三 spark编程入门

1. scala入门程序

使用Scala编写Spark的WorkCount

  1. 创建一个maven项目

  2. 导入依赖等信息

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>cn._51doit</groupId>
        <artifactId>spark-in-action</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <!-- 定义了一些常量 -->
        <properties>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
            <scala.version>2.12.12</scala.version>
            <spark.version>3.0.1</spark.version>
            <encoding>UTF-8</encoding>
        </properties>
    
        <dependencies>
            <!-- 导入scala的依赖 -->
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.12</artifactId>
                <version>${spark.version}</version>
            </dependency>
    
        </dependencies>
    
        <build>
            <pluginManagement>
                <plugins>
                    <!-- 编译scala的插件 -->
                    <plugin>
                        <groupId>net.alchim31.maven</groupId>
                        <artifactId>scala-maven-plugin</artifactId>
                        <version>3.2.2</version>
                    </plugin>
                    <!-- 编译java的插件 -->
                    <plugin>
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-compiler-plugin</artifactId>
                        <version>3.5.1</version>
                    </plugin>
                </plugins>
            </pluginManagement>
            <plugins>
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <executions>
                        <execution>
                            <id>scala-compile-first</id>
                            <phase>process-resources</phase>
                            <goals>
                                <goal>add-source</goal>
                                <goal>compile</goal>
                            </goals>
                        </execution>
                        <execution>
                            <id>scala-test-compile</id>
                            <phase>process-test-resources</phase>
                            <goals>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
    
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <executions>
                        <execution>
                            <phase>compile</phase>
                            <goals>
                                <goal>compile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
    
                <!-- 打jar插件 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>2.4.3</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <filters>
                                    <filter>
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <exclude>meta-inf/*.SF</exclude>
                                            <exclude>meta-inf/*.DSA</exclude>
                                            <exclude>meta-inf/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    
    
    
    
    
    </project>
    
  3. 编写main方法

    package cn.doit.day01.demo01
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object WordCount {
      def main(args: Array[String]): Unit = {
    
        //创建SparkContext,只有使用SparkContext才可以向集群申请资源,才可以创建RDD
        val conf = new SparkConf().setAppName("WordCount")
    
        val sc = new SparkContext(conf)
    
        //1 创建RDD:指定[以后]从HDFS中读取数据
        val lines: RDD[String] = sc.textFile(args(0))
    
        //2 对数据进行切分压平
        val words = lines.flatMap(_.split(" "))
    
        //3 将单词和一组合
        val wordAndOne = words.map((_, 1))
    
        //4 单词聚合,先局部聚合,再全局聚合,使用reduceByKey
        val wordAndNums = wordAndOne.reduceByKey(_ + _)
    
        //5 输出结果并保存
        wordAndNums.saveAsTextFile(args(1))
      }
    }
    
  4. 打包

    可以使用provided 指定依赖是否打入包中

    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
        <scope>provided</scope>
    </dependency>
    

    使用 指定main方法

    <!-- 指定maven(main)方法 -->
    <transformers>
    	<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
    		<mainClass>cn.doit.day01.demo01.WordCount</mainClass>
    	</transformer>
    </transformers>
    
  5. jar包导入集群中执行

    • 如果不指定main方法,需要指定class,指定类名

      –class cn.doit.day01.demo01.WordCount

    ./bin/spark-submit --master spark://linux01:7077 【--class cn.doit.day01.demo01.WordCount】 /root/spark_learn.jar hdfs://linux01:8020/anli/wordCount/input hdfs://linux01:8020/anli/wordCount/output02
    

2.使用java编写

java编写,并在本地运行

代码如下:

public class WordCount {
    public static void main(String[] args) {

        //创建连接,设置进程名()
        SparkConf conf = new SparkConf().setAppName("JavaWordCount");

        //如果在本地运行,设置Master所调用的线程资源数,一般使用local[*],调用全部资源(不能设置为1)
        conf.setMaster("local[*]");

        //javaSparkContext是对SparkContext的包装类
        JavaSparkContext context = new JavaSparkContext(conf);

        //设置文件输入路径
        JavaRDD<String> rdd = context.textFile(args[0]);

        //切分压平
        JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.stream(s.split(" ")).iterator();
            }
        });

        //单词和一结合
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return Tuple2.apply(s, 1);
            }
        });

        //聚合
        JavaPairRDD<String, Integer> wordAndNums = wordAndOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer+integer2;
            }
        });

        //交换顺序
        JavaPairRDD<Integer, String> numsAndWord = wordAndNums.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return Tuple2.apply(stringIntegerTuple2._2, stringIntegerTuple2._1);
            }
        });

        //排序
        JavaPairRDD<Integer, String> sortednumsAndWord = numsAndWord.sortByKey(false);

        //再交换顺序
        JavaPairRDD<String, Integer> sortedWordAndNums = sortednumsAndWord.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> integerStringTuple2) throws Exception {
                return Tuple2.apply(integerStringTuple2._2, integerStringTuple2._1);
            }
        });

        //保存
        sortedWordAndNums.saveAsTextFile(args[1]);

        //关闭资源
        context.stop();
    }
}

3.使用Lambda表达式

尝试在本地运行,读写hadoop上的文件

在IDEA设置(传参数):

  • Configurations — Program arguments:

    hdfs://linux01:8020/anli/wordCount/input hdfs://linux01:8020/anli/wordCount/output03

public class LambdaWordCount {
    public static void main(String[] args) {
        
        //在idea上执行,hadoop上的文件,并输出到hadoop中
        //因为windows的权限不够,需要修改调用名称
        System.setProperty("HADOOP_USER_NAME","root");
        //结果:权限不足,hadoop权限调整为777,可以运行;或者使用相同方式在scala代码上,可以运行

        SparkConf conf = new SparkConf().setAppName("LambdaWordCount").setMaster("local[*]");

        JavaSparkContext context = new JavaSparkContext(conf);

        JavaRDD<String> lines = context.textFile(args[0]);

        JavaRDD<String> words = lines.flatMap(e -> Arrays.stream(e.split(" ")).iterator());

        JavaPairRDD<String, Integer> wordAndNum = words.mapToPair(e -> Tuple2.apply(e, 1));

        JavaPairRDD<String, Integer> wordAndNums = wordAndNum.reduceByKey(Integer::sum);

        JavaPairRDD<Integer, String> numsAndWord = wordAndNums.mapToPair(e -> Tuple2.apply(e._2, e._1));

        JavaPairRDD<Integer, String> sortednumsAndWord = numsAndWord.sortByKey(false);

        JavaPairRDD<String, Integer> sortedWordAndNums = sortednumsAndWord.mapToPair(e -> Tuple2.apply(e._2, e._1));

        sortedWordAndNums.saveAsTextFile(args[1]);

        context.stop();
    }
}

四 RDD详解

RDD:Resilient distributed Dataset—有弹性的分布式的数据集合;里面没有真正的数据,是一个抽象的、不可变得、被分区的集合,集合内的元素可以被并行的操作(多个Task)。

五大特点:

  • 有多个分区,分区数量决定任务并行数
  • 函数作用在分区的迭代器中,决定了计算逻辑
  • 众RDD之间存在依赖关系,可以根据依赖关系恢复失败的任务和划分stage
  • 如果要发生shuffle,要使用分区器,认使用HashPartitioner
  • 最优位置,即将Executor调度到数据所在的节点上,要求Worker和Datanode部署在同一节点或OnYarn,通过访问Namenode获取数据块位置信息

深层理解RDD:

  • RDD是一个抽象的数据集,保存的是元数据信息,和对应的运算逻辑
  • 即RDD记录了:数据从哪里来、所属Task的分区、对数据作何处理
  • 一个Task是由多个RDD相连组成的,而多个RDD之间是由各自的迭代器连接形成迭代器链将RDD的对应Task分区连接起来的
  • RDD是由,多个各自分区的迭代器(迭代器中记录了数据来源),及其Task的Function(运算逻辑:compute方法),组成的一个大集合

补充知识点

  • classOf[T] 方法
    • 获取类型T的Class对象,返回在运行过程中的类型。
  • 宽依赖与窄依赖:
    • 窄依赖是指父RDD的每个分区只被子RDD的一个分区所使用,子RDD一般对应父RDD的一个或者多个分区。(与数据规模无关)不会产生shuffle。
    • 宽依赖指父RDD的多个分区可能被子RDD的一个分区所使用,子RDD分区通常对应所有的父RDD分区(与数据规模有关),会产生shuffle
  • 创建连接
    • 在创建对象或连接时,不要在Driver中创建,可能会导致Task无法序列化
    • 关闭资源时,可以判断迭代器没有下一条数据时,进行关闭
  • 分区器类型相同,分区数量相同,不再shuffle,也就没有新的stage
  • Partitioner分区器
  • 查看Executor的打印台
    • 在work目录—job文件夹的stdout中

1.分区

  • RDD分区方式
    • 直接设置最小分区数,通过调用parallelize方法(仅限scala数组使用)
    • 通过设置textFile方法中的minpartitions参数,调节分区数
  • spark读取hdfs文件时,分区数算法详述
    1. 获取RDD时,sc调用textFile等方法,输入path,并可选输入minPartitions
    2. minPartitions会进行判断,取输入值和2之间的最小值
    3. textFile类:调用hadoopFile,HadoopFile会返回一个KV类型的RDD(指针位置和行内容),textFile使用map方法抽取value,返回一个String类型的RDD
    4. 在hadoopFile中,调入了一个类:TextInputFormat,其父类FileInputFormat中,包含了RDD的分区方法:getSplits
    5. 遍历所有文件获取总的文件长度totalSize,再用总长度除numSplits(前面的minPartitions,认为2)(numSplits == 0 ? 1 : numSplits),获取目标大小goalSize
    6. 遍历所有文件,首先调用computeSplitSize方法,取goalSize和128M的最小值,返回为splitSize
    7. 进入for循环,判断文件的实际长度是否大于splitSize的1.1倍,如果大于,调用makeSplit进行切分,切掉的大小为splitSize
    8. 而后该文件剩余部分继续循环,直到小于splitSize的1.1倍为止

2.算子

算子是对RDD进行操作

对RDD中的每个分区的迭代器进行函数处理

2.1 Transformation(不产生shuffle)

2.1.1 map方法详解
2.1.2 其他算子(不shuffle)
  • mapPartitionsWithIndex
    • 对Partition进行map操作,与此同时可以将分区编号获取
    • 输入的是迭代器,返回的数据也是迭代器
    • 函数输入值:(index,iter)
  • mapPartitions
  • flatMapValues
    • 当RDD时KV类型,且value是一个数组或集合时
    • 将集合炸开取每个元素,与key结合成新的元组
  • union(++)
    • 只能相同类型RDD进行union
    • union不会去重,没有shuffle
    • 会把所有的元素汇总为新的RDD
    • 相当于把多个RDD的多个分区sum到一个新的RDD中
  • filter
    • 过滤,也是一条一条的过滤,返回true的就保留
  • flatMap
    • 相当于先map在flatten,rdd中没有flatten方法
  • keys
  • values
    • 同上,返回第二个
  • mapValues
    • 是对RDD中对偶元组的value进行处理,不用管key,将value应用的传人的函数处理后在跟key放到对偶元组

2.2 Transformation(产生shuffle)

在分布式计算中,将数据按照一定的计算逻辑(分区器)

2.2.1 reduceByKey方法详解
  • 调用了combineByKeyWithclasstag方法,传入4个参数:一个reduceByKey的函数,二和三都是传入的自定义函数,四是一个分区器

  • 方法中,调用的构造函数是:ShuffledRDD(绝大部分带shuffle的Transformation都是该RDD)

  • 构造函数传入调用reduceByKey的RDD,和一个分区器,该构造方法构造出来的RDD只会执行一个shuffle,不会执行shuffle前后的聚合等方法

  • 需要通过set:Serializer(序列化方法)、Aggregator(传入上面那3个函数)、MapSideCombine(shuffle前是否聚合)等信息对ShuffledRDD进行完善

  • 可以自行创建ShuffledRDD,来替代reduceByKey方法

    //手动编写reduceByKey方法
    
    //函数1:聚合时,首个元素传入时对value的处理方法
    val f1 = (e:Int) => e
    //函数2:聚合value的方法
    val f2 = (e1:Int ,e2: Int) => e1+ e2
    //在reduceByKey中,shuffle前后的聚合方法是一样的
    val f3 = f2
    
    //三个泛型:Key类型,原Value类型,聚合后的Value类型
    //分区器创建一个新的HashPartitioner,分区数量设置为源RDD的分区数量
    val shuffledRDD: ShuffledRDD[String, Int, Int] = new ShuffledRDD[String, Int, Int](wordAndOne, new HashPartitioner(wordAndOne.partitions.length))
    
    //创建Aggregator,传入函数
    var agg = new Aggregator[String, Int, Int](f1,f2,f3)
    
    shuffledRDD.setMapSideCombine(true)
    shuffledRDD.setAggregator(agg)
    
2.2.2 cogroup
  • 协分组、联合分组,是对多个RDD进行分组
  • RDD必须是keyvalue类型的,并且两个RDD的key类型一样才可以进行cogroup
  • 生成的RDD中,数据格式为最外层是KV类型的元组,value是由两个迭代器组成的元组构成的
  • cogroup是多个算子的实际执行方法,如join、leftOuterJoin、RightOuterJoin等
  • 其中,创建的RDD是一个独特的RDD:CoGroupedRDD
2.2.3 其他算子(带shuffle)
  • groupBy(分组原则)

  • groupByKey

    • 底层调用的是combineByKeyWithclasstag,在该方法中new的ShuffleRDD,mapSideCombine = false
  • reudceByKey(函数

    • 可以先局部聚合,在全局聚合,底层调用的是combineByKeyWithclasstag,并在方法中中new的ShuffleRDD, ,mapSideCombine = true
  • distinct

    • 去重,先调用map将数据变成K,V类型,Value是null,然后调用reduceByKey,先在每个分区进行局部去重,然后在全局去重
  • combinByKey(函数1,函数2,函数3)

    • 将reduceByKey分解,可以单独设置:初始化函数、聚合前的value合并函数、聚合的合并函数
    • 底层调用的是combineByKeyWithclasstag, 并在方法中中new的ShuffleRDD
  • aggregateByKey(初始值)(函数1,函数2)

    • 与reduceByKey相同,更为灵活
    • 可以将shuffle前后的聚合方法分别编写
    • 局部聚合应用初始值,全局聚合不用初始值
  • foldByKey(初始值)(函数

    • 可以指定初始值的reduceByKey
  • partitionBy(分区器)

    • 只分区,不聚合、不分组
    • 与new shuffledRDD什么也不设置一样
  • join(RDD)

    • 类似于sql中的inner

    • 使用cogroup实现join

      val rdd2: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd11.cogroup(rdd12)
      val out = rdd2.flatMapValues(e => for (x <- e._1; y <- e._2) yield (x, y))
      
  • 手写实现leftOuterJoin(rightOuterJoin同理)

    val rdd2: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd11.cogroup(rdd12)
    val out = rdd2.flatMapValues(pair=>
        if (pair._2.isEmpty) pair._1.map((_,None))
        else for ( x<- pair._1 ; y <- pair._2 ) yield (x, Some(y)))
        //else pair._1.flatMap(e=> pair._2.map(y=> (e,Some(y)))))
    
  • 手写实现fullOuterJoin

    val rdd2: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd11.cogroup(rdd12)
    val out = rdd2.flatMapValues {
        case (Seq(), iter) => iter.map(e => (None, Some(e)))
        case (iter, Seq()) => iter.map(e => (Some(e), None))
        case (it1, it2) => it1.flatMap(x => it2.map(y => (Some(x), Some(y))))
    }
    
  • subtract(RDD)

    • 求差集,rdd1-rdd2
    • 调用了SubtractedRDD
  • repartition(分区数)

    • 重新分区(还是HashPartitioner)
    • 使用场景:提高并行度、更改分区器
  • coalesce(分区数,是否shuffle)

    • 也是重新分区,可以设置是否shuffle
    • 如果不shuffle,分区数只能减少,不能增加(因为如果分区增加,一定要一个分区的数据发送到多个分区,必须要shuffle)
    • repartition底层调用的就是coalesce,shuffle为true
  • sortBy(排序关键字,是否正序)

    • 先分桶排序,再全局聚合
    • 会进行采样(sample),collect结果,构建分区规则,划分桶的范围,将数据shuffle到不同范围的桶中
    • 对各个桶内进行排序,按照桶的分区号(序号)
  • intersection 交集

    • 底层调用的cogroup,map将数据本身当成key,null当成value,然后进行过滤,过滤的条数时间,两个迭代器都不为空迭代器,然后调用keys取出key
  • subtract 差集

2.3 action算子

所有的action算子都会调用sc.runJob

调用sc.runJob方法,根据最后一个RDD从后往前推,触发Action就会生成DAG,切分Stage,生成TaskSet

算子介绍:

  • collect
    • 收集数据到一个数组中
  • foreach
    • 遍历每一条数据
  • foreachPartition
    • 与foreach相似,处理分区
  • foreachPartitionAsync
    • 与foreachPartition相似,异步进行
  • count
    • 先局部计数,再在Driver进行全局计数
  • saveAsTextFile
  • sum
    • 底层调用了fold(0)(函数
    • 先局部求和,再全局求和
  • fold(初始值)(函数
  • reduce(函数
    • 不带初始值的聚合
  • take(数量
    • 取n条数据,可能触发多次Action
    • 每一次触发Action取一个分区的数据,不够n个再到下一个分区取
  • first
    • 返回第一个元素,调用的是take(1)
    • 不同之处:take返回数组,first返回元素
  • top(n)
    • 取前n个,返回数组
    • 在每个分区内使用有界优先队列对N个数据进行排序,移除不符合的数据
    • 再在Driver端使用有界优先队列进行全局排序
    • 有界优先队列:
      • 有界优先队列类似一个TreeSet,但是不去重;
      • 如果取n个数据,则会创建一个长度为n的有界优先队列
      • 利用红黑树的高效排序,每存储一个数据,集合内的数据达到n+1条时,对集合内最小(最大)的数据进行remove
      • 使集合内始终保留最大(最小)的n条数据,这样就可以求出每个分区的前n条
      • 两个长度为n的队列结合(++=),同样会保留n条数据
    • 只在Driver端生成进行Map的操作,不需要shuffle
  • min/max
    • 最大值,最小值
  • takeOrdered(获取数量

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐