Note 1:
DataFrame does not support the map() transformation directly, so you first need to convert the DataFrame to an RDD (Example 2 below demonstrates this).
Note 2:
If your transformation requires heavy initialization, use PySpark's mapPartitions() transformation instead of map(): with mapPartitions(), the heavy initialization runs only once per partition rather than once per record, as sketched below.
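For instance, here is a minimal mapPartitions() sketch; the function and data are illustrative (not from the original examples) and assume the spark session created in Example 1 below.

def add_one_per_partition(rows):
    # Imagine an expensive resource here (a DB connection, a loaded model);
    # with mapPartitions() this setup runs once per partition, not per row.
    offset = 1  # stands in for the one-time setup
    for x in rows:
        yield x + offset

nums = spark.sparkContext.parallelize([1, 2, 3, 4], 2)
print(nums.mapPartitions(add_one_per_partition).collect())
# [2, 3, 4, 5]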
map() Example 1
First, let's create an RDD from a list.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]") \
    .appName("SparkByExamples.com").getOrCreate()

data = ["Project", "Gutenberg’s", "Alice’s", "Adventures",
        "in", "Wonderland", "Project", "Gutenberg’s", "Adventures",
        "in", "Wonderland", "Project", "Gutenberg’s"]
rdd = spark.sparkContext.parallelize(data)
Next, we use map() to pair each element with the value 1, producing (word, 1) tuples:
rdd2 = rdd.map(lambda x: (x, 1))
for element in rdd2.collect():
    print(element)
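Collecting rdd2 prints each element paired with 1:

('Project', 1)
('Gutenberg’s', 1)
('Alice’s', 1)
('Adventures', 1)
('in', 1)
('Wonderland', 1)
('Project', 1)
('Gutenberg’s', 1)
('Adventures', 1)
('in', 1)
('Wonderland', 1)
('Project', 1)
('Gutenberg’s', 1)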
map() Example 2
data = [('James', 'Smith', 'M', 30),
        ('Anna', 'Rose', 'F', 41),
        ('Robert', 'Williams', 'M', 62)]
columns = ["firstname", "lastname", "gender", "salary"]
df = spark.createDataFrame(data=data, schema=columns)
df.show()
+---------+--------+------+------+
|firstname|lastname|gender|salary|
+---------+--------+------+------+
| James| Smith| M| 30|
| Anna| Rose| F| 41|
| Robert|Williams| M| 62|
+---------+--------+------+------+
# Concatenate x[0] and x[1] with a comma separator, and double the salary
rdd2 = df.rdd.map(lambda x:
    (x[0] + "," + x[1], x[2], x[3] * 2)
)
df2 = rdd2.toDF(["name", "gender", "new_salary"])
df2.show()
+---------------+------+----------+
| name|gender|new_salary|
+---------------+------+----------+
| James,Smith| M| 60|
| Anna,Rose| F| 82|
|Robert,Williams| M| 124|
+---------------+------+----------+
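Since df.rdd yields Row objects, the same transformation can also reference columns by name instead of by position, which reads more clearly. A small variation on the example above:

# Same result as above, but with named column access on each Row
rdd2 = df.rdd.map(lambda x:
    (x["firstname"] + "," + x["lastname"], x["gender"], x["salary"] * 2)
)
df2 = rdd2.toDF(["name", "gender", "new_salary"])

Attribute access (x.firstname) works as well.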
flatMap() Example
Again, let's create an RDD from a list; this time each element is a whole sentence.
data = ["Project Gutenberg’s",
"Alice’s Adventures in Wonderland",
"Project Gutenberg’s",
"Adventures in Wonderland",
"Project Gutenberg’s"]
rdd=spark.sparkContext.parallelize(data)
for element in rdd.collect():
print(element)
Now apply flatMap() to split each element on spaces, flattening the sentences into a single RDD of words. This produces the following output:
rdd2 = rdd.flatMap(lambda x: x.split(" "))
for element in rdd2.collect():
    print(element)
Project
Gutenberg’s
Alice’s
Adventures
in
Wonderland
Project
Gutenberg’s
Adventures
in
Wonderland
Project
Gutenberg’s
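flatMap() is typically the first step of the classic word count: chaining it with map() and reduceByKey() counts each word. A short sketch building on rdd2 above:

# Pair each word with 1, then sum the counts per word
wordCounts = rdd2.map(lambda w: (w, 1)) \
                 .reduceByKey(lambda a, b: a + b)
for element in wordCounts.collect():
    print(element)
# e.g. ('Project', 3), ('Gutenberg’s', 3), ('Wonderland', 2), ...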