什么是Spark
Spark是基于内存的快速,通用,可扩展的大数据分析引擎
Spark的内置模块
Spark Core
是Spark可以离线处理的部分,实现了spark的基本功能,包含任务调度,错误恢复,与存储系统交互等模块。Spark Core 中还包含了对弹性分布式数据集的API
Spark sql
可以使用sql结构化语句来查询数据,支持多种数据源,hive,json等
Spark Streaming
是Spark 对实时数据进行流式计算的组件。提供用来操作数据流的API
Spark MLlib
提供常见的机器学习功能的程序库,
Spark的特点
1.快
和Hadoop比Spark要比其快100倍以上,Spark实现了高效的DAG执行引擎,可用内存来高效处理数据流。计算的中间结果保存早内存中。
2.易用
支持java,Python,Scala的API,支持很多算法
3.一站式服务
Spark提供了统一的解决方案。Spark可以用于批处理、交互式查询(Spark sql)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。这些不同类型的处理都可以在同一个应目中无缝使用。减少了开发和维护的人力成本和部署平台的物力成本。
4.兼容性
Spark的基本流程
Spark的运行模式
安装地址
http://spark.apache.org
Local 模式
直接解压就OK
官方案例
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--executor-memory 1G \
--total-executor-cores 2 \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100
参数说明
–master指定Master的地址,默认为Local
–class:你的应用的启动类(如org.apache.spark.examples.SparkPi)
–deploy-mode:是否发布你的驱动到worker节点(cluster)或者作为一个本地客户端(client) (default: client)*
–conf:任意的Spark配置属性,格式key=value.如果值包含空格,可以加引号“key=value”
application-jar:打包好的应用jar,包含依赖.这个URL在集群中全局可见。比如
hdfs://共享存储系统,如果是file:ll path,那么所有的节点的path 都包含同样的jarapplication-arguments:传给main()方法的参数
–executor-memory 1G指定每个executor可用内存为1G
–total-executor-cores 2指定每个executor 使用的cup核数为2个
案例2(wordcount)
1.创建文件里面写字母等
2.启动Spark的shell窗口
spark-shell
3.shell窗口的操作
//将文件载入
sc.textFile("文件路径")
//将文件内容按空格分割
sc.textFile("文件路径").FlatMap(x=>x.split(" "))
//将文件内容修改为(x,1)的形式
sc.textFile("文件路径").FlatMap(x=>x.split(" ")).map(x=>(x,1))
//将内容提交给计算
sc.textFile("文件路径").FlatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey((x,y)=>x+y)
//将计算好的内容展示
sc.textFile("文件路径").FlatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey((x,y)=>x+y).collect
Standaload模式
1.配置项:
slaves.template,spark-env.sh.template
1)首先将这两个文件改名
cp slaves.template slaves
cp spark-env.sh.template spark-env.sh
2)配置 slaves
所有的主机名
3)配置 spark-env.sh
SPARK MASTER_HOST=hadoop101
SPARK MASTER PORT=7077
4)分发spark
xsync spark
5)启动spark
sbin/start-all.sh
6)启动shell命令窗口
bin/spark-shell --master spark://zhblinux5:7077
7)运行官方案例
sc.textFile(“文件路径”).FlatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey((x,y)=>x+y).collect
配置高可用
1.将 spark-env.sh 重新配置
export JAVA_HOME=/home/hduser/software/jdk1.8.0_144
export SPARK_DAEMON_JAVA_OPTS="
-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=zhblinux5,zhblinux6,zhblinux7
-Dspark.deploy.zookeeper.dir=/spark"
1)启动zookeeper
2)启动spark
Yarn模式
概述:
spark 客户端直接连接Yarn,不需要额外构建Spark 集群。
有yarn-client和yarn-cluster两种模式,主要区别在于:Driver程序的运行节点。
yarn-client: Driver程序运行在客户端,适用于交互、调试,希望立即看到app的输出
yarn-cluster: Driver程序运行在由RM (ResourceManager)启动的AP(APPMaster)适用于生产环境。
配置:
1)在hadoop的yarn-site.xml文件中添加
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
YARN_CONF_DIR=/home/hduser/software/hadoop-2.7.2/etc/hadoop
运行官方案例
bin/spark-submit
–class org. apache.spark.examples.SparkPi
–master yarn
–deploy-mode client
./examples/jars/ spark-examples_2.11-2.1.1.jar
100
在idea中操作spark
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
</dependency>
def main(args: Array[String]): Unit = {
var conf = new SparkConf().setMaster("local").setAppName("666")
var sc = new SparkContext(conf)
sc.textFile(args(0)).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile(args(1))
sc.stop()
}
<manifest>
<mainClass>Wordcount</mainClass>
</manifest>
RDD
1.什么是RDD
RDD叫做弹性分布式数据集,是Spark中最基础的数据抽象,是一个弹性的,不可变的,可分区的,元素可以计算的集合
2.RDD的属性
1.一组分区(Partition),即数据集的基本组成单位;
2.一个计算每个分区的函数;
3.RDD之间的依赖关系;
4.一个Partitioner,即 RDD的分片函数;
5.一个列表,存储存取每个Partition的优先位置( preferred location)
3.RDD的特点
只读
RDD不能对本身进行改动,只能通过转换算子得到一个新的RDD
弹性
存储弹性:内存和磁盘的自动切换
容错弹性:数据丢失可以自动恢复
计算弹性:计算出错重试机制
分片弹性:可根据需求重新分片
依赖
RDD中分有宽依赖和窄依赖,
窄依赖
窄依赖指父RDD中的分区和子RDD中的分区是一一对应的,
宽依赖
宽依赖指父RDD中的分区对应多个子RDD中的分区
4.RDD基本原理
1.RDD读入外部数据源进行创建
2.经过转换操作,每一次都会产生不同的RDD。供下一次转换操作使用
3.最后RDD通过动作操作进行转换,输出到外部数据源
这一系列操作称为Lineage,即DAG拓扑排序结果
优点:惰性调用,管道化
5.为什么采用RDD能高效计算
1.高效的容错性
RDD通过血缘关系可重新计算丢失的分区,无需回滚
2,中间结果持久化到内存,避免不必要的磁盘开销
3.存放数据可以是java对象,避免了序列化和反序列化
6.shuffer操作
只要有shuffer操作就一定有落磁盘的操作
在RDD中有宽依赖就有shuffer操作
7.RDD的阶段划分
在DAG中进行反向解析,遇到宽依赖就断开
遇到窄依赖就将当前的RDD放入Stage中
将窄依赖尽量放发哦同一个Stage 中实现流水线计算
RDD的操作
1.首先启动Spark 的shell窗口
spark-shell
1.创建RDD
1)从文件系统加载数据创建RDD
sc.textFile(“文件路径”)
文件路径:
本地:File:///home/hduser/software/a.txt
hdfs:hdfs://linux1:9000/user/hduser/a.txt
2)通过并行集合创建RDD
sc.parallelize(List(1,2,1,2,1))
sc.mkRDD(List(1,2,1,2))
2.RDD转换算子
1)filter()
筛选出满足函数的元素返回一个新的数组
//判断res17中的元素是否包含hello如果包含将所有包含的项重新组合成一个新的数组
res17.filter(x=>x.contains(“hello”))
2)map()
res1.map(x=>x+1)
3)flatMap()
res2.flatMap(x=>x.split(" "))
4)groupByKey()
var list = List((“hello”,1),(“hello”,1))
list.groupByKey()
返回:(“hello”,(1,1))
5)reduceByKey()
list.reduceByKey((a,b)=>a+b)
返回:(“hello”,2)
行动算子
1)collect
从一个RDD转换成一个数组
2)count()
返回有几个元素
3)first()
返回数据集中的第一个元素
4)take(n)
返回数据集中的前n个元素
5)reduce()
传入一个数组返回一个值
返回数组中的所有数的加和
a.reduce((a+b)=>a+b)
6)foreach()
输出所有元素
a.foreach(print(_))
a.foreach(x=>println(x+1))
a.foreach(x=>{println(x+1); println(x+2)})
分区
1.分区的作用
1)增加并行度
2.分区的原则
使分区的个数尽量接近cpu的核数
1.本地模式默认为本地的机器的cpu数目
2.YARN或Standlone:在集群中说要cpu核心数据总和和2取最大值为默认值
3.设置分区个数
设置分区数
sc.parallelize(List(1,2,3,4,5,2),1)
查看分区数
res1.partitions.size
4.分区个数修改
res1.repartition(2)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。