Apache Spark:map vs mapPartitions?

performance scala apache-spark rdd

91296 观看

3回复

1307 作者的声誉

RDD mapmapPartitions方法有什么区别?并且flatMap表现得像map或喜欢mapPartitions?谢谢。

(编辑)即,两者之间的差异(在语义上或在执行方面)

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) },
      preservesPartitioning = true)
  }

和:

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.map(fn)
  }
作者: Nicholas White 的来源 发布者: 2014 年 1 月 17 日

回应 (3)


105

115457 作者的声誉

决定

RDD的map和mapPartitions方法之间有什么区别?

方法映射通过应用函数将源RDD的每个元素转换为结果RDD的单个元素。mapPartitions将源RDD的每个分区转换为结果的多个元素(可能没有)。

flatMap的行为是map还是mapPartitions?

flatMap也不能处理单个元素(as map)并生成结果的多个元素(as mapPartitions)。

作者: Alexey Romanov 发布者: 17.01.2014 07:46

96

20248 作者的声誉

进出口。小费 :

每当你进行重量级初始化时,应该对许多RDD元素执行一次而不是每个RDD元素执行一次,并且如果这个初始化(例如从第三方库创建对象)无法序列化(以便Spark可以通过集群将其传输到工作节点),mapPartitions()而不是 使用map()mapPartitions()提供每个工作者任务/线程/分区执行一次初始化,而不是每个RDD数据元素执行一次,例如:见下文。

val newRd = myRdd.mapPartitions(partition => {
  val connection = new DbConnection /*creates a db connection per partition*/

  val newPartition = partition.map(record => {
    readMatchingFromDB(record, connection)
  }).toList // consumes the iterator, thus calls readMatchingFromDB 

  connection.close() // close dbconnection here
  newPartition.iterator // create a new iterator
})

Q2。确实flatMap表现得像地图或类似mapPartitions

是。请看flatmap其自我解释的例2 。

Q1。RDD map和。之间的区别是什么?mapPartitions

mapmapPartitions在分区级别执行功能时,在每个元素级别 使用函数。

示例场景 如果我们在特定RDD分区中有100K元素,那么当我们使用时,我们将触发映射转换使用的函数100K次map

相反,如果我们使用mapPartitions那么我们将只调用一次特定函数,但是我们将传入所有100K记录并在一次函数调用中返回所有响应。

因为map在特定函数上工作很多次会有性能提升,特别是如果函数在每次传递所有元素时不需要做的话就做了一些昂贵的事情(如果是mappartitions)。

地图

对RDD的每个项应用转换函数,并将结果作为新RDD返回。

列出变体

def map [U:ClassTag](f:T => U):RDD [U]

示例:

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
 val b = a.map(_.length)
 val c = a.zip(b)
 c.collect
 res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8)) 

mapPartitions

这是一个专门的映射,每个分区只调用一次。各个分区的整个内容可通过输入参数(Iterarator [T])作为连续的值流获得。自定义函数必须返回另一个迭代器[U]。组合的结果迭代器会自动转换为新的RDD。请注意,由于我们选择的分区,以下结果中缺少元组(3,4)和(6,7)。

preservesPartitioning指示输入函数是否保留分区器,false除非这是一对RDD并且输入函数不修改键,否则应该是这样。

列出变体

def mapPartitions [U:ClassTag](f:Iterator [T] => Iterator [U],preservesPartitioning:Boolean = false):RDD [U]

例1

val a = sc.parallelize(1 to 9, 3)
 def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
   var res = List[(T, T)]()
   var pre = iter.next
   while (iter.hasNext)
   {
     val cur = iter.next;
     res .::= (pre, cur)
     pre = cur;
   }
   res.iterator
 }
 a.mapPartitions(myfunc).collect
 res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8)) 

例2

val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3)
 def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
   var res = List[Int]()
   while (iter.hasNext) {
     val cur = iter.next;
     res = res ::: List.fill(scala.util.Random.nextInt(10))(cur)
   }
   res.iterator
 }
 x.mapPartitions(myfunc).collect
 // some of the number are not outputted at all. This is because the random number generated for it is zero.
 res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10) 

上述程序也可以使用flatMap编写如下。

示例2使用flatmap

val x  = sc.parallelize(1 to 10, 3)
 x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect

 res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10) 

结论:

mapPartitions转换比map它调用你的函数一次/分区更快,而不是一次/元素。

进一步阅读:foreach Vs foreachPartitions何时使用什么?

作者: Ram Ghadiyaram 发布者: 29.08.2016 10:17

15

1881 作者的声誉

地图

  1. 它一次处理一行,非常类似于MapReduce的map()方法。
  2. 您在每一行之后从转换返回。

MapPartitions

  1. 它一次完成整个分区的处理。
  2. 处理完整个分区后,只能从函数返回一次。
  3. 所有中间结果都需要保存在内存中,直到您处理整个分区。
  4. 为您提供MapReduce的setup()map()和cleanup()函数

Map Vs mapPartitions http://bytepadding.com/big-data/spark/spark-map-vs-mappartitions/

Spark Map http://bytepadding.com/big-data/spark/spark-map/

Spark mapPartitions http://bytepadding.com/big-data/spark/spark-mappartitions/

作者: KrazyGautam 发布者: 13.03.2017 12:09
32x32