Spark总结
Spark配置函数
-
建立连接
-
关闭连接
- 4> SparkContext.stop()
Transformation类算子
-
一般的RDD处理类算子,RDD称为弹性分布式数据集(理解:一个容器,可以放不同的类型数组)
-
1> .map(func) 将RDD[(long,long,long)]转化为.map(x=>x._1.toString)
-
2> .filter(func) 例如: .filter(x => x.1>0)实现筛选大于0的数据
-
3> a.union(b) 合并两个相同类型的数据集a.union(b)
-
4> .flatMap(func) 将数据集(String数组)中的每一个String拆分使用.flatMap(x=>x.split(",")),这个操作,也被叫做压扁操作,目的是将string数组进行分割,分割后的结果放入数组中。这个过程将String的数组内容变多了,形象的叫做压扁。
-
5> .groupByKey() 将数据进行groupBy,就是将map中的内容放入key:list,同一个相同的key会有一个list,内容也是key
-
集合运算
-
7> rdd1.union(rdd2) 将两个数据集求并
-
8> rdd1.intersection(rdd2) 将两个数据集求交集
-
9> rdd1.subtract(rdd2) 求数据集rdd1所有,但是rdd2没有
Action类算子
- 1> collect()算子返回所有rdd的元素
- 2> count() 计数
- 3> countByValue() 返回一个map,表示唯一元素出现的次数
- 4> take(num) 返回几个元素,尝试访问最少的分区,测试使用
- 5> top(num) 返回前最前面的几个元素
- 6> takeOrdered(num)(ordering) 返回前几个元素,基于排序算法
- 7> takeSample(withReplacement,num,[seed]),取样例
- 8> reduce(func) 合并RDD中的元素
- 9> fold(zero)(func) 与reduce类似 提供zero value
- 10> aggregate(zero Value)(seqOp,combOp) 与fold相似,返回值类型不同。
- 11> foreach(func) 对每一个RDD作用函数,但是什么也不会返回。不返回本地,测试使用foreach(println)
- 12> saveAsTextfile("path") 一般在单机内存可以容纳下的内容时,可以使用collect,将分布集群的数据收集回来,当单机内存容纳不下的时候,使用saveAsTextFile函数,将数据存于分布式的内存中。
RDD的特性
- 血统关系:
- Spark维系RDDs之间的依赖关系和创建关系,Spark使用如下的系统关系图,计算RDD的需求和丢失数据的需求。inputRDD -> filter 得到errorsRDD和warningsRDD,最后将其union
- 延迟计算:
- 并不是所有操作都会让Spark对RDDs进行计算,真正的计算发生在第一次action之中的。
- 好处,减少数据的传输。
- Spark内部记录了Metadata表名transformations操作已经被响应了。
- 加载数据也是延迟的,数据只有必要的时候才会被加载进去。
- 数据的持久化:
keyvalue对的RDDs
-
reduceByKey(func) 把相同的key结合,rdd.reduceByKey((x,y)=>x+y)
-
groupByKey()把相同的key Value分组rdd.groupByKey()
-
combineByKey()
-
mapValues(func)函数作用于每一个pairRDD的每一个元素,Key不变,rdd.mapValues(x=>x+1) {(1,3),(3,5),(3,7)}
-
flatMapValues(func) 符号化使用,rdd.flatMapValues(x=>(xto 5)),将每一个value 不到5的键值对列入map中,超过5的键值对移除
-
keys 仅返回keys rdd.keys{1,2,3}
-
values() 仅返回values
-
sortByKey() 按照key的值排序,rdd.sortByKey()
keyvalue对的RDDs中CombineByKey
-
combineByKey(createCombiner,mergeValue,mergeCombiners,partitioner) 最常用的
聚合函数
,返回值类型可以与输入类型不一样
。 -
RDD分区组成,key要不然是见过的,要不然不是。
Spark环境的运行
- 命令行的运行程序
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master 'local[2]' \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
- 官方文档
(1)基本语法
bin/spark-submit \
--class <main-class>
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
(2)参数说明:
--master 指定Master的地址,默认为Local
--class: 你的应用的启动类 (如 org.apache.spark.examples.SparkPi)
--deploy-mode: 是否发布你的驱动到worker节点(cluster) 或者作为一个本地客户端 (client) (default: client)*
--conf: 任意的Spark配置属性, 格式key=value. 如果值包含空格,可以加引号“key=value”
application-jar: 打包好的应用jar,包含依赖. 这个URL在集群中全局可见。 比如hdfs:// 共享存储系统, 如果是 file:// path, 那么所有的节点的path都包含同样的jar
application-arguments: 传给main()方法的参数
--executor-memory 1G 指定每个executor可用内存为1G
--total-executor-cores 2 指定每个executor使用的cup核数为2个
参考:
/video/14395
/article/278738
https://www.bilibili.com/video/BV11A411L7CK
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。