微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

spark基础知识

 

 

执行机制

spark分成driver和executor。

driver:提交应用程序入口,main函数执行,进行job调度,dag构建以及调度task。对rdd操作。

executor:执行task,将结果汇总到driver。对rdd具体数据操作。

count,distinct,group by , join 会触发shuffle操作,相同key会拉到同一个节点。

 

 

 

常用函数

join函数

left semi join

left semi join 可以作为exists的高效版本使用,只允许出现左表的字段。

下面三段的代码等价:

left semi join版本

select t1.id
from t1 t1
left semi join t2 t2
on t1.id=t2.id

in 版本

select t1.id
from t1 t1 
where id in (select id from t2)

exists 版本

select t1.id
from t1 t1 
where id exists (select id from t2)

窗口函数

<窗口函数>  over ([partition by <分组列表>] order by <排序列表>)

专用窗口函数: rank(),dense_rank(),row_number()

聚合函数: sum,avg,count,max,min,

 rankdense_rankrow_number
86555
86556
86557
83868

 

sum(...) over() 对所有行求和

sum(...) over(order by) 连续求和

sum(...) over(partition by...) 对同组内所有行求和

sum(...) over(partition by...order by ...) 对同组内连续求和

 

数据倾斜

原因:

(1)key分布不均匀

(2)map端数据倾斜,输入文件多且大小不一

(3)reduce端倾斜

(4)有些sql语句本身倾斜

(5)业务本身特征问题

(6)字段类型int和string的join 【user表id为int,log表id有int有string,hash的过程会把int放在一个executor】

 

解决方案:

(1)选取join key分布均匀的表做驱动表,列裁剪和filter

(2)大小表join,使用map join把小表缓存进内存,map端reduce

(3)大表大表join:某一个空值变成字符串+随机数,倾斜数据分配到不同的reduce上。

(4)count distinct 【只有一个reducer】大量相同特征值 :1)空值单独处理 2) group by再union  3) sum() group by 替换 count distinct

(5)参数设置: hive.groupby.skwindata=true

 

scala常用

udf函数

import org.apache.spark.sql._
val df = Seq(('id1',1),('id2',4),('id3',5)).toDF('id','value')
val spark=df.sparkSession

方案一: spark.udf.register('SimpleUDF',(v:int)->v*v)

df.select($'id',callUDF('simpleUDF',$'value'))

方案二:实名注册UDF

spark.udf.register('isAdult',isAdult)

foreach和foreachPartition

作用:对partition中iterator实行迭代处理,通过function对iterator处理。

foreach:传入一个function,每次foreach得到一个k-v实例,function只在foreach使用。

foreachPartition:传入一个参数,一个partition对应iteratior,每个partition把iterator传入function,可以避免堆栈溢出。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐