
Big Data in Practice, Lesson 17 (Part 1) - Spark-Core05

1. Review of the previous lesson

2. map and mapPartitions

3. sc.textFile source code walkthrough

1. Review of the previous lesson

Big Data in Practice, Lesson 16 (Part 2) - Spark-Core04
https://blog.csdn.net/zhikanjiani/article/details/99731015

2. map and mapPartitions

1. Definition of map in RDD.scala:

  Return a new RDD by applying a function to all elements of this RDD.
// Returns a new RDD in which the function has been applied to every element of this RDD

  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

2. Definition of mapPartitions in RDD.scala:


   Return a new RDD by applying a function to each partition of this RDD.
// Returns a new RDD; the function is applied to each partition of this RDD

 `preservesPartitioning` indicates whether the input function preserves the partitioner, which
 should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
  
  def mapPartitions[U: ClassTag](
      f: Iterator[T] => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U] = withScope {
    val cleanedF = sc.clean(f)
    new MapPartitionsRDD(
      this,
      (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
      preservesPartitioning)
  }
  • mapPartitions: the function operates on a whole partition; map: the function operates on a single record
    RDD <-- made up of N partitions <-- each partition made up of N records

Example:

  1. Suppose an RDD has 10 partitions and each partition holds 1,000,000 records, and we save the RDD to MySQL:
    with map: one connection per record, i.e. 10,000,000 connections in total
    with mapPartitions: one connection per partition, i.e. only 10 connections
package Sparkcore04

import java.util.Random

import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ListBuffer

object mapPartition {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    sparkConf.setAppName("MapPartitionApp").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)

    // Build 100 test records: G601, G602, ..., G60100
    val stus = new ListBuffer[String]
    for (i <- 1 to 100) {
      stus += "G60" + i
    }

    val rdd = sc.parallelize(stus)
    // With map, the function runs once per record,
    // so a "connection" is fetched 100 times (once per element).
    rdd.map(x => {
      val conn = DB.getConn()

      println(conn + "~~~")
      DB.returnConn(conn)

    }).collect

    sc.stop()
  }
}

// A mock connection pool: getConn returns a random "connection" id,
// returnConn hands it back (does nothing here).
object DB {
  def getConn() = {
    new Random().nextInt(10) + ""
  }

  def returnConn(conn: String) = {

  }
}

Using mapPartitions:

    val rdd = sc.parallelize(stus)
    println("Number of partitions: " + rdd.partitions.length)
    // With mapPartitions, the function runs once per partition,
    // so a connection is fetched only once for each partition.
    rdd.mapPartitions(partition => {
      val conn = DB.getConn()
      println(conn + "~~~~")
      DB.returnConn(conn)
      partition
    }).collect

    sc.stop()

Output:
Number of partitions: 2
0~~~~
5~~~~

For batch processing you need to think in batches and control each batch with a transaction in the code.
In Java you would typically control this with AOP; in big data, simply deleting the old data and re-inserting is often easier.

  • You submit one SQL job with the click of a button and wait for the result to come back; do you even know how many hours that job is going to run?

WC (word count) is the classic flatMap + map combination; a minimal sketch follows.
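A minimal word-count sketch, assuming a placeholder HDFS path and tab-separated words (both are illustrative, not from the course code):

  // Hypothetical input path; any text file will do.
  val lines = sc.textFile("hdfs://hadoop001:8020/data/wc.txt")

  val wc = lines
    .flatMap(line => line.split("\t"))   // one line -> many words (tab-separated here)
    .map(word => (word, 1))              // each word -> (word, 1)
    .reduceByKey(_ + _)                  // sum the counts per word

  wc.collect().foreach(println)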

2.1 foreachPartition

Definition of foreachPartition in RDD.scala:

    Applies a function f to each partition of this RDD.
   // Applies the function f to every partition of this RDD
   
  def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
  }

  1. Whenever you write out to an external database (HBase, MySQL), the operator of first choice is foreachPartition; a sketch follows below.
  2. map and mapPartitions are transformations, so in the examples above collect is used to trigger them manually; foreachPartition itself is an action (it calls sc.runJob) and needs no extra trigger.
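A minimal sketch of that write-to-database pattern, reusing the mock DB object from the example above; in real code the connection would come from a JDBC pool and the writes would be batched inside a transaction (the println stands in for the actual insert):

  rdd.foreachPartition(partition => {
    val conn = DB.getConn()                 // one connection per partition, not per record
    partition.foreach(record => {
      // in real code: add the record to a batch insert on conn
      println(s"writing $record via connection $conn")
    })
    DB.returnConn(conn)                     // hand the connection back when the partition is done
  })

Because foreachPartition is an action, no collect is needed to trigger it.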

3. sc.textFile source code walkthrough

sc.textFile(" ") //把文件文件夹转换成一个RDD

What does the underlying source do:

Read a text file from HDFS, a local file system (available on all nodes), or any
 Hadoop-supported file system URI, and return it as an RDD of Strings.
@param path path to the text file on a supported file system
@param minPartitions suggested minimum number of partitions for the resulting RDD
@return RDD of lines of the text file
// Reads a text file from HDFS, from a local file system (one that is available on every node), or from any Hadoop-supported file system URI (e.g. S3), and returns it as an RDD of Strings

  def textFile(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    assertNotStopped()
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)
  }

Key points:

  1. hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions).map(pair => pair._2.toString).setName(path)
  • The key part is map(pair => pair._2.toString) ==> keep only the value (the text of the line) and discard the key (the byte offset). A small sketch of the equivalent call follows.
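As a rough sketch, sc.textFile(path) behaves like the following hand-written call (placeholder path; the Hadoop classes are the ones named in the source above):

  import org.apache.hadoop.io.{LongWritable, Text}
  import org.apache.hadoop.mapred.TextInputFormat

  val lines = sc
    .hadoopFile("hdfs://hadoop001:8020/data/wc.txt",
      classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
    .map(pair => pair._2.toString)   // keep the line of text (value), drop the byte offset (key)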

textFile calls hadoopFile:

/** Get an RDD for a Hadoop file with an arbitrary InputFormat
   *
   * @note Because Hadoop's RecordReader class re-uses the same Writable object for each
   * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
   * operation will create many references to the same object.
   * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
   * copy them using a `map` function.
   * @param path directory to the input data files, the path can be comma separated paths
   * as a list of inputs
   * @param inputFormatClass storage format of the data to be read
   * @param keyClass `Class` of the key associated with the `inputFormatClass` parameter
   * @param valueClass `Class` of the value associated with the `inputFormatClass` parameter
   * @param minPartitions suggested minimum number of partitions for the resulting RDD
   * @return RDD of tuples of key and corresponding value
   */
  def hadoopFile[K, V](
      path: String,
      inputFormatClass: Class[_ <: InputFormat[K, V]],
      keyClass: Class[K],
      valueClass: Class[V],
      minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
    assertNotStopped()

    // This is a hack to enforce loading hdfs-site.xml.
    // See SPARK-11227 for details.
    FileSystem.getLocal(hadoopConfiguration)

    // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
    val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
    val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
    new HadoopRDD(
      this,
      confBroadcast,
      Some(setInputPathsFunc),
      inputFormatClass,
      keyClass,
      valueClass,
      minPartitions).setName(path)
  }
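The @note above is worth acting on: Hadoop's RecordReader reuses the same Writable object for every record, so if you want to cache or shuffle the raw (key, value) pairs you should copy them first. A minimal sketch, with a placeholder path:

  import org.apache.hadoop.io.{LongWritable, Text}
  import org.apache.hadoop.mapred.TextInputFormat

  val raw = sc.hadoopFile("hdfs://hadoop001:8020/data/wc.txt",
    classOf[TextInputFormat], classOf[LongWritable], classOf[Text])

  // Copy the reused Writables into plain Scala values before caching.
  val safe = raw.map { case (offset, line) => (offset.get(), line.toString) }
  safe.cache()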

Interview question: Mapper<> and Reducer<> take 4 generic parameters.

  • public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable
    whereas the corresponding map and reduce methods take 3 parameters (key, value, context in the new org.apache.hadoop.mapreduce API).

3.1 Understanding how spark-shell starts up

  1. cd $SPARK_HOME/bin; more spark-shell
  • cygwin=false: some people want to set up a big-data environment on Windows, so the script first checks what kind of system it is running on
  2. case "$(uname)" in
       CYGWIN*) cygwin=true;;
  • uname prints system information; uname -r prints the OS release number; uname -a prints everything


  3. if [ -z "${SPARK_HOME}" ]; then          # check whether SPARK_HOME is empty
       source "$(dirname "$0")"/find-spark-home  # the find-spark-home script sits in the same directory as spark-shell
     fi
  • If SPARK_HOME is not set, the next line runs to locate it.

Test this in the shell:

1. vi test.sh

teacher="ruoze"
if [ -z "${teacher}" ]; then
  echo "jepson"
else
  echo ${teacher}
fi

2. chmod +x test.sh

3. ./test.sh        # prints ruoze
# What this does: teacher is set to "ruoze"; the test checks whether the variable is empty.
# If it is empty the script prints jepson; otherwise it prints the teacher, i.e. ruoze.
# Comment out the teacher="ruoze" line and the output becomes jepson.

What source "$(dirname "$0")"/find-spark-home means:

1. vi test.sh

home=`cd $(dirname "$0"); pwd`
echo ${home}

2. chmod +x test.sh

3. ./test.sh
# prints the directory the current script lives in
  4. function main() {
         ...
     }
     main "$@"
  • spark-shell defines a main function and then calls it with all of the script's arguments.

Testing main "$@":

1. function main() {
       echo "input params is: $@"
   }
   main "$@"

2. ./test.sh xx yy zz

3. Output: input params is: xx yy zz

  • What the else branch executes:
    export SPARK_SUBMIT_OPTS
    "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"

Summary: under the hood spark-shell calls spark-submit; the application is submitted under the name "Spark shell", and every argument you append to spark-shell (such as --master) is forwarded to it.
From here control moves on to spark-submit.

  5. Inside spark-submit: exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
  • exec means the current shell process is replaced by that command.

cd sbin
cat start-all.sh

