
Why does this Spark code throw java.io.NotSerializableException?

I want to access a companion object's method from inside a transformation on an RDD. Why does the following not work:

import org.apache.spark.rdd.RDD
import spark.implicits._
import org.apache.spark.sql.{Encoder, Encoders}

class Abc {
    def transform(x: RDD[Int]): RDD[Double] = { x.map(Abc.fn) }
}

object Abc {
  def fn(x: Int): Double = { x.toDouble }
}

implicit def abcEncoder: Encoder[Abc] = Encoders.kryo[Abc]

new Abc().transform(sc.parallelize(1 to 10)).collect

The code above throws a java.io.NotSerializableException:

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.map(RDD.scala:369)
  at Abc.transform(<console>:19)
  ... 47 elided
Caused by: java.io.NotSerializableException: Abc
Serialization stack:
        - object not serializable (class: Abc, value: Abc@4f598dfb)
        - field (class: Abc$$anonfun$transform$1, name: $outer, type: class Abc)
        - object (class Abc$$anonfun$transform$1, <function1>)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
  ... 57 more

Even defining an Encoder for class Abc does not help. But the more important question is: why is serialization of an instance of class Abc attempted at all? My first thought was that the companion object is the class's singleton, so perhaps Spark tries to serialize that. But that does not seem to be the case, because when I call Abc.fn from another class:

class Xyz {
    def transform(x: RDD[Int]): RDD[Double] = { x.map(Abc.fn) }
}

implicit def xyzEncoder: Encoder[Xyz] = Encoders.kryo[Xyz]

new Xyz().transform(sc.parallelize(1 to 10)).collect

I get a java.io.NotSerializableException: Xyz

Solution:

This article discusses "serializable" vs. "non-serializable" objects in Apache Spark:

Using Non-Serializable Objects in Apache Spark, Nicola Ferraro

The article explains:

> what is happening in your particular case
> some alternatives so that your object does not need to be "serializable"
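
Two notes that may help when reading the article against the traces above. First, the serialization stack in the question already names the culprit: the compiled closure Abc$$anonfun$transform$1 carries an $outer field of type Abc, so Spark ends up trying to serialize the enclosing instance (Abc, or Xyz in the second example) along with the task. Second, the Kryo Encoder cannot help here: Encoders govern how Dataset/DataFrame rows are encoded, whereas task closures go through Spark's closure serializer (note the JavaSerializationStream frames in the trace).

As a rough illustration of the kind of alternatives the article means, here is a minimal sketch (my code, not the article's; the names AbcSerializable and AbcOps are invented, and sc is the usual spark-shell SparkContext):

import org.apache.spark.rdd.RDD

// Alternative 1: make the enclosing class Serializable, so the captured
// $outer reference can legitimately be shipped with the task.
class AbcSerializable extends Serializable {
  def transform(x: RDD[Int]): RDD[Double] = x.map(Abc.fn)
}

// Alternative 2: avoid a class instance altogether and keep the logic in a
// serializable object, so there is no non-serializable $outer to capture.
object AbcOps extends Serializable {
  def fn(x: Int): Double = x.toDouble
  def transform(x: RDD[Int]): RDD[Double] = x.map(fn)
}

new AbcSerializable().transform(sc.parallelize(1 to 10)).collect
AbcOps.transform(sc.parallelize(1 to 10)).collect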
