微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

最新版Spark 3 HelloWorld

Spark已经发布到3.1.1了,好久没看这个项目了.今天更新下本地仓库,编译下竟然出错了.

@H_502_2@$mvn compile ...... [ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/SparkContext.scala:403: type mismatch;  found   : Map[String,org.apache.spark.resource.Resource@R_314_4045@ion]  required: scala.collection.immutable.Map[String,org.apache.spark.Resource@R_314_4045@ion] [ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/SparkContext.scala:404: type mismatch;  found   : scala.collection.immutable.Map[String,org.apache.spark.Resource@R_314_4045@ion]  required: Map[String,org.apache.spark.resource.Resource@R_314_4045@ion] [ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/SparkContext.scala:554: overloaded method value apply with alternatives:   (env: org.apache.spark.SparkEnv,resources: java.util.Map[String,org.apache.spark.resource.Resource@R_314_4045@ion])Option[org.apache.spark.internal.plugin.PluginContainer] <and>   (sc: org.apache.spark.SparkContext,resources: java.util.Map[String,org.apache.spark.resource.Resource@R_314_4045@ion])Option[org.apache.spark.internal.plugin.PluginContainer]  cannot be applied to (org.apache.spark.SparkContext, java.util.Map[String,org.apache.spark.Resource@R_314_4045@ion]) [ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala:118: type mismatch;  found   : java.util.Map[String,org.apache.spark.Resource@R_314_4045@ion]  required: java.util.Map[String,org.apache.spark.resource.Resource@R_314_4045@ion] [INFO] [Info] : java.util.Map[String,org.apache.spark.Resource@R_314_4045@ion] <: java.util.Map[String,org.apache.spark.resource.Resource@R_314_4045@ion]? [INFO] [Info] : false [ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:308: too many arguments (6) for method resolveMavendependencies: (packagesExclusions: String, packages: String, repositories: String, ivyRepoPath: String, ivySettingsPath: Option[String])String Note that 'packagesTransitive' is not a parameter name of the invoked method. [ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:377: not enough arguments for method downloadFile: (path: String, targetDir: java.io.File, sparkConf: org.apache.spark.SparkConf, hadoopConf: org.apache.hadoop.conf.Configuration, secMgr: org.apache.spark.SecurityManager)String. Unspecified value parameter secMgr. [ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:380: not enough arguments for method downloadFileList: (fileList: String, targetDir: java.io.File, sparkConf: org.apache.spark.SparkConf, hadoopConf: org.apache.hadoop.conf.Configuration, secMgr: org.apache.spark.SecurityManager)String. Unspecified value parameter secMgr. [ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:383: not enough arguments for method downloadFileList: (fileList: String, targetDir: java.io.File, sparkConf: org.apache.spark.SparkConf, hadoopConf: org.apache.hadoop.conf.Configuration, secMgr: org.apache.spark.SecurityManager)String. Unspecified value parameter secMgr. [ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:392: not enough arguments for method downloadFileList: (fileList: String, targetDir: java.io.File, sparkConf: org.apache.spark.SparkConf, hadoopConf: org.apache.hadoop.conf.Configuration, secMgr: org.apache.spark.SecurityManager)String. Unspecified value parameter secMgr. [ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:396: not enough arguments for method downloadFileList: (fileList: String, targetDir: java.io.File, sparkConf: org.apache.spark.SparkConf, hadoopConf: org.apache.hadoop.conf.Configuration, secMgr: org.apache.spark.SecurityManager)String. Unspecified value parameter secMgr. [ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:450: not enough arguments for method downloadFile: (path: String, targetDir: java.io.File, sparkConf: org.apache.spark.SparkConf, hadoopConf: org.apache.hadoop.conf.Configuration, secMgr: org.apache.spark.SecurityManager)String. Unspecified value parameter secMgr. [ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/scheduler/Task.scala:101: type mismatch;  found   : Map[String,org.apache.spark.resource.Resource@R_314_4045@ion]  required: Map[String,org.apache.spark.Resource@R_314_4045@ion] [INFO] [Info] : Map[String,org.apache.spark.resource.Resource@R_314_4045@ion] <: Map[String,org.apache.spark.Resource@R_314_4045@ion]? [INFO] [Info] : false [ERROR] 12 errors found ......

赶紧看了项目README,原来本地环境已经不能满足新Spark需要了,Spark项目也打包了相应的编译工具,按照文档编译就好了.

如果先前没下载过这个项目可以使用git clone先把项目下载到本地:

@H_502_2@git clone https://github.com/apache/spark.git


进入项目目录并编译:

@H_502_2@$cd spark $./build/mvn -DskipTests clean package ...... [INFO] Reactor Summary for Spark Project Parent POM 3.2.0-SNAPSHOT: [INFO]  [INFO] Spark Project Parent POM ........................... SUCCESS [  2.562 s] [INFO] Spark Project Tags ................................. SUCCESS [  5.148 s] [INFO] Spark Project Sketch ............................... SUCCESS [  5.963 s] [INFO] Spark Project Local DB ............................. SUCCESS [  1.505 s] [INFO] Spark Project Networking ........................... SUCCESS [  2.883 s] [INFO] Spark Project Shuffle Streaming Service ............ SUCCESS [  1.516 s] [INFO] Spark Project Unsafe ............................... SUCCESS [  7.137 s] [INFO] Spark Project Launcher ............................. SUCCESS [  1.516 s] [INFO] Spark Project Core ................................. SUCCESS [01:55 min] [INFO] Spark Project ML Local Library ..................... SUCCESS [ 36.128 s] [INFO] Spark Project GraphX ............................... SUCCESS [ 30.925 s] [INFO] Spark Project Streaming ............................ SUCCESS [ 53.579 s] [INFO] Spark Project Catalyst ............................. SUCCESS [03:50 min] [INFO] Spark Project sql .................................. SUCCESS [07:58 min] [INFO] Spark Project ML Library ........................... SUCCESS [02:42 min] [INFO] Spark Project Tools ................................ SUCCESS [ 13.733 s] [INFO] Spark Project Hive ................................. SUCCESS [04:52 min] [INFO] Spark Project REPL ................................. SUCCESS [ 34.085 s] [INFO] Spark Project Assembly ............................. SUCCESS [  8.368 s] [INFO] Kafka 0.10+ Token Provider for Streaming ........... SUCCESS [01:06 min] [INFO] Spark Integration for Kafka 0.10 ................... SUCCESS [02:08 min] [INFO] Kafka 0.10+ Source for Structured Streaming ........ SUCCESS [01:24 min] [INFO] Spark Project Examples ............................. SUCCESS [01:01 min] [INFO] Spark Integration for Kafka 0.10 Assembly .......... SUCCESS [ 10.397 s] [INFO] Spark Avro ......................................... SUCCESS [01:12 min] [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time:  31:51 min [INFO] Finished at: 2021-03-13T17:28:05+08:00 [INFO] ------------------------------------------------------------------------

从编译结果看Spark3项目结构还是变化挺大的,像Tags/Sketch/LocalDB/这些原来都没见过.具体每个子项目是做什么的,我们以后再一一介绍.接下来先运行下HelloWorld.试用Spark最简单的方式是使用scala shell:

@H_502_2@tianlang@tianlang:spark$ ./bin/spark-shell  2021-03-14 08:51:53,351 WARN util.Utils: Your hostname, tianlang resolves to a loopback address: 127.0.0.1; using 192.168.0.104 instead (on interface wlp7s0) 2021-03-14 08:51:53,352 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 2021-03-14 08:52:01,141 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Spark context Web UI available at http://192.168.0.104:4040 Spark context available as 'sc' (master = local[*], app id = local-1615683123916). Spark session available as 'spark'. Welcome to       ____              __      / __/__  ___ _____/ /__     _\ \/ _ \/ _ `/ __/  '_/    /___/ .__/\_,_/_/ /_/\_\   version 3.2.0-SNAPSHOT       /_/           Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_201) Type in expressions to have them evaluated. Type :help for more @R_314_4045@ion. scala> spark.range(1000*1000*1000).count(); res0: Long = 1000000000

spark.range(1000*1000*1000)是生成一个从0(包含)到10亿(不包含)的序列,可看成是一个只有一列的表t_range:

1
2
......
999999998
999999999


count()是计算下数据条数,有一条算一条总共是10亿条.相当于sql的select count(*) from  t_range. sql的count函数容易跟sql的sum函数弄混,sum是把所有的数据加起来也就是取序列中所有数值的和:0+1+2+3+......+999999998+9999999999;而count是统计下序列中的数据条数有一条算一条,不管具体数值是1还是999999998:1+1+1+......+1+1;

在SparkShell中不能直接调用sum对序列中的数值求和:

@H_502_2@scala> spark.range(1000*1000*1000).sum(); <console>:24: error: value sum is not a member of org.apache.spark.sql.Dataset[Long]        spark.range(1000*1000*1000).sum();

因为spark.range生成的Dataset对象没有sum函数.那怎么实现求和操作呢?

可以查下Spark API文档,看下Dataset有那些函数可以使用:

首先看到的是跟sum长的比较像的summary.看下它的介绍如果英文看不懂也没关系,可以看示例代码.发现它可以用来统计总条数/平均数/最大值/最小值等就是没有取和.看来不是名字像功能就一样啊.

如果先前接触过大数据,应该听过MapReduce.那应该就比较容易找到reduce函数:

defreduce(func: (T, T) ⇒ T): T

(Scala-specific) Reduces the elements of this Dataset using the specified binary function. The given func must be commutative and associative or the result may be non-deterministic.

用reduce取和:

@H_502_2@scala> spark.range(1000*1000*1000).reduce((a,b) => a+b) <console>:24: error: overloaded method value reduce with alternatives:   (func: org.apache.spark.api.java.function.ReduceFunction[java.lang.Long])java.lang.Long <and>   (func: (java.lang.Long, java.lang.Long) => java.lang.Long)java.lang.Long  cannot be applied to ((java.lang.Long, java.lang.Long) => scala.Long)        spark.range(1000*1000*1000).reduce((a,b) => a+b)

为reduce提供一个取和的函数还报错了,从错误信息可以看出是数据类型不匹配问题,来个强制类型转换吧:

scala> spark.range(1000*1000*1000).reduce((a,b) => (a+b).asInstanceOf[java.lang.Long])
res11: Long = 499999999500000000  

当然也可以使用其它的方式实现取和,比如:foreach,但执行方式跟reduce是有差别的,我们后面有机会再说. 

大家应该也感觉到了,使用reduce函数远没有sql中的sum函数方便.sql中的函数用现在比较流行的词叫声明式的API,只需要关注我要什么就可以了,而不需要像reduce一样还要我关注怎么干.

这也是sql经久不衰的一个原因吧.Spark也很早就提供了Spark sql模块用于支持sql语法.可以回头看下我们先前使用的Dataset就是sql包下的:

scala> spark.range(1000*1000*1000);
res2: org.apache.spark.sql.Dataset[Long] = [id: bigint]

我们先前也说可以把range结果类比成一个只有一列的表,也不是随便说说的.还真的可以在上面执行sql语句:

首先把Dataset注册为临时视图(也可以叫临时表,但注册临时表的API在2.0.0后就标记为废弃了):

@H_502_2@scala> spark.range(1000*1000*1000).createOrReplaceTempView("t_range");

接下来就可以对视图t_range执行sql了:


scala> spark.sql("select sum(id) from t_range");
res18: org.apache.spark.sql.DataFrame = [sum(id): bigint]

scala> res18.collect
res19: Array[org.apache.spark.sql.Row] = Array([499999999500000000])

我是怎么知道列名称id的?是通过printSchema函数.

scala> spark.range(1000*1000*1000).printSchema();
root
 |-- id: long (nullable = false)

上面的代码都是使用的Scala,如果更倾向于使用Python.也可以使用./bin/pyspark.Spark3对Python的支持也提到了一个新高度.

HelloWorld就先到这里吧.蚂蚁啃骨头一点一点来.


版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐