微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

spark问题总结

spark

Executor内存总体布局

认情况下,Executor不开启堆外内存,因此整个 Executor 端内存布局如下图所示:

整个Executor内存区域分为两块:

1、JVM堆外内存

大小由 @H_502[email protected] 参数指定。认大小为 @H_502_11@executorMemory * 0.10, with minimum of 384m。

此部分内存主要用于JVM自身,字符串, NIO Buffer(Driect Buffer)等开销。此部分为用户代码及Spark 不可操作的内存,不足时可通过调整参数解决

The amount of off-heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%).

2、堆内内存(ExecutorMemory)

大小由 Spark 应用程序启动时的 @H_502_11@–executor-memory 或 @H_502[email protected] 参数配置,即JVM最大分配的堆内存 (@H_502_11@-Xmx)。Spark为了更高效的使用这部分内存,对这部分内存进行了逻辑上的划分管理。我们在下面的统一内存管理会详细介绍。

Spark 1.6之后引入了统一内存管理,包括了堆内内存 (On-heap Memory) 和堆外内存 (Off-heap Memory) 两大区域,下面对这两块区域进行详细的说明。

 

  1. 执行内存 (Execution Memory) : 主要用于存放 Shuffle、Join、Sort、Aggregation 等计算过程中的临时数据;

  2. 存储内存 (Storage Memory) : 主要用于存储 spark 的 cache 数据,例如RDD的缓存、unroll数据;

  3. 用户内存(User Memory): 主要用于存储 RDD 转换操作所需要的数据,例如 RDD 依赖等信息;

  4. 预留内存(Reserved Memory): 系统预留内存,会用来存储Spark内部对象。

  5.  

1. sparksql 中的limit 会导致原本并发处理的任务转换成1个task去处理,如果limit后跟的数很大会导致任务阻塞.

加了 limit 所以一个stage转换成 两个各只有1task 的stage。不加limit的话 就只有一个3k的stage。 sql语句中加了limit的stage:

 

 

不加limit的stage:

 

 

2. hive中mapjoin spark 中broadcastHashJoin

当两个表需要join时,如果一个是大表,一个是小表,正常的map-reduce流程需要shuffle,这会导致大表数据在节点间网络传输,常见的优化方式是将小表读到内存中并广播到大表处理,避免shuffle+reduce;

在hive中叫mapjoin(map-side join),配置为 hive.auto.convert.join

在spark中叫broadcastHashJoin (broadcast hash join)

常见问题

一.org.apache.spark.shuffle.FetchFailedException

这种问题一般发生在有大量shuffle操作的时候,task不断的Failed,然后又重执行,一直循环下去,非常的耗时。

(1) missing output location

  1. org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0

(2) shuffle fetch faild

  1. org.apache.spark.shuffle.FetchFailedException: Failed to connect to spark047215/192.168.47.215:50268

3.解决方

一般遇到这种问题提高executor内存即可,同时增加每个executor的cpu,这样不会减少task并行度。

  • spark.executor.memory 15G

  • spark.executor.cores 3

  • spark.cores.max 21

dataframe Demo

# -*- coding: utf-8 -*-
​
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import *
from pyspark.sql import SparkSession
​
spark = SparkSession \
    .builder \
    .appName("df_test") \
    .enableHiveSupport() \
    .getorCreate()
​
sc = spark.sparkContext
​
# 使用自动类型推断的方式创建dataframe
data = [(123, "Katie", 19, "brown"),
        (234, "Michael", 22, "green"),
        (345, "Simone", 23, "blue")]
df = spark.createDataFrame(data, schema=['id', 'name', 'age', 'eyccolor'])
df.show()
ia_df = df.select('id', 'age').rdd.map(lambda x: (x[0] + 1, x[1] + 2))
print(ia_df.collect())
df.count()

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐