117209 观看

15回复

1328 作者的声誉

### 回应 (15)

156

35308 作者的声誉

``````val rdd = sc.parallelize(Seq("Roses are red", "Violets are blue"))  // lines

rdd.collect

res0: Array[String] = Array("Roses are red", "Violets are blue")
``````

``````rdd.map(_.length).collect

res1: Array[Int] = Array(13, 16)
``````

``````rdd.flatMap(_.split(" ")).collect

res2: Array[String] = Array("Roses", "are", "red", "Violets", "are", "blue")
``````

``````["aa bb cc", "", "dd"] => [["aa","bb","cc"],[],["dd"]] => ["aa","bb","cc","dd"]
``````

``````rdd.map(_.split(" ")).collect

res3: Array[Array[String]] = Array(
Array(Roses, are, red),
Array(Violets, are, blue)
)
``````

``````val rdd = sc.parallelize(Seq(1,2,3,4))

def myfn(x: Int): Option[Int] = if (x <= 2) Some(x * 10) else None

rdd.flatMap(myfn).collect

res3: Array[Int] = Array(10,20)
``````

（这里注意到Option的行为类似于具有一个元素或零元素的列表）

15

2364 作者的声誉

``````myRDD.map(x => x*2)
``````

``````myRDD.flatMap(x =>new Seq(2*x,3*x))
``````

``````myRDD.flatMap(x =>if x<10 new Seq(2*x,3*x) else new Seq(x) )
``````

6

61 作者的声誉

map和flatMap是相似的，从某种意义上说，它们从输入RDD中取一行并在其上应用一个函数。它们的区别在于map中的函数只返回一个元素，而flatMap中的函数可以返回一个元素列表（0或更多）作为迭代器。

71

1084 作者的声誉

``````hadoop is fast
hive is sql on hdfs
spark is superfast
spark is awesome
``````

## 运用 `map`

``````>>> wc = data.map(lambda line:line.split(" "));
>>> wc.collect()
[u'hadoop is fast', u'hive is sql on hdfs', u'spark is superfast', u'spark is awesome']
``````

## 运用 `flatMap`

``````>>> fm = data.flatMap(lambda line:line.split(" "));
>>> fm.collect()
[u'hadoop', u'is', u'fast', u'hive', u'is', u'sql', u'on', u'hdfs', u'spark', u'is', u'superfast', u'spark', u'is', u'awesome']
``````

• `fm`：使用创建的RDD `flatMap`
• `wc`：RDD使用创建 `map`
``````>>> fm.map(lambda word : (word,1)).collect()
[(u'hadoop', 1), (u'is', 1), (u'fast', 1), (u'hive', 1), (u'is', 1), (u'sql', 1), (u'on', 1), (u'hdfs', 1), (u'spark', 1), (u'is', 1), (u'superfast', 1), (u'spark', 1), (u'is', 1), (u'awesome', 1)]
``````

`map`在RDD上`wc`会给出以下不需要的输出：

``````>>> wc.flatMap(lambda word : (word,1)).collect()
[[u'hadoop', u'is', u'fast'], 1, [u'hive', u'is', u'sql', u'on', u'hdfs'], 1, [u'spark', u'is', u'superfast'], 1, [u'spark', u'is', u'awesome'], 1]
``````

`map`：它通过将给定函数应用于RDD的每个元素来返回新的RDD。函数`map`只返回一个项目。

`flatMap`：类似于`map`，它通过将函数应用于RDD的每个元素来返回新的RDD，但输出被展平。

8

8120 作者的声誉

`map`返回相同数量元素的RDD，而`flatMap`不是。

`flatMap`过滤掉丢失或不正确数据的示例用例

number.csv

``````1
2
3
-
4
-
5
``````

``````from operator import *

def f(row):
try:
return float(row)
except Exception:
return 0

rdd = sc.textFile('a.csv').map(f)

print(rdd.count())      # 7
``````

flatMap.py用于`flatMap`在添加之前过滤掉缺失的数据。与先前版本相比，添加的数字更少。

``````from operator import *

def f(row):
try:
return [float(row)]
except Exception:
return []

rdd = sc.textFile('a.csv').flatMap(f)

print(rdd.count())      # 5
``````

12

5348 作者的声誉

``````➜  spark-1.6.1 cat test.md
This is the first line;
This is the second line;
This is the last line.

scala> val textFile = sc.textFile("test.md")
scala> textFile.map(line => line.split(" ")).count()
res2: Long = 3

scala> textFile.flatMap(line => line.split(" ")).count()
res3: Long = 15

scala> textFile.map(line => line.split(" ")).collect()
res0: Array[Array[String]] = Array(Array(This, is, the, first, line;), Array(This, is, the, second, line;), Array(This, is, the, last, line.))

scala> textFile.flatMap(line => line.split(" ")).collect()
res1: Array[String] = Array(This, is, the, first, line;, This, is, the, second, line;, This, is, the, last, line.)
``````

`map`方法类似于`flatMap`，它们都返回一个新的RDD。`map`方法经常使用返回一个新的RDD，`flatMap`方法经常使用拆分字。

2

573 作者的声誉

Flatmap和Map都会转换集合。

map（func）

flatMap（func）

map： - >一个元素out中的一个元素。
flatMap： - > 0或更多元素out（集合）中的一个元素。

-1

181 作者的声誉

map和flatMap的输出差异：

1。`flatMap`

``````val a = sc.parallelize(1 to 10, 5)

a.flatMap(1 to _).collect()
``````

`````` 1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
``````

2 . `map`:

``````val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)

val b = a.map(_.length).collect()
``````

``````3 6 6 3 8
``````

1

19 作者的声誉

``````>>> a="hello what are you doing"
>>> a.split()
``````

['你好'，'什么'，'是'，'你'，'做']

``````>>> b=["hello what are you doing","this is rak"]
>>> b.split()
``````

``````>>> rline=sc.parallelize(b)
>>> type(rline)
``````

``````>>> def fwords(x):
...     return x.split()

>>> rword=rline.map(fwords)
>>> rword.collect()
``````

[['你好'，'什么'，'是'，'你'，'做']，['这'，'是'，'rak']]

``````>>> rwordflat=rline.flatMap(fwords)
>>> rwordflat.collect()
``````

['你好'，'什么'，'是'，'你'，'做'，'这'，'是'，'rak']

4

41 作者的声誉

``````rdd = sc.parallelize([2, 3, 4])
rdd.flatMap(lambda x: range(1, x)).collect()
Output:
[1, 1, 2, 1, 2, 3]

rdd.map(lambda x: range(1, x)).collect()
Output:
[[1], [1, 2], [1, 2, 3]]
``````

-1

24 作者的声誉

• map（func）返回一个新的分布式数据集，该数据集是通过声明的函数func传递源的每个元素而形成的.so map（）是单个术语

• flatMap（func）与map类似，但每个输入项可以映射到0个或更多输出项，因此func应返回Sequence而不是单个项。

5

350 作者的声誉

``````val array1d = Array ("1,2,3", "4,5,6", "7,8,9")
//array1d is an array of strings

val array2d = array1d.map(x => x.split(","))
//array2d will be : Array( Array(1,2,3), Array(4,5,6), Array(7,8,9) )

val flatArray = array1d.flatMap(x => x.split(","))
//flatArray will be : Array (1,2,3,4,5,6,7,8,9)
``````

• 您的地图功能会导致创建多层结构
• 但你想要的只是一个简单 - 平面 - 一维结构，通过删除所有内部分组

-1

5 作者的声誉

`map`：它`RDD`通过将函数应用于每个元素来返回一个新元素`RDD`。.map中的函数只能返回一个项目。

`flatMap`：与map类似，它`RDD`通过将函数应用于RDD的每个元素来返回new ，但输出被展平。

``````sc.parallelize([3,4,5]).map(lambda x: range(1,x)).collect()
``````

``````sc.parallelize([3,4,5]).flatMap(lambda x: range(1,x)).collect()
``````

1

11 作者的声誉

`RDD.map` 返回单个数组中的所有元素

`RDD.flatMap` 返回数组数组中的元素

``````Spark is an expressive framework
This text is to understand map and faltMap functions of Spark RDD
``````

``````val text=sc.textFile("text.txt").map(_.split(" ")).collect
``````

``````text: **Array[Array[String]]** = Array(Array(Spark, is, an, expressive, framework), Array(This, text, is, to, understand, map, and, faltMap, functions, of, Spark, RDD))
``````

``````val text=sc.textFile("text.txt").flatMap(_.split(" ")).collect
``````

`````` text: **Array[String]** = Array(Spark, is, an, expressive, framework, This, text, is, to, understand, map, and, faltMap, functions, of, Spark, RDD)
``````

0

795 作者的声誉

``````@PutMapping(path="/update/{id}", consumes=MediaType.APPLICATION_JSON_VALUE)
public Mono<Account> update(@PathVariable Long id, @RequestBody Account account) {
Mono<Account> accFound = accountRepository.findById(id);
return accFound.map(acc -> {
acc.setAccountBalance(account.getAccountBalance());
return accountRepository.save(acc).block();
});

}
``````

`map``Mono`在内部`accountRepository.save(acc)`返回的内容中添加一个，注意这里返回一个`Mono`，如果我不添加`block()`，该方法`update`最终返回`Mono<Mono<Account>>`- 在这种情况下编译错误。

flatMap：

``````@PutMapping(path="/update/{id}", consumes=MediaType.APPLICATION_JSON_VALUE)
public Mono<Account> update(@PathVariable Long id, @RequestBody Account account) {
Mono<Account> accFound = accountRepository.findById(id);
return accFound.flatMap(acc -> {
acc.setAccountBalance(account.getAccountBalance());
return accountRepository.save(acc);
});

}
``````

`flatMap` 只返回里面返回的内容。