
PySpark Skill Notes: Basic DataFrame Operations

2019-03-02
MaShu

Data analysts usually work in Hive SQL, but PySpark is worth considering as a replacement. Spark computes in memory and is faster than Hive, many functions and methods in the PySpark SQL module mirror Hive SQL keywords, and PySpark can also be combined with other Python modules. PySpark DataFrames differ quite a bit from Pandas DataFrames, though, so this post summarizes the basic PySpark DataFrame operations.

1. Connect to local Spark

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
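If no cluster is configured, a slightly fuller sketch of an explicitly local session (the master and appName values below are only illustrative):

from pyspark.sql import SparkSession

# Run locally on all available cores; getOrCreate reuses an existing session if there is one
spark = (SparkSession.builder
         .master("local[*]")
         .appName("dataframe-basics")
         .getOrCreate())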

2. Create a DataFrame

Create a DataFrame with Spark SQL:

sparkdf = spark.sql(""" select * from table_name """)

Create a DataFrame manually:

df = spark.createDataFrame([('0001','F','H',1), ('0002','M','M',0), ('0003','F','L',1),
                            ('0004','F','H',0), ('0005','M','M',1), ('0006','F','H',1)
                           ], ['userid','gender','level','vip'])

3. Overview

3.1 Print the schema as a tree

df.printSchema()

root
 |-- userid: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)
 |-- vip: long (nullable = true)

3.2 Summary statistics of the data

df.describe().show()

+-------+------------------+------+-----+------------------+
|summary|            userid|gender|level|               vip|
+-------+------------------+------+-----+------------------+
|  count|                 6|     6|    6|                 6|
|   mean|               3.5|  null| null|0.6666666666666666|
| stddev|1.8708286933869707|  null| null|0.5163977794943222|
|    min|              0001|     F|    H|                 0|
|    max|              0006|     M|    M|                 1|
+-------+------------------+------+-----+------------------+

4. Row operations

4.1 Show the first 3 rows

df.show(3)

+------+------+-----+---+
|userid|gender|level|vip|
+------+------+-----+---+
|  0001|     F|    H|  1|
|  0002|     M|    M|  0|
|  0003|     F|    L|  1|
+------+------+-----+---+
only showing top 3 rows

4.2 Fetch the first 3 rows to the driver

df.head(3) or df.take(3); both return the same result. Note that the argument must not be negative.

[Row(userid='0001', gender='F', level='H', vip=1),
 Row(userid='0002', gender='M', level='M', vip=0),
 Row(userid='0003', gender='F', level='L', vip=1)]

4.3 Count rows

df.count()

6

4.4 Filter rows that are null or NaN:

from pyspark.sql.functions import isnan, isnull
df1 = df.filter(isnull('gender'))  # rows where gender is null (Python's None)
df2 = df.filter(isnan('vip'))      # rows where vip is NaN (Not a Number)

+------+------+-----+---+
|userid|gender|level|vip|
+------+------+-----+---+
+------+------+-----+---+
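Neither column actually contains nulls or NaNs here, so both results are empty. To count missing values per column, a small sketch of a common pattern (not from the original post):

import pyspark.sql.functions as F

# Sum a 0/1 flag per column: isNull() -> boolean -> int -> sum
df.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()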

4.5 Deduplication (distinct)

Returning a DataFrame: df.select('level').distinct().show()

+-----+
|level|
+-----+
|    M|
|    L|
|    H|
+-----+

Returning Row objects: df.select('level').distinct().collect()

[Row(level='M'), Row(level='L'), Row(level='H')]

Returning a plain list: df.select('level').distinct().rdd.map(lambda r: r[0]).collect()

['M', 'L', 'H']
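distinct() deduplicates on all selected columns. To deduplicate on a subset of columns while keeping the rest, dropDuplicates takes a column list; a sketch (which row survives within each group is not guaranteed):

df.dropDuplicates(['gender', 'level']).show()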

4.6 Random sampling

sample = df.sample(False, 0.5, 0). The arguments are withReplacement (False means sampling without replacement), fraction (0.5 draws roughly 50% of the rows; the exact count is not guaranteed), and seed (0). sample.show() gives:

+------+------+-----+---+
|userid|gender|level|vip|
+------+------+-----+---+
|  0002|     M|    M|  0|
|  0004|     F|    H|  0|
|  0005|     M|    M|  1|
+------+------+-----+---+
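The same call written with keyword arguments, which reads more clearly:

sample = df.sample(withReplacement=False, fraction=0.5, seed=0)
sample.show()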

4.7 Select rows by condition

df.where("gender='F' and level='H'").show() is equivalent to df.filter("gender='F' and level='H'").show()

+------+------+-----+---+
|userid|gender|level|vip|
+------+------+-----+---+
|  0001|     F|    H|  1|
|  0004|     F|    H|  0|
|  0006|     F|    H|  1|
+------+------+-----+---+

4.8 Drop rows with missing values

df2 = df.dropna() or df2 = df.na.drop()
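Both forms accept the same parameters. A sketch of the common variants (the fill values are only illustrative):

# Drop a row only when every column is null
df.na.drop(how='all').show()

# Drop rows where gender or level is null
df.na.drop(subset=['gender', 'level']).show()

# Fill nulls with defaults instead of dropping the rows
df.na.fill({'gender': 'U', 'level': 'unknown'}).show()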

5. Column operations

5.1 Get all column names

df.columns

['userid', 'gender', 'level', 'vip']

5.2 Select one or more columns

df.select('level').show()

+-----+
|level|
+-----+
|    H|
|    M|
|    L|
|    H|
|    M|
|    H|
+-----+

df.select(df['gender'], df['vip']+1).show()

+------+---------+
|gender|(vip + 1)|
+------+---------+
|     F|        2|
|     M|        1|
|     F|        2|
|     F|        1|
|     M|        2|
|     F|        2|
+------+---------+
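The computed column can be renamed with alias, or the whole projection can be written as a SQL expression with selectExpr; a sketch (the vip_plus name is made up):

df.select(df['gender'], (df['vip'] + 1).alias('vip_plus')).show()
df.selectExpr('gender', 'vip + 1 as vip_plus').show()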

df.select('userid','gender','level').show()

Equivalent to df.select(df.userid, df.gender, df.level).show()

Equivalent to df.select(df['userid'], df['gender'], df['level']).show()

+------+------+-----+
|userid|gender|level|
+------+------+-----+
|  0001|     F|    H|
|  0002|     M|    M|
|  0003|     F|    L|
|  0004|     F|    H|
|  0005|     M|    M|
|  0006|     F|    H|
+------+------+-----+

5.3 Sort by a column

df.orderBy(df.vip.desc()).show(5) is equivalent to df.sort('vip', ascending=False).show(5)

+------+------+-----+---+
|userid|gender|level|vip|
+------+------+-----+---+
|  0001|     F|    H|  1|
|  0005|     M|    M|  1|
|  0006|     F|    H|  1|
|  0003|     F|    L|  1|
|  0002|     M|    M|  0|
+------+------+-----+---+
only showing top 5 rows
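Several sort keys can be combined, for example vip descending and then userid ascending (a sketch):

df.orderBy(df.vip.desc(), df.userid.asc()).show()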

5.4 Add a new column

Add a column filled with a constant value: df.withColumn('label', F.lit(0))

import pyspark.sql.functions as F
df.withColumn('label',F.lit(0)).show()

+------+------+-----+---+-----+
|userid|gender|level|vip|label|
+------+------+-----+---+-----+
|  0001|     F|    H|  1|    0|
|  0002|     M|    M|  0|    0|
|  0003|     F|    L|  1|    0|
|  0004|     F|    H|  0|    0|
|  0005|     M|    M|  1|    0|
|  0006|     F|    H|  1|    0|
+------+------+-----+---+-----+

Derive a new column from an existing one: df2 = df.withColumn('label', df.label + 1). Note that when the name already exists, withColumn overwrites that column.

+------+------+-----+---+-----+
|userid|gender|level|vip|label|
+------+------+-----+---+-----+
|  0001|     F|    H|  1|    1|
|  0002|     M|    M|  0|    1|
|  0003|     F|    L|  1|    1|
|  0004|     F|    H|  0|    1|
|  0005|     M|    M|  1|    1|
|  0006|     F|    H|  1|    1|
+------+------+-----+---+-----+
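A column can also be derived conditionally with F.when; a sketch (the is_f_vip name is made up for illustration):

import pyspark.sql.functions as F

# 1 for female VIP users, 0 otherwise
df.withColumn('is_f_vip',
              F.when((F.col('gender') == 'F') & (F.col('vip') == 1), 1).otherwise(0)).show()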

5.5 Change a column's type

df = df.withColumn("label1", df["label"].cast("String"))
df.printSchema()
root
 |-- userid: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)
 |-- vip: long (nullable = true)
 |-- label: integer (nullable = false)
 |-- label1: string (nullable = false)
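cast also accepts a DataType object instead of a string; an equivalent sketch:

from pyspark.sql.types import StringType

df = df.withColumn("label1", df["label"].cast(StringType()))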

5.6 Rename a column

df3 = df.withColumnRenamed("label1", "lbl"); note that df itself is not modified.

+------+------+-----+---+-----+---+
|userid|gender|level|vip|label|lbl|
+------+------+-----+---+-----+---+
|  0001|     F|    H|  1|    0|  0|
|  0002|     M|    M|  0|    0|  0|
|  0003|     F|    L|  1|    0|  0|
|  0004|     F|    H|  0|    0|  0|
|  0005|     M|    M|  1|    0|  0|
|  0006|     F|    H|  1|    0|  0|
+------+------+-----+---+-----+---+

5.7 Drop a column

df4 = df.drop('label1')
df4.show()

+------+------+-----+---+-----+
|userid|gender|level|vip|label|
+------+------+-----+---+-----+
|  0001|     F|    H|  1|    0|
|  0002|     M|    M|  0|    0|
|  0003|     F|    L|  1|    0|
|  0004|     F|    H|  0|    0|
|  0005|     M|    M|  1|    0|
|  0006|     F|    H|  1|    0|
+------+------+-----+---+-----+

6. Selection and slicing

6.1 Using filter directly

df.select('userid','gender','vip').filter(df['vip']>0).show() or df.filter(df['vip']>0).select('userid','gender','vip').show(); in the first form, the select must include the 'vip' column so that the later filter can reference it.

+------+------+---+
|userid|gender|vip|
+------+------+---+
|  0001|     F|  1|
|  0003|     F|  1|
|  0005|     M|  1|
|  0006|     F|  1|
+------+------+---+

Chained conditions: df.filter(df['vip']>0).filter(df['gender']=='F').show()

+------+------+-----+---+-----+------+
|userid|gender|level|vip|label|label1|
+------+------+-----+---+-----+------+
|  0001|     F|    H|  1|    0|     0|
|  0003|     F|    L|  1|    0|     0|
|  0006|     F|    H|  1|    0|     0|
+------+------+-----+---+-----+------+

6.2 SQL expressions with filter

df.filter("gender='M'").show(); df.filter("level like 'M%'").show()

+------+------+-----+---+-----+------+
|userid|gender|level|vip|label|label1|
+------+------+-----+---+-----+------+
|  0002|     M|    M|  0|    0|     0|
|  0005|     M|    M|  1|    0|     0|
+------+------+-----+---+-----+------+

6.3 SQL expressions with where

df.where("level like 'M%'").show()

6.4 Using SQL directly

# First register the DataFrame as a temporary view, then run SQL queries against it
df.createOrReplaceTempView("df5")
spark.sql("select count(1) from df5").show()

+--------+
|count(1)|
+--------+
|       6|
+--------+
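Any Hive-style SQL works against the registered view, for example a simple aggregation (a sketch):

spark.sql("""
    select gender, count(*) as cnt, avg(vip) as vip_rate
    from df5
    group by gender
""").show()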

7. Combining DataFrames: join / union

7.1 Append rows: union

df.union(df).show() or df.unionAll(df.limit(1)).show(). In Spark 2.x the two behave identically: unionAll is just a deprecated alias of union, and neither removes duplicate rows.
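Since union keeps duplicates and matches columns purely by position, a quick sketch of deduplicating after the union:

df.union(df).distinct().show()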

7.2 Merge columns by key: join

dfa = spark.createDataFrame([('0001',2000), ('0002',4000), ('0005',3000)], ['userid','salary'])
dfb = spark.createDataFrame([('0001','2018-10-01 10:10:01'), ('0002','2018-11-01 10:10:01'), ('0003','2018-09-01 10:10:01'),
                            ('0004','2018-10-18 10:10:01'), ('0005','2018-10-19 10:10:01'), ('0006','2019-01-01 10:10:01')
                           ], ['userid','regdt'])

A single left join on the key column is df.join(dfa, ["userid"], 'left').show(); chaining two joins attaches both tables:

df = df.join(dfa, ["userid"],'left').join(dfb, ["userid"],'left')
df.show()

+------+------+-----+---+-----+------+------+-------------------+
|userid|gender|level|vip|label|label1|salary|              regdt|
+------+------+-----+---+-----+------+------+-------------------+
|  0002|     M|    M|  0|    0|     0|  4000|2018-11-01 10:10:01|
|  0003|     F|    L|  1|    0|     0|  null|2018-09-01 10:10:01|
|  0001|     F|    H|  1|    0|     0|  2000|2018-10-01 10:10:01|
|  0006|     F|    H|  1|    0|     0|  null|2019-01-01 10:10:01|
|  0004|     F|    H|  0|    0|     0|  null|2018-10-18 10:10:01|
|  0005|     M|    M|  1|    0|     0|  3000|2018-10-19 10:10:01|
+------+------+-----+---+-----+------+------+-------------------+

7.3 Rank within groups

import pyspark.sql.functions as F
from pyspark.sql import Window
# row_number() over a window partitioned by gender and ordered by regdt
df.withColumn("row_number", F.row_number().over(Window.partitionBy('gender').orderBy('regdt'))).show()

+------+------+-----+---+-----+------+------+-------------------+----------+
|userid|gender|level|vip|label|label1|salary|              regdt|row_number|
+------+------+-----+---+-----+------+------+-------------------+----------+
|  0003|     F|    L|  1|    0|     0|  null|2018-09-01 10:10:01|         1|
|  0001|     F|    H|  1|    0|     0|  2000|2018-10-01 10:10:01|         2|
|  0004|     F|    H|  0|    0|     0|  null|2018-10-18 10:10:01|         3|
|  0006|     F|    H|  1|    0|     0|  null|2019-01-01 10:10:01|         4|
|  0005|     M|    M|  1|    0|     0|  3000|2018-10-19 10:10:01|         1|
|  0002|     M|    M|  0|    0|     0|  4000|2018-11-01 10:10:01|         2|
+------+------+-----+---+-----+------+------+-------------------+----------+
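A typical follow-up is to keep just one row per group, for example the earliest regdt for each gender, by filtering on the rank (a sketch; the rn name is made up):

import pyspark.sql.functions as F
from pyspark.sql import Window

w = Window.partitionBy('gender').orderBy('regdt')
df.withColumn('rn', F.row_number().over(w)).filter(F.col('rn') == 1).drop('rn').show()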

7.4 Set difference

df.select('userid').subtract(dfa.select('userid')).show()

+------+
|userid|
+------+
|  0003|
|  0006|
|  0004|
+------+
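The complementary operation, keeping only userids that appear in both DataFrames, is intersect (a sketch):

df.select('userid').intersect(dfa.select('userid')).show()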

(To be continued...)

