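On a Spark cluster, I read a CSV file into a DataFrame and needed to transform one of its columns. The built-in functions were not enough for this, so I wrote a custom UDF, and running the job failed with the following exception: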
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:850)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:849)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:630)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@7334dcbe)
    - field (class: algorithm.config.Base, name: sc, type: class org.apache.spark.SparkContext)
    - object (class algorithm.config.Base, algorithm.config.Base@4a6c33a0)
    - field (class: algorithm.config.Base$$anonfun$save_data_parquet$1, name: $outer, type: class algorithm.config.Base)
    - object (class algorithm.config.Base$$anonfun$save_data_parquet$1, <function1>)
    - field (class: algorithm.config.Base$$anonfun$save_data_parquet$1$$anonfun$2, name: $outer, type: class algorithm.config.Base$$anonfun$save_data_parquet$1)
    - object (class algorithm.config.Base$$anonfun$save_data_parquet$1$$anonfun$2, <function1>)
    - element of array (index: 25)
    - array (class [Ljava.lang.Object;, size 26)
After searching on Baidu for quite a while, here is what I concluded:
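When you run a transformation (map, flatMap, filter and so on), the function you pass in goes through the following steps:
1. It is serialized on the driver node.
2. It is shipped to the appropriate nodes in the cluster.
3. It is deserialized on each node.
4. It is finally executed on the node.
A custom UDF is handled the same way. If the UDF refers to a member of the class it is defined in, Spark cannot serialize the function on its own, so it tries to serialize the whole enclosing instance in order to run the function in another JVM. The serialization stack in the exception shows this chain: the anonymous function inside save_data_parquet holds an $outer reference to algorithm.config.Base, and Base has a field sc of type SparkContext, which is not serializable. That is why the exception is thrown.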
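The capture described above can be reproduced without Spark. The sketch below is only an illustration under stated assumptions: FakeContext is a hypothetical stand-in for SparkContext, Base is a simplified analogue of algorithm.config.Base (not the original class), and serializationError imitates the check with plain Java serialization, which is roughly what happens behind ClosureCleaner.ensureSerializable.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for SparkContext: deliberately NOT Serializable.
class FakeContext

// Simplified analogue of algorithm.config.Base from the stack trace.
class Base {
  val sc = new FakeContext   // non-serializable field, like `sc` in the trace
  val suffix = "_clean"      // an ordinary member used by the function

  // The body reads `suffix`, i.e. `this.suffix`, so the function keeps a
  // reference to the enclosing Base instance.
  def makeTransform: String => String = (s: String) => s + suffix
}

object ClosureCapture {
  // Try to Java-serialize an object. Returns Some(message) when something in
  // the object graph is not serializable, None when serialization succeeds.
  def serializationError(obj: AnyRef): Option[String] = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); None }
    catch { case e: NotSerializableException => Some(e.getMessage) }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    // Serializing the function drags in the enclosing instance, which fails.
    println(serializationError(new Base().makeTransform))
  }
}
```

In this sketch the failure is reported for the enclosing class itself, because it does not extend Serializable. In the original trace Base had apparently got past that point, and the failure surfaced one level deeper, on its sc field.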
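Solution:
1. Add @transient above val sc = spark.sparkContext, so that the field is skipped during serialization.
2. Make the class extend Serializable.
The two changes go together: without @transient the sc field still cannot be serialized, and without Serializable the enclosing class itself is rejected.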
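A minimal sketch of the two changes applied together, again without Spark. FakeContext and FixedBase are hypothetical names used only for illustration, and roundTrip imitates the driver-to-executor trip with plain Java serialization; none of this is the original author's code.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for SparkContext: deliberately NOT Serializable.
class FakeContext

// Both changes applied: the class extends Serializable, and the context
// field is marked @transient so Java serialization skips it.
class FixedBase extends Serializable {
  @transient val sc = new FakeContext
  val suffix = "_clean"
  def makeTransform: String => String = (s: String) => s + suffix
}

object ClosureFix {
  // Serialize and then deserialize an object, imitating the trip from the
  // driver to an executor.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val copy = roundTrip(new FixedBase)
    println(copy.sc == null)   // the transient field is not restored
    println(copy.suffix)       // ordinary fields survive the trip
    // The function now serializes, because its enclosing instance does.
    println(roundTrip(new FixedBase().makeTransform)("a"))
  }
}
```

One consequence worth keeping in mind: a @transient field comes back as null after deserialization, so the function itself must not touch sc when it runs on a worker node.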