工作当中几乎全用Sparksql ,RDD用的很少(面试多) Sparksql误区
Spark sql is Apache Spark’s module for working with structured data. 不要把Sparksql认为就是处理sql的 或者认为就是写sql SparkSQL
误区: 1)Spark sql是处理结构化数据 并不是仅仅能够处理sql sql仅仅是Spark sql这个模块的一小部分应用 API/ExtDS 2)Uniform Data Access 外部数据源(*****) Spark sql是能够处理多种不同的数据源的数据 text、json、parquet、orc、hive、jdbc 数据的格式 HDFS/S3(a/n)/OSS/COS 数据的存储系统 不同的数据格式压缩的不压缩的 sparksql都是兼容的 你访问不同的数据源Sparksql都是用统一的访问方式 这就是外部数据源 Sparksql能面试的东西 就是两个 : DataFrame 、 外部数据源、catelist 2.能集成Hive 你的数仓以前是基于Hive来做的 都是Hive的脚本 现在 如果想使用Sparksql访问Hive的数据 Sparksql能连接到metastore才可以 (把Hive-site.xml 拷贝到Sparkconf目录下就可以了) 因为metastore 是 on Hadoop的核心所在 所以你要把Hive迁移到Spark上来 成本是很低的
3.Standard Connectivity Hive能通过HiveServer2提供一个服务 大家去查,那么 spark里面有个thriftServer 他们底层都是用thrift协议的
误区3: MR==>Hive==> Hive底层当时是MR 慢 所以出来Spark Spark==> AMPLab Shark(为了将Hive sql跑在Spark上) 1.x 配套一个打了补丁的Hive Spark1.0 Shark不维护 ==> Spark sql 是在Spark里面的 ==> Hive on Spark 是在Hive里面的 是Hive的引擎是Spark 误区3) Hive on Spark不是Spark sql Hive刚开始时底层执行引擎只有一个:MR 后期:Tez Spark set hive.execution.engine=spark; 就可以 Hive on Spark Sparksql on Hive X
Hive On Spark
Time taken: 6.86 seconds, Fetched: 2 row(s) hive (default)> set hive.execution.engine; hive.execution.engine=mr hive (default)> set hive.execution.engine=spark; hive (default)> set hive.execution.engine; hive.execution.engine=spark hive (default)> show databases; OK database_name default homework Time taken: 0.008 seconds, Fetched: 2 row(s) hive (default)>
这个东西了解即可 Hive On Spark 真正生产上用的很少的 这个东西不是很成熟的
出来的时间: Spark sql 1.0 SchemaRDD ==> Table RDD(存数据) + schema = Table ==> DataFrame 1.2/3 由SchemaRDD 变为DataFrame 原因是 更加 OO ==> Dataset 1.6 由DataFrame 变为Dataset 因为 compile-time type safety
DataFrame A Dataset is a distributed collection of data. A DataFrame is a Dataset organized into named columns. DataFrame = Dataset[Row] In Scala and Java, a DataFrame is represented by a Dataset of Rows.
DataFrame : 1.named columns 就是一个表 包含 列的名字 + 列的类型 Row : 可以理解为 一行数据 没有scheme的 SparkSession是Spark编程的入口点
SparkSession: /** * Executes a sql query using Spark, returning the result as a `DataFrame`. * The dialect that is used for sql parsing can be configured with 'spark.sql.dialect'. * * @since 2.0.0 */ def sql(sqlText: String): DataFrame = { Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText)) } 注意: 1. returning the result as a `DataFrame` Dataset: /** * displays the top 20 rows of Dataset in a tabular form. * * @param truncate Whether truncate long strings. If true, strings more than 20 characters will * be truncated and all cells will be aligned right * * @group action * @since 1.6.0 */ def show(truncate: Boolean): Unit = show(20, truncate)
scala> spark.sql("show tables").show +--------+---------+-----------+ |database|tableName|istemporary| +--------+---------+-----------+ | default| student| false| +--------+---------+-----------+ scala> 注意: 启动spark-shell的时候 指定MysqL驱动 个人建议使用 --jars 指定MysqL驱动 不建议把MysqL驱动 直接丢在Spark jar路径里
查看Hive里元数据: MysqL> select * from DBS; +-------+-----------------------+-------------------------------------------------------+----------+--------------+------------+ | DB_ID | DESC | DB_LOCATION_URI | NAME | OWNER_NAME | OWNER_TYPE | +-------+-----------------------+-------------------------------------------------------+----------+--------------+------------+ | 1 | Default Hive database | hdfs://hadoop101:8020/user/hive/warehouse | default | public | ROLE | | 6 | NULL | hdfs://hadoop101:8020/user/hive/warehouse/homework.db | homework | double_happy | USER | +-------+-----------------------+-------------------------------------------------------+----------+--------------+------------+ 2 rows in set (0.00 sec) MysqL> select * from TBLS; +--------+-------------+-------+------------------+--------------+-----------+-------+-----------------------------------+----------------+--------------------+--------------------+ | TBL_ID | CREATE_TIME | DB_ID | LAST_ACCESS_TIME | OWNER | RETENTION | SD_ID | TBL_NAME | TBL_TYPE | VIEW_EXPANDED_TEXT | VIEW_ORIGINAL_TEXT | +--------+-------------+-------+------------------+--------------+-----------+-------+-----------------------------------+----------------+--------------------+--------------------+ | 1 | 1568615059 | 1 | 0 | double_happy | 0 | 1 | student | MANAGED_TABLE | NULL | NULL | | 8 | 1568616039 | 6 | 0 | double_happy | 0 | 8 | ods_domain_traffic_info | EXTERNAL_TABLE | NULL | NULL | | 9 | 1568620410 | 6 | 0 | double_happy | 0 | 9 | ods_uid_pid_info | EXTERNAL_TABLE | NULL | NULL | | 17 | 1568860945 | 6 | 0 | double_happy | 0 | 17 | jf_tmp | MANAGED_TABLE | NULL | NULL | | 21 | 1569056727 | 6 | 0 | double_happy | 0 | 21 | access_wide | EXTERNAL_TABLE | NULL | NULL | | 26 | 1569209493 | 6 | 0 | double_happy | 0 | 31 | ods_uid_pid_info_compression_test | EXTERNAL_TABLE | NULL | NULL | | 27 | 1569209946 | 6 | 0 | double_happy | 0 | 32 | ods_uid_pid_compression_info | MANAGED_TABLE | NULL | NULL | | 31 | 1569224142 | 6 | 0 | double_happy | 0 | 36 | dwd_platform_stat_info | MANAGED_TABLE | NULL | NULL | | 53 | 1570957119 | 6 | 0 | double_happy | 0 | 63 | ods_log_info | EXTERNAL_TABLE | NULL | NULL | +--------+-------------+-------+------------------+--------------+-----------+-------+-----------------------------------+----------------+--------------------+--------------------+ 9 rows in set (0.00 sec) MysqL> spark-shell查询Hive里的表: scala> spark.sql("select * from homework.dwd_platform_stat_info").show +--------+---+--------+--------+ |platform|cnt| d| day| +--------+---+--------+--------+ | Andriod|658|20190921|20190921| | Symbain|683|20190921|20190921| | linux|639|20190921|20190921| | mac|652|20190921|20190921| | windows|640|20190921|20190921| +--------+---+--------+--------+ scala>
使用sparksql 在spark-shell交互 还得写 spark.sql 在spark里 有个 spark-sql 用法和 spark-shell 是一样的
sparksql编程
1.SparkSession构建 object SparkSessionApp { def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .master("local") .appName("SparkSessionApp") .getorCreate() spark.stop() } } 当然 你spark一些参数如何传进去呢? 提供config传进去 eg : 你要设置多少个分区呀 等
1.读文本数据 object SparkSessionApp { def text(spark: SparkSession) = { import spark.implicits._ val df: DataFrame = spark.read.format("text").load("file:///C:/IdeaProjects/spark/data/data.txt") df.show() } def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .master("local") .appName("SparkSessionApp") .getorCreate() text(spark) spark.stop() } } 结果: +---------------+ | value| +---------------+ |double_happy,25| | Kairis,25| | Kite,32| +---------------+ 1. 但是有一个问题 读取进来的数据 把所有内容 都放到 value这个列 里面去了 该怎么办? 2. 上面那种写法读进来的是DF
def text(spark: SparkSession) = { val ds: Dataset[String] = spark.read.textFile("file:///C:/IdeaProjects/spark/data/data.txt") ds.show() } 读进来的是DS 结果是一样的: +---------------+ | value| +---------------+ |double_happy,25| | Kairis,25| | Kite,32| +---------------+ /** * Loads text files and returns a [[Dataset]] of String. See the documentation on the * other overloaded `textFile()` method for more details. * @since 2.0.0 */ def textFile(path: String): Dataset[String] = { // This method ensures that calls that explicit need single argument works, see SPARK-16009 textFile(Seq(path): _*) } 可以传入多个路径的 textFile(Seq(path): _*)
看s9 待续。。。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。