微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

大数据篇--Spark调优

文章目录

一、算子的合理选择

pom.xml内容

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.huiq</groupId>
    <artifactId>HuiqTest</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.4.0</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

1.map和mappartition:

  编写Scala程序模拟使用不同的算子将数据插入到数据中比较不同。

MapMappartitionApp:

package com.huiq.test

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ListBuffer

object MapMappartitionApp {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]")
      .setAppName("MapMappartitionApp")

    val sc = new SparkContext(sparkConf)

    val students = new ListBuffer[String]()
    for (i<-1 to 100) {
      students += "stu: " + i
    }
    val stuRdd = sc.parallelize(students)

    // 需要把students存储到数据库中去
    myMap(stuRdd)

    sc.stop()
  }

  def myMap(rdd: RDD[String]): Unit = {
    rdd.map(x => {
      val connection = dbutils.getConnection()
      println(connection + "------------>")

      // Todo... 保存数据到数据库dbutils.returnConnection(connection)
    }).foreach(println)
  }
}

  运行程序我们会发现使用map算子有多少个元素就会创建多少个connection,这种性能肯定是不行的。

在这里插入图片描述

  换成mapPartition再运行程序:

在这里插入图片描述

总结map是对RDD中的每个元素作用上一个函数(你假设有个rdd有100个分区,每个分区里有1万个元素,你算一下整个过程会开启多少个connection?)。mapPartition是将函数作用到partition之上的(如果遇到要写数据到数据库,一定要选择该模式)。

思考如果分区数量比较少导致一个分区中的数据量很大,这种场景下用mapPartition可能会有资源不够导致类似OOM的问题,遇到这种问题的时候可以手动调整partition的数量解决,比如上面的代码可以设置成10个分区val stuRdd = sc.parallelize(students, 10)

2.foreach和foreachpartition:

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐