Spark: Best practice for retrieving big data from RDD to local machine

apache-spark

40,046 views

6 answers


I've got a big RDD (1 GB) in a YARN cluster. On the local machine that uses this cluster I have only 512 MB. I'd like to iterate over the values in the RDD on my local machine. I can't use collect(), because it would create an array locally that is bigger than my heap. I need some iterative way. There is the iterator() method, but it requires additional information that I can't provide.

UPD: committed the toLocalIterator method

Asked by epahomov on Feb 11, 2014

6 Answers


1


Use Spark's map / filter / reduce and download the result afterwards? I think the usual Hadoop approach will work.

The API says there are map - filter - saveAsFile commands: https://spark.incubator.apache.org/docs/0.8.1/scala-programming-guide.html#transformations
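
A rough sketch of that route (assuming the RDD holds strings; the filter predicate and the output path below are made up for illustration):

// Sketch only: shrink the data on the cluster first, then save the result
// and pull the much smaller output to the local machine.
val filtered = rdd.filter(line => line.contains("ERROR")) // hypothetical predicate
filtered.saveAsTextFile("hdfs:///tmp/filtered-output")    // hypothetical output path
// afterwards, e.g.: hdfs dfs -get /tmp/filtered-output ./local-copy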

Answered by ya_pulser on Feb 11, 2014

43


Accepted answer

Update: the RDD.toLocalIterator method, which appeared after this answer was originally written, is a more efficient way to do the job. It uses runJob to evaluate only a single partition on each step.
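
For example, a minimal sketch of the toLocalIterator route (assuming the driver has enough memory to hold the largest single partition):

// toLocalIterator pulls one partition at a time to the driver, so memory use
// is bounded by the largest partition rather than by the whole RDD.
rdd.toLocalIterator.foreach { value =>
  // process each value locally: print it, write it to a local file, etc.
  println(value)
}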

TL;DR: the original answer below might still give a rough idea of how it works:

First of all, get the array of partition indexes:

val parts = rdd.partitions

Then create smaller RDDs by filtering out everything but a single partition. Collect the data from each smaller RDD and iterate over the values of that single partition:

for (p <- parts) {
    val idx = p.index
    val partRdd = rdd.mapPartitionsWithIndex(
        (index, it) => if (index == idx) it else Iterator.empty,
        preservesPartitioning = true) // true to avoid reshuffling the rdd
    val data = partRdd.collect //data contains all values from a single partition
                               //in the form of an array
    //Now you can do with the data whatever you want: iterate, save to a file, etc.
}

I didn't try this code, but it should work. Please write a comment if it doesn't compile. Of course, it will only work if the partitions are small enough. If they aren't, you can always increase the number of partitions with rdd.coalesce(numParts, true).
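
For instance, a small sketch of that repartitioning step (the target partition count is made up; pick one so that a single partition fits into local memory):

// Sketch: split the RDD into more, smaller partitions before running the loop above.
// shuffle = true is what allows coalesce to increase the partition count;
// rdd.repartition(numParts) is equivalent shorthand.
val numParts = 64 // hypothetical target count
val smallerPartsRdd = rdd.coalesce(numParts, shuffle = true)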

Answered by Wildfire on Feb 15, 2014

14


Wildfire's answer seems semantically correct, but I'm sure you could be vastly more efficient by using the Spark API. If you want to process each partition in turn, I don't see why you can't use the map / filter / reduce / reduceByKey / mapPartitions operations. The only time you would want to have everything in one place in one array is when you are going to perform a non-monoidal operation, but that doesn't seem to be what you want. You should be able to do something like:

rdd.mapPartitions(recordsIterator => your code that processes a single chunk)

Or this:

rdd.foreachPartition(partition => {
  partition.toArray
  // Your code
})
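
As a concrete (if contrived) sketch of the second variant, with the per-chunk work stubbed out as a simple count standing in for real processing:

// Sketch: each partition is handled as a local iterator on an executor, so
// nothing is pulled back to the driver.
rdd.foreachPartition { partition =>
  val chunk = partition.toArray // one partition's records
  println(s"processed a chunk of ${chunk.length} records") // shows up in the executor logs
}
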
Answered by samthebest on Mar 30, 2014

1


For Spark 1.3.1, the format is as follows:

val parts = rdd.partitions
for (p <- parts) {
    val idx = p.index
    val partRdd = rdd.mapPartitionsWithIndex {
        case (index: Int, value: Iterator[(String, String, Float)]) =>
            if (index == idx) value else Iterator()
    }
    val dataPartitioned = partRdd.collect
    //Apply further processing on dataPartitioned
}
Answered by agankur21 on Jun 5, 2015

9


This is the same approach as the one suggested by @Wildfire, but written in pyspark.

The nice thing about this approach is that it lets the user access the records in the RDD in order. I'm using this code to feed data from an RDD into STDIN of a machine learning tool's process.

rdd = sc.parallelize(range(100), 10)

# Returns a partition filter that keeps only the elements of the partition
# with the given index.
def make_part_filter(index):
    def part_filter(split_index, iterator):
        if split_index == index:
            for el in iterator:
                yield el
    return part_filter

# Collect and process one partition at a time.
for part_id in range(rdd.getNumPartitions()):
    part_rdd = rdd.mapPartitionsWithIndex(make_part_filter(part_id), True)
    data_from_part_rdd = part_rdd.collect()
    print "partition id: %s elements: %s" % (part_id, data_from_part_rdd)

This produces the output:

partition id: 0 elements: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
partition id: 1 elements: [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
partition id: 2 elements: [20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
partition id: 3 elements: [30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
partition id: 4 elements: [40, 41, 42, 43, 44, 45, 46, 47, 48, 49]
partition id: 5 elements: [50, 51, 52, 53, 54, 55, 56, 57, 58, 59]
partition id: 6 elements: [60, 61, 62, 63, 64, 65, 66, 67, 68, 69]
partition id: 7 elements: [70, 71, 72, 73, 74, 75, 76, 77, 78, 79]
partition id: 8 elements: [80, 81, 82, 83, 84, 85, 86, 87, 88, 89]
partition id: 9 elements: [90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
Answered by vvladymyrov on Jun 5, 2015

2


A pyspark dataframe solution using RDD.toLocalIterator():

import cStringIO

separator  = '|'
df_results = hiveCtx.sql(sql)
columns    = df_results.columns
print separator.join(columns)

# Use toLocalIterator() rather than collect(), as this avoids pulling all of the
# data to the driver at one time.  Rather, "the iterator will consume as much memory
# as the largest partition in this RDD."
MAX_BUFFERED_ROW_COUNT = 10000
row_count              = 0
output                 = cStringIO.StringIO()
for record in df_results.rdd.toLocalIterator():
    d = record.asDict()
    output.write(separator.join([str(d[c]) for c in columns]) + '\n')
    row_count += 1
    if row_count % MAX_BUFFERED_ROW_COUNT == 0:
        print output.getvalue().rstrip()
        # it is faster to create a new StringIO rather than clear the existing one
        # http://stackoverflow.com/questions/4330812/how-do-i-clear-a-stringio-object
        output = cStringIO.StringIO()
if row_count % MAX_BUFFERED_ROW_COUNT:
    print output.getvalue().rstrip()
Answered by Mark Rajcok on Sep 28, 2016