PySpark简明教程 02 pyspark实战 入门例子

  • Post author:
  • Post category:其他



一个简单的例子带你进入pyspark的大门,对!我们的入门程序不是wordcount,也不是hello world。我们不一样、不一样。


目标:找到股息率大于3%的行业,将结果输出到指定目录。

1 实验文件

文件为深圳股市的股息率统计(截止20210531),

下载链接

。列出了主要行业的分红情况,从中可以看出哪些是铁公鸡,哪些是现金奶牛。

准备工作,把这个文件上传到HDFS或其他『炫酷』的分布式文件系统。

为什么实验从这里开始,因为在通常情况下,我们要分析的数据都是海量的,都是在分布式文件系统里存着的。

一条命令将本地文件上传到HDFS

hadoop fs -put localfile /yourHdfsPath/0531/
或者
hadoop fs -put localfile hdfs://host:port/yourHdfsPath/0531/

2 pyspark输入

找到spark的客户端,运行bin目录下的pyspark,打开pyspark的shell命令窗口。后续会介绍pyspark详细启动参数。

df = spark.read.format("csv").option("inferSchema","true").option("header","true").load("/yourHdfsPath/0531/")

备注:spark是内置变量,代表sparksession;这条命令见名知意,非常易懂,spark想读文件(read),目标文件csv格式的(format),额外的选择(option):智能推测数据的类型(inferSchema=true),csv文件的第一行包行列名(header=true),加载(load)指定目录的文件。因为这是刚开始讲解,没必要纠结每个细节,只要直观感受一下就好,后续会详细解释每个『单词』的含义!

# 加载后的数据类型为DataFrame,类似于pandas的DataFrame但是略有区别。
>>> type(df)
<class 'pyspark.sql.dataframe.DataFrame'>

输入的数据将变为DataFrame类型的数据,可以用pandas的DataFrame来类比理解。

本小节后续代码只是展示pyspark的能力,先不用深入理解,当然好学的人看完就已经会了。

不要说话先看代码

>>> df.printSchema()# 展示目标文件的数据类型,记住这个函数!
root
 |-- 行业代码: string (nullable = true)
 |-- 行业名称: string (nullable = true)
 |-- 最新股息率: double (nullable = true)
 |-- 股票家数: integer (nullable = true)
 |-- 其中未分红家数: integer (nullable = true)
 |-- 最近一个月平均股息率: double (nullable = true)
 |-- 最近三个月平均股息率: double (nullable = true)
 |-- 最近六个月平均股息率: double (nullable = true)
 |-- 最近一年平均股息率: double (nullable = true)

# 是不是很神奇,spark竟然能自动推断出double和integer类型,非常方便。
>>> df.show(10,False) #先展示10行试试
+----+-----------------+-----+----+-------+----------+----------+----------+---------+
|行业代码|行业名称             |最新股息率|股票家数|其中未分红家数|最近一个月平均股息率|最近三个月平均股息率|最近六个月平均股息率|最近一年平均股息率|
+----+-----------------+-----+----+-------+----------+----------+----------+---------+
|A   |农、林、牧、渔业         |1.53 |46  |24     |1.54      |1.51      |1.6       |1.57     |
|B   |采矿业              |3.04 |80  |34     |2.99      |3.15      |3.23      |3.54     |
|C   |制造业              |0.84 |2773|1095   |0.83      |0.84      |0.83      |0.88     |
|D   |电力、热力、燃气及水的生产和供应业|2.3  |121 |36     |2.37      |2.35      |2.47      |2.49     |
|E   |建筑业              |2.26 |102 |36     |2.27      |2.25      |2.29      |2.24     |
|F   |批发和零售业           |1.11 |172 |69     |1.16      |1.19      |1.2       |1.14     |
|G   |交通运输、仓储和邮政业      |1.57 |108 |28     |1.51      |1.5       |1.5       |1.45     |
|H   |住宿和餐饮业           |0.59 |10  |6      |0.62      |0.65      |0.7       |0.8      |
|I   |信息传输、软件和信息技术服务业  |0.47 |351 |139    |0.51      |0.5       |0.49      |0.46     |
|J   |金融业              |3.06 |124 |33     |3.24      |3.17      |3.12      |3.06     |
+----+-----------------+-----+----+-------+----------+----------+----------+---------+
only showing top 10 rows


>>> df.count() #查看目标文件有多少行数据
19

查看高股息率的行业是哪个,你绝对想不到:房地产业是高股息率行业。

# 看到这里,对SQL熟悉的人要高兴了,pandas熟悉的人也会感觉很亲切。
# 查看股息率最高的值是多少
>>> df.selectExpr("max(`最近一个月平均股息率`)").show(10,False)
+---------------+
|max(最近一个月平均股息率)|
+---------------+
|3.72           |
+---------------+

# 查看高股息率对应的行业是哪个?竟然是房地产业
>>> df.where("`最近一个月平均股息率`=3.72").show(10,False)
+----+----+-----+----+-------+----------+----------+----------+---------+
|行业代码|行业名称|最新股息率|股票家数|其中未分红家数|最近一个月平均股息率|最近三个月平均股息率|最近六个月平均股息率|最近一年平均股息率|
+----+----+-----+----+-------+----------+----------+----------+---------+
|K   |房地产业|3.12 |121 |40     |3.72      |3.71      |3.71      |3.42     |
+----+----+-----+----+-------+----------+----------+----------+---------+

3 pyspark输出

# 过滤出股息率大于3的行业,然后将他们输出到指定目录
>>> result = df.where("`最近一个月平均股息率`>3")
>>> result.count()
2
>>> result.show(2,False)
+----+----+-----+----+-------+----------+----------+----------+---------+
|行业代码|行业名称|最新股息率|股票家数|其中未分红家数|最近一个月平均股息率|最近三个月平均股息率|最近六个月平均股息率|最近一年平均股息率|
+----+----+-----+----+-------+----------+----------+----------+---------+
|J   |金融业 |3.06 |124 |33     |3.24      |3.17      |3.12      |3.06     |
|K   |房地产业|3.12 |121 |40     |3.72      |3.71      |3.71      |3.42     |
+----+----+-----+----+-------+----------+----------+----------+---------+
>>> # 将DataFrame输出到HDFS某目录
result.write.format("csv").mode("overwrite").option("path","/yourHdfsPath/0531_1").save()

同样,这条命令见名知意,pyspark编程和说话一样流畅。



版权声明:本文为haohaizijhz原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。