任务不可序列化:java.io.NotSerializableException仅在类而不是对象上调用闭包外的函数时

scala serialization apache-spark typesafe

141686 观看

6回复

6116 作者的声誉

在闭包之外调用函数时会出现奇怪的行为:

  • 当函数在一个对象中时,一切正常
  • 当函数在类中时获取:

任务不可序列化:java.io.NotSerializableException:testing

问题是我需要在类中的代码而不是对象。知道为什么会这样吗?Scala对象是否已序列化(默认?)?

这是一个有效的代码示例:

object working extends App {
    val list = List(1,2,3)

    val rddList = Spark.ctx.parallelize(list)
    //calling function outside closure 
    val after = rddList.map(someFunc(_))

    def someFunc(a:Int)  = a+1

    after.collect().map(println(_))
}

这是一个非工作的例子:

object NOTworking extends App {
  new testing().doIT
}

//adding extends Serializable wont help
class testing {  
  val list = List(1,2,3)  
  val rddList = Spark.ctx.parallelize(list)

  def doIT =  {
    //again calling the fucntion someFunc 
    val after = rddList.map(someFunc(_))
    //this will crash (spark lazy)
    after.collect().map(println(_))
  }

  def someFunc(a:Int) = a+1
}
作者: Nimrod007 的来源 发布者: 2014 年 3 月 23 日

回应 (6)


298

8468 作者的声誉

决定

我不认为其他答案是完全正确的。RDD确实可以序列化,因此这不会导致您的任务失败。

Spark是一种分布式计算引擎,其主要抽象是弹性分布式数据集(RDD),可以将其视为分布式集合。基本上,RDD的元素在集群的节点之间进行分区,但Spark将其抽象远离用户,让用户与RDD(集合)进行交互,就好像它是本地的一样。

不要让太多细节,但是当你在一个RDD(运行不同的变换mapflatMapfilter等),您的转换代码包(closure)是:

  1. 在驱动程序节点上序列化,
  2. 运送到群集中的相应节点,
  3. 反序列化,
  4. 最后在节点上执行

您当然可以在本地运行(如您的示例所示),但所有这些阶段(除了通过网络传输)仍然会发生。[这使您可以在部署到生产之前捕获任何错误]

在第二种情况下发生的是您正在调用一个方法,该方法testing在map函数内部定义。Spark看到了这一点,并且因为方法无法自行序列化,所以Spark会尝试序列化整个 testing类,这样代码在另一个JVM中执行时仍然可以工作。你有两种可能性:

要么使类测试可序列化,所以Spark可以序列化整个类:

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
  new Test().doIT
}

class Test extends java.io.Serializable {
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  def someFunc(a: Int) = a + 1
}

或者你创建someFunc函数而不是方法(函数是Scala中的对象),以便Spark能够序列化它:

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
  new Test().doIT
}

class Test {
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  val someFunc = (a: Int) => a + 1
}

您可能会对类序列化的类似但不同的问题感兴趣,您可以在2013年Spark Summit演示中阅读它。

作为一个侧面说明,你可以重写rddList.map(someFunc(_))rddList.map(someFunc),他们是完全一样的。通常,第二种是优选的,因为它不那么冗长和清晰。

编辑(2015-03-15):SPARK-5307引入了SerializationDebugger,Spark 1.3.0是第一个使用它的版本。它添加了NotSerializableException的序列化路径。遇到NotSerializableException时,调试器访问对象图以查找无法序列化的对象的路径,并构造信息以帮助用户查找对象。

在OP的情况下,这是打印到stdout的内容:

Serialization stack:
    - object not serializable (class: testing, value: testing@2dfe2f00)
    - field (class: testing$$anonfun$1, name: $outer, type: class testing)
    - object (class testing$$anonfun$1, <function1>)
作者: Grega Kešpret 发布者: 23.03.2014 08:48

8

894 作者的声誉

我用不同的方法解决了这个问题。您只需要在通过闭包之前序列化对象,然后进行反序列化。即使您的类不是Serializable,这种方法也可以正常工作,因为它在幕后使用Kryo。你需要的只是一些咖喱。;)

这是我如何做到的一个例子:

def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
               (foo: Foo) : Bar = {
    kryoWrapper.value.apply(foo)
}
val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
rdd.flatMap(mapper).collectAsMap()

object Blah(abc: ABC) extends (Foo => Bar) {
    def apply(foo: Foo) : Bar = { //This is the real function }
}

随意使Blah像你想要的那样复杂,类,伴随对象,嵌套类,对多个第三方库的引用。

KryoSerializationWrapper指的是:https//github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala

作者: Nilesh 发布者: 14.04.2014 06:54

31

1001 作者的声誉

Grega的答案很好地解释了为什么原始代码不起作用以及解决问题的两种方法。但是,这个解决方案不是很灵活; 考虑你的闭包包含一个Serializable你无法控制的非类方法调用的情况。您既不能将Serializable标记添加到此类,也不能更改底层实现以将方法更改为函数。

Nilesh为此提供了一个很好的解决方法,但解决方案可以更加简洁和通用:

def genMapper[A, B](f: A => B): A => B = {
  val locker = com.twitter.chill.MeatLocker(f)
  x => locker.get.apply(x)
}

然后,此函数序列化程序可用于自动包装闭包和方法调用:

rdd map genMapper(someFunc)

这项技术的好处是不需要额外的Shark依赖项才能访问KryoSerializationWrapper,因为Twitter的Chill已经被核心Spark所吸引

作者: Ben Sidhom 发布者: 15.07.2014 10:39

25

20556 作者的声誉

完整的讲话充分解释了这个问题,提出了一个很好的范例转换方法来避免这些序列化问题:https//github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory- leaks-no-ws.md

最高投票的答案基本上是建议丢掉整个语言功能 - 不再使用方法而只使用功能。实际上,在函数式编程中应该避免类中的方法,但是将它们转换为函数并不能解决这里的设计问题(参见上面的链接)。

作为这种特殊情况的快速修复,您可以使用@transient注释告诉它不要尝试序列化有问题的值(这里Spark.ctx是一个自定义类,而不是Spark的命名后的Spark):

@transient
val rddList = Spark.ctx.parallelize(list)

您还可以重新构建代码,以便rddList存在于其他地方,但这也是令人讨厌的。

未来可能是孢子

在未来,Scala将包含这些被称为“孢子”的东西,这些东西应该允许我们精细控制粒子控制什么做和不完全被闭合拉入。此外,这应该将所有错误意外地将非可序列化类型(或任何不需要的值)引入编译错误,而不是现在这是可怕的运行时异常/内存泄漏。

http://docs.scala-lang.org/sips/pending/spores.html

关于Kryo序列化的一个提示

使用kyro时,请将其设置为必须注册,这意味着您会收到错误而不是内存泄漏:

“最后,我知道kryo有kryo.setRegistrationOptional(true)但是我很难找到如何使用它。当这个选项打开时,如果我没有注册,kryo似乎仍然会抛出异常类“。

用kryo注册类的策略

当然,这只能为您提供类型级控制而不是值级控制。

......更多想法来了。

作者: samthebest 发布者: 12.08.2014 05:33

7

1453 作者的声誉

我不完全确定这适用于Scala,但是在Java中,我NotSerializableException通过重构我的代码解决了这个问题,因此闭包不会访问不可序列化的final字段。

作者: Trebor Rude 发布者: 13.10.2014 06:14

7

1019 作者的声誉

我遇到了类似的问题,我从Grega的回答中得到的理解是

object NOTworking extends App {
 new testing().doIT
}
//adding extends Serializable wont help
class testing {

val list = List(1,2,3)

val rddList = Spark.ctx.parallelize(list)

def doIT =  {
  //again calling the fucntion someFunc 
  val after = rddList.map(someFunc(_))
  //this will crash (spark lazy)
  after.collect().map(println(_))
}

def someFunc(a:Int) = a+1

}

你的doIT方法正在尝试序列化someFunc(_)方法,但由于方法不可序列化,它会尝试序列化类测试,这又是不可序列化的。

所以让你的代码工作,你应该在doIT方法中定义someFunc。例如:

def doIT =  {
 def someFunc(a:Int) = a+1
  //function definition
 }
 val after = rddList.map(someFunc(_))
 after.collect().map(println(_))
}

如果有多个功能出现在图片中,那么所有这些功能都应该可用于父上下文。

作者: Tarang Bhalodia 发布者: 10.02.2017 10:00
32x32