Spark - Why does ArrayBuffer seem to get elements that haven't been traversed yet

scala apache-spark

50 views

1 reply

98 reputation

Why does the ArrayBuffer in mapPartitions seem to contain elements that it has not traversed yet?

For instance, the way I read this code, the first emitted Row should have 1 element in its ArrayBuffer, the second 2, the third 3, and so on. How can the first ArrayBuffer in the output have 9 items? That would seem to imply that there were 9 iterations before the first output, yet the yields count makes it clear that this was the first iteration.

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.Row

val a = ArrayBuffer[Int]()
for (i <- 1 to 9) a += i
for (i <- 1 to 9) a += 9 - i
val rdd1 = sc.parallelize(a.toArray)

def timePivotWithLoss(iter: Iterator[Int]) : Iterator[Row] = {
    val currentArray = ArrayBuffer[Int]()
    var loss = 0
    var yields = 0
    for (item <- iter) yield {
        currentArray += item
        //var left : Int = -1
        yields += 1
        Row(yields, item.toString(), currentArray)
    }
}

rdd1.mapPartitions(it => timePivotWithLoss(it)).collect()

Output -

[1,1,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9)]
[2,2,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9)]
[3,3,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9)]
[4,4,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9)]
[5,5,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9)]
[6,6,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9)]
[7,7,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9)]
[8,8,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9)]
[9,9,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9)]
[1,8,ArrayBuffer(8, 7, 6, 5, 4, 3, 2, 1, 0)]
[2,7,ArrayBuffer(8, 7, 6, 5, 4, 3, 2, 1, 0)]
[3,6,ArrayBuffer(8, 7, 6, 5, 4, 3, 2, 1, 0)]
[4,5,ArrayBuffer(8, 7, 6, 5, 4, 3, 2, 1, 0)]
[5,4,ArrayBuffer(8, 7, 6, 5, 4, 3, 2, 1, 0)]
[6,3,ArrayBuffer(8, 7, 6, 5, 4, 3, 2, 1, 0)]
[7,2,ArrayBuffer(8, 7, 6, 5, 4, 3, 2, 1, 0)]
[8,1,ArrayBuffer(8, 7, 6, 5, 4, 3, 2, 1, 0)]
[9,0,ArrayBuffer(8, 7, 6, 5, 4, 3, 2, 1, 0)]
Asked by Sean on December 27, 2017

1 Answer

1

23210 reputation

This happens because all rows in the partition hold a reference to the same mutable object. Spilling to disk can make it even less deterministic, with some objects being serialized and therefore not reflecting later changes.
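Here is a plain-Scala sketch of the aliasing (no Spark required; the names are illustrative, not from the original post): every yielded value captures the same ArrayBuffer instance, so once the iterator is exhausted they all show the final contents.

import scala.collection.mutable.ArrayBuffer

val buf = ArrayBuffer[Int]()
val rows = for (i <- (1 to 3).iterator) yield {
  buf += i
  (i, buf) // stores a reference to buf, not a copy of its current contents
}
// Forcing the iterator appends three times; every tuple then sees the end state.
rows.toList.foreach(println)
// (1,ArrayBuffer(1, 2, 3))
// (2,ArrayBuffer(1, 2, 3))
// (3,ArrayBuffer(1, 2, 3))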

You can use a mutable reference and an immutable object instead:

def timePivotWithLoss(iter: Iterator[Int]): Iterator[Row] = {
  var currentArray = Vector[Int]()
  var loss = 0
  var yields = 0
  for (item <- iter) yield {
    // :+ builds a new Vector, so each Row keeps its own snapshot
    currentArray = currentArray :+ item
    yields += 1
    Row(yields, item.toString(), currentArray)
  }
}

In general, though, mutable state and Spark are not a good match.
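If you do want to keep the ArrayBuffer, another sketch (my own variation, not part of the original answer) is to take an immutable snapshot on every yield, for example with toList, so later mutations cannot leak into rows that were already emitted:

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.Row

def timePivotWithLoss(iter: Iterator[Int]): Iterator[Row] = {
  val currentArray = ArrayBuffer[Int]()
  var yields = 0
  for (item <- iter) yield {
    currentArray += item
    yields += 1
    // toList copies the current contents, so this Row is insulated from future += calls
    Row(yields, item.toString, currentArray.toList)
  }
}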

Answered by hi-zir on December 27, 2017