- 2.1 foreachPartition
- 3.1 Understanding the spark-shell startup flow
1. Review of the previous lesson
Big Data in Practice, Lesson 16 (Part 2) - Spark-Core04
https://blog.csdn.net/zhikanjiani/article/details/99731015
2. mapPartitions
1. Definition of map in RDD.scala:
Return a new RDD by applying a function to all elements of this RDD.
// Returns a new RDD; the function is applied to every element of this RDD
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
2. Definition of mapPartitions in RDD.scala:
Return a new RDD by applying a function to each partition of this RDD.
// Returns a new RDD; the function is applied to each partition of this RDD
`preservesPartitioning` indicates whether the input function preserves the partitioner, which
should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
    preservesPartitioning)
}
- mapPartitions: the function is applied once per partition; map: the function is applied once per record
- An RDD is made up of N partitions, and each partition holds N records
Example:
package Sparkcore04

import java.util.Random

import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ListBuffer

object mapPartition {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    sparkConf.setAppName("MapPartitionApp").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)

    val stus = new ListBuffer[String]
    for (i <- 1 to 100) {
      stus += "G60" + i
    }

    val rdd = sc.parallelize(stus)
    // With map, the function body runs once per record,
    // so a "connection" is fetched 100 times.
    rdd.map(x => {
      val conn = DB.getConn()
      println(conn + "~~~")
      DB.returnConn(conn)
    }).collect
    sc.stop()
  }
}

// Stub that simulates getting and returning a database connection.
object DB {
  def getConn() = {
    new Random().nextInt(10) + ""
  }

  def returnConn(conn: String) = {
  }
}
Using mapPartitions:
val rdd = sc.parallelize(stus)
println("Number of partitions: " + rdd.partitions.length)
// With mapPartitions, the function body runs once per partition,
// so a connection is fetched only once per partition.
rdd.mapPartitions(partition => {
  val conn = DB.getConn()
  println(conn + "~~~~")
  DB.returnConn(conn)
  partition
}).collect
sc.stop()
Output:
Number of partitions: 2
0~~~~
5~~~~
Only two connections show up because local[2] gives the RDD two partitions.
For batch processing you need to think in terms of batches, and control each batch with a transaction in the code.
In Java this would be handled with AOP; in big data, isn't it simpler to just delete first and then insert?
WordCount follows the same pattern: flatMap, then map (see the sketch below).
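As a quick reminder of where those operators fit, here is a minimal WordCount sketch; the input path is only a placeholder:
// flatMap splits each line into words, map pairs every word with 1,
// and reduceByKey sums the counts per word.
val wc = sc.textFile("hdfs:///path/to/input")   // placeholder path
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
wc.collect().foreach(println)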
2.1 foreachPartition
Definition of foreachPartition in RDD.scala:
Applies a function f to each partition of this RDD.
// Applies a function to each partition of this RDD
def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
}
- Whenever you persist data to an external store (HBase, MySQL), foreachPartition is the operator of choice (see the sketch after this list).
- map and mapPartitions are transformations; in the examples above, collect is used to trigger them manually.
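A minimal sketch of that write pattern, reusing the DB stub from the example above; the insert is only simulated with a println, and in real code each batch would be wrapped in a transaction against HBase or MySQL:
rdd.foreachPartition(partition => {
  // One connection per partition, not per record.
  val conn = DB.getConn()
  // Hypothetical batched write: 50 records at a time.
  partition.grouped(50).foreach(batch =>
    println(conn + " writing " + batch.size + " records")
  )
  DB.returnConn(conn)
})
Because foreachPartition is an action, no collect is needed to trigger it.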
3 Digging into the sc.textFile source
sc.textFile(" ")  // turns a file or a directory into an RDD
What does the underlying source code do:
Read a text file from HDFS, a local file system (available on all nodes), or any
Hadoop-supported file system URI, and return it as an RDD of Strings.
@param path path to the text file on a supported file system
@param minPartitions suggested minimum number of partitions for the resulting RDD
@return RDD of lines of the text file
// Reads a text file from HDFS, from a local file system (available on all nodes), or from any
// Hadoop-supported file system URI (HBase, S3, ...), and returns it as an RDD of Strings.
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}
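For reference, a quick usage sketch; the path and the suggested partition count are placeholders:
// minPartitions is only a suggested minimum, not a hard guarantee.
val lines = sc.textFile("hdfs:///path/to/input.txt", 4)   // placeholder path
println("partitions: " + lines.partitions.length)
println("lines: " + lines.count())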
Key lines:
- hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
- minPartitions).map(pair => pair._2.toString).setName(path)
textFile simply delegates to hadoopFile:
/** Get an RDD for a Hadoop file with an arbitrary InputFormat
*
* @note Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
* @param path directory to the input data files, the path can be comma separated paths
* as a list of inputs
* @param inputFormatClass storage format of the data to be read
* @param keyClass `Class` of the key associated with the `inputFormatClass` parameter
* @param valueClass `Class` of the value associated with the `inputFormatClass` parameter
* @param minPartitions suggested minimum number of partitions for the resulting RDD
* @return RDD of tuples of key and corresponding value
*/
def hadoopFile[K, V](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
  assertNotStopped()

  // This is a hack to enforce loading hdfs-site.xml.
  // See SPARK-11227 for details.
  FileSystem.getLocal(hadoopConfiguration)

  // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
  val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
  val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
  new HadoopRDD(
    this,
    confBroadcast,
    Some(setInputPathsFunc),
    inputFormatClass,
    keyClass,
    valueClass,
    minPartitions).setName(path)
}
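Tying this back to the @note above, here is a sketch of calling hadoopFile directly; the values are copied out of the reused Writable objects with a map before anything is cached or shuffled (the path is a placeholder):
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// hadoopFile yields (byte offset, line) pairs backed by reused Writables,
// so copy the values out before caching, sorting, or aggregating.
val raw = sc.hadoopFile("hdfs:///path/to/input.txt",   // placeholder path
  classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 2)
val lines = raw.map { case (offset, text) => (offset.get(), text.toString) }
lines.take(5).foreach(println)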
Interview question: Mapper<> and Reducer<> carry four type parameters.
- public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable
- The corresponding map and reduce methods take three parameters.
3.1 Understanding the spark-shell startup flow
- cd $SPARK_HOME/bin, then more spark-shell
- cygwin=false: some people want to set up and learn big data on Windows, so the script first checks the operating environment
- case "$(uname)" in
  CYGWIN*) cygwin=true;;
- if [ -z "${SPARK_HOME}" ]; then    # check whether SPARK_HOME is empty
    source "$(dirname "$0")"/find-spark-home    # the find-spark-home script sits in the same directory as spark-shell
  fi
- In other words: if SPARK_HOME is not set, the next line runs to locate it.
Testing this in the shell:
1. vi test.sh
teacher="ruoze"
if [ -z "${teacher}" ]; then
  echo "jepson"
else
  echo ${teacher}
fi
2. chmod +x test.sh
3. ./test.sh, which prints ruoze
What this script does:
It sets teacher to ruoze and then tests whether the variable is empty: if it is empty it prints jepson, otherwise it prints ruoze.
If you comment out the teacher="ruoze" line, the output becomes jepson.
What source "$(dirname "$0")"/find-spark-home means:
1. vi test.sh
home=`cd $(dirname "$0"); pwd`
echo ${home}
2. chmod +x test.sh
3. ./test.sh
It prints the directory that the script being executed lives in.
- function main() {
    …
  }
  main "$@"
Testing main "$@":
1. function main() {
     echo "input params is:" $@
   }
   main "$@"
2. ./test.sh xx yy zz
3. Output: input params is: xx yy zz
- Statements executed in the else branch:
  export SPARK_SUBMIT_OPTS
  "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
Summary: under the hood spark-shell calls spark-submit; the application runs under the name "Spark shell", and every argument you pass to spark-shell (for example --master) is forwarded along.
Execution now moves on to the spark-submit script.
- exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
- exec replaces the current shell process with the given command and runs it.
cd sbin
cat start-all.sh