一个简单的例子带你进入pyspark的大门,对!我们的入门程序不是wordcount,也不是hello world。我们不一样、不一样。
目标:找到股息率大于3%的行业,将结果输出到指定目录。
1 实验文件
文件为深圳股市的股息率统计(截止20210531),
下载链接
。列出了主要行业的分红情况,从中可以看出哪些是铁公鸡,哪些是现金奶牛。
准备工作,把这个文件上传到HDFS或其他『炫酷』的分布式文件系统。
为什么实验从这里开始,因为在通常情况下,我们要分析的数据都是海量的,都是在分布式文件系统里存着的。
一条命令将本地文件上传到HDFS
hadoop fs -put localfile /yourHdfsPath/0531/
或者
hadoop fs -put localfile hdfs://host:port/yourHdfsPath/0531/
2 pyspark输入
找到spark的客户端,运行bin目录下的pyspark,打开pyspark的shell命令窗口。后续会介绍pyspark详细启动参数。
df = spark.read.format("csv").option("inferSchema","true").option("header","true").load("/yourHdfsPath/0531/")
备注:spark是内置变量,代表sparksession;这条命令见名知意,非常易懂,spark想读文件(read),目标文件csv格式的(format),额外的选择(option):智能推测数据的类型(inferSchema=true),csv文件的第一行包行列名(header=true),加载(load)指定目录的文件。因为这是刚开始讲解,没必要纠结每个细节,只要直观感受一下就好,后续会详细解释每个『单词』的含义!
# 加载后的数据类型为DataFrame,类似于pandas的DataFrame但是略有区别。
>>> type(df)
<class 'pyspark.sql.dataframe.DataFrame'>
输入的数据将变为DataFrame类型的数据,可以用pandas的DataFrame来类比理解。
本小节后续代码只是展示pyspark的能力,先不用深入理解,当然好学的人看完就已经会了。
不要说话先看代码
>>> df.printSchema()# 展示目标文件的数据类型,记住这个函数!
root
|-- 行业代码: string (nullable = true)
|-- 行业名称: string (nullable = true)
|-- 最新股息率: double (nullable = true)
|-- 股票家数: integer (nullable = true)
|-- 其中未分红家数: integer (nullable = true)
|-- 最近一个月平均股息率: double (nullable = true)
|-- 最近三个月平均股息率: double (nullable = true)
|-- 最近六个月平均股息率: double (nullable = true)
|-- 最近一年平均股息率: double (nullable = true)
# 是不是很神奇,spark竟然能自动推断出double和integer类型,非常方便。
>>> df.show(10,False) #先展示10行试试
+----+-----------------+-----+----+-------+----------+----------+----------+---------+
|行业代码|行业名称 |最新股息率|股票家数|其中未分红家数|最近一个月平均股息率|最近三个月平均股息率|最近六个月平均股息率|最近一年平均股息率|
+----+-----------------+-----+----+-------+----------+----------+----------+---------+
|A |农、林、牧、渔业 |1.53 |46 |24 |1.54 |1.51 |1.6 |1.57 |
|B |采矿业 |3.04 |80 |34 |2.99 |3.15 |3.23 |3.54 |
|C |制造业 |0.84 |2773|1095 |0.83 |0.84 |0.83 |0.88 |
|D |电力、热力、燃气及水的生产和供应业|2.3 |121 |36 |2.37 |2.35 |2.47 |2.49 |
|E |建筑业 |2.26 |102 |36 |2.27 |2.25 |2.29 |2.24 |
|F |批发和零售业 |1.11 |172 |69 |1.16 |1.19 |1.2 |1.14 |
|G |交通运输、仓储和邮政业 |1.57 |108 |28 |1.51 |1.5 |1.5 |1.45 |
|H |住宿和餐饮业 |0.59 |10 |6 |0.62 |0.65 |0.7 |0.8 |
|I |信息传输、软件和信息技术服务业 |0.47 |351 |139 |0.51 |0.5 |0.49 |0.46 |
|J |金融业 |3.06 |124 |33 |3.24 |3.17 |3.12 |3.06 |
+----+-----------------+-----+----+-------+----------+----------+----------+---------+
only showing top 10 rows
>>> df.count() #查看目标文件有多少行数据
19
查看高股息率的行业是哪个,你绝对想不到:房地产业是高股息率行业。
# 看到这里,对SQL熟悉的人要高兴了,pandas熟悉的人也会感觉很亲切。
# 查看股息率最高的值是多少
>>> df.selectExpr("max(`最近一个月平均股息率`)").show(10,False)
+---------------+
|max(最近一个月平均股息率)|
+---------------+
|3.72 |
+---------------+
# 查看高股息率对应的行业是哪个?竟然是房地产业
>>> df.where("`最近一个月平均股息率`=3.72").show(10,False)
+----+----+-----+----+-------+----------+----------+----------+---------+
|行业代码|行业名称|最新股息率|股票家数|其中未分红家数|最近一个月平均股息率|最近三个月平均股息率|最近六个月平均股息率|最近一年平均股息率|
+----+----+-----+----+-------+----------+----------+----------+---------+
|K |房地产业|3.12 |121 |40 |3.72 |3.71 |3.71 |3.42 |
+----+----+-----+----+-------+----------+----------+----------+---------+
3 pyspark输出
# 过滤出股息率大于3的行业,然后将他们输出到指定目录
>>> result = df.where("`最近一个月平均股息率`>3")
>>> result.count()
2
>>> result.show(2,False)
+----+----+-----+----+-------+----------+----------+----------+---------+
|行业代码|行业名称|最新股息率|股票家数|其中未分红家数|最近一个月平均股息率|最近三个月平均股息率|最近六个月平均股息率|最近一年平均股息率|
+----+----+-----+----+-------+----------+----------+----------+---------+
|J |金融业 |3.06 |124 |33 |3.24 |3.17 |3.12 |3.06 |
|K |房地产业|3.12 |121 |40 |3.72 |3.71 |3.71 |3.42 |
+----+----+-----+----+-------+----------+----------+----------+---------+
>>> # 将DataFrame输出到HDFS某目录
result.write.format("csv").mode("overwrite").option("path","/yourHdfsPath/0531_1").save()
同样,这条命令见名知意,pyspark编程和说话一样流畅。