
Spark broadcast join timeout 300

org.apache.spark.SparkException: Could not execute broadcast in 300 secs.

You can increase the timeout for broadcasts via spark.sql.broadcastTimeout or disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1

After an stt job got blocked, the logs showed that two tables had hit a broadcast timeout while being broadcast.

SQL logic:

create or replace temporary view table_tmp
as
select
	/*+ BROADCAST(A_tmp) */
	A_tmp.c1,
	A_tmp.c2,
	A_tmp.c3,
	...
from source_table A_tmp
join gdw_table.other_table1 B_tmp on ...
join gdw_table.other_table2 C_tmp on ...
group by 1,2,3,4,5,6,...11

(Note: a Spark hint must use the /*+ ... */ form; the original /** BROADCAST(A_tmp) */ is parsed as an ordinary comment and the hint is silently ignored.)

A quick check of the table sizes:

A_tmp: 238 rows.

B_tmp: 1,834,203,873 rows.

C_tmp: 289,371,375 rows.
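A back-of-the-envelope calculation shows why only A_tmp is a broadcast candidate under the default 10 MB spark.sql.autoBroadcastJoinThreshold. The ~100 bytes/row figure below is an assumption for illustration, not a measured value from this job:

```python
# Rough estimate of which tables fit under the default broadcast threshold.
BYTES_PER_ROW = 100           # assumed average row width (illustrative only)
THRESHOLD = 10 * 1024 * 1024  # spark.sql.autoBroadcastJoinThreshold default: 10 MB

row_counts = {
    "A_tmp": 238,
    "B_tmp": 1_834_203_873,
    "C_tmp": 289_371_375,
}

broadcastable = {
    name: rows * BYTES_PER_ROW <= THRESHOLD
    for name, rows in row_counts.items()
}

for name, fits in broadcastable.items():
    print(f"{name}: broadcastable={fits}")
```

Even under this generous estimate, B_tmp and C_tmp are several orders of magnitude over the threshold, so only the 238-row A_tmp should ever be broadcast.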

Relevant properties (name, default, meaning):

spark.sql.files.maxPartitionBytes (default 134217728 = 128 MB): The maximum number of bytes to pack into a single partition when reading files.

spark.sql.files.openCostInBytes (default 4194304 = 4 MB): The estimated cost to open a file, measured by the number of bytes that could be scanned in the same time. This is used when packing multiple files into a partition. It is better to over-estimate: partitions with small files will then be faster than partitions with bigger files (which are scheduled first).

spark.sql.broadcastTimeout (default 300): Timeout in seconds for the broadcast wait time in broadcast joins.

spark.sql.autoBroadcastJoinThreshold (default 10485760 = 10 MB): The maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. Setting this value to -1 disables broadcasting. Note that currently statistics are only supported for Hive metastore tables where ANALYZE TABLE COMPUTE STATISTICS noscan has been run.

spark.sql.shuffle.partitions (default 200): The number of partitions to use when shuffling data for joins or aggregations.
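The autoBroadcastJoinThreshold rule, including its -1 sentinel, can be modeled with a tiny helper. This is a simplified sketch of the planner's decision, not Spark's actual code:

```python
def would_broadcast(estimated_size_bytes: int,
                    threshold: int = 10 * 1024 * 1024) -> bool:
    """Simplified model of spark.sql.autoBroadcastJoinThreshold:
    a table is auto-broadcast only when its estimated size does not
    exceed the threshold; a threshold of -1 disables broadcasting."""
    if threshold < 0:
        return False
    return estimated_size_bytes <= threshold

print(would_broadcast(5 * 1024 * 1024))          # small table: eligible
print(would_broadcast(50 * 1024 * 1024))         # too big: not eligible
print(would_broadcast(1024, threshold=-1))       # -1 disables broadcast
```

This is why solution 3 below (setting the threshold to -1) forces the planner away from broadcast joins regardless of table size.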

Likely root cause:

The job runs around 1 AM US time, when a large share of cluster resources is already occupied. Under that resource contention, the broadcast could not complete and respond within the configured time limit.

Solutions:

  1. Increase the timeout: set spark.sql.broadcastTimeout = 600.
  2. persist() the table to be broadcast; persisting it can turn the BroadcastHashJoin into a ShuffleHashJoin.
  3. Disable broadcast joins: set spark.sql.autoBroadcastJoinThreshold = -1.
  4. Rerun the job.
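Solutions 1 and 3 can be applied per session before running the query. A minimal sketch, assuming a live PySpark session bound to a variable named `spark` (e.g. the pyspark REPL):

```python
# Sketch only: `spark` is assumed to be an existing SparkSession.
# Solution 1: raise the broadcast wait timeout from 300 s to 600 s.
spark.conf.set("spark.sql.broadcastTimeout", "600")

# Solution 3: disable broadcast joins entirely so the planner
# falls back to a shuffle-based join.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
```

The same settings can equally be passed at submit time via --conf flags.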

The job was rerun at a different time of day and completed successfully.

