机器学习中特征变量有两种:分类变量和顺序变量,对分类变量需要进行二值化处理,Pyspark也提供One-Hot方法,但和sklearn中的One-Hot一样,新的变量不能保留原变量名称,但是我们可以用Pivot方法来巧妙实现分类变量的二值化处理。
二值化处理的效果如下: 数据处理前:
+------+------+-----+---+
|userid|gender|level|vip|
+------+------+-----+---+
| 0001| F| H| 1|
| 0002| M| M| 0|
| 0003| F| L| 1|
| 0004| F| H| 0|
| 0005| M| M| 1|
| 0006| F| H| 1|
+------+------+-----+---+
数据处理后:
+------+------+-----+---+---+--------+--------+-------+-------+-------+-----+-----+
|userid|gender|level|vip| x|gender_F|gender_M|level_L|level_M|level_H|vip_0|vip_1|
+------+------+-----+---+---+--------+--------+-------+-------+-------+-----+-----+
| 0002| M| M| 0| 1| 0.0| 1.0| 0.0| 1.0| 0.0| 1.0| 0.0|
| 0003| F| L| 1| 1| 1.0| 0.0| 1.0| 0.0| 0.0| 0.0| 1.0|
| 0001| F| H| 1| 1| 1.0| 0.0| 0.0| 0.0| 1.0| 0.0| 1.0|
| 0006| F| H| 1| 1| 1.0| 0.0| 0.0| 0.0| 1.0| 0.0| 1.0|
| 0004| F| H| 0| 1| 1.0| 0.0| 0.0| 0.0| 1.0| 1.0| 0.0|
| 0005| M| M| 1| 1| 0.0| 1.0| 0.0| 1.0| 0.0| 0.0| 1.0|
+------+------+-----+---+---+--------+--------+-------+-------+-------+-----+-----+
什么是Pivot?
具体见下图:
先看下面的例子了解下PySpark DataFrame的Pivot实现方法:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
import pyspark.sql.functions as F
# 原始数据
df = spark.createDataFrame([('0001','F','H',1), ('0002','M','M',0), ('0003','F','L',1),
('0004','F','H',0), ('0005','M','M',1), ('0006','F','H',1)
], ['userid','gender','level','vip'])
样本数据如下:
+------+------+-----+---+
|userid|gender|level|vip|
+------+------+-----+---+
| 0001| F| H| 1|
| 0002| M| M| 0|
| 0003| F| L| 1|
| 0004| F| H| 0|
| 0005| M| M| 1|
| 0006| F| H| 1|
+------+------+-----+---+
透视操作简单直接,逻辑如下(这种方法类似SQL中的case…when…then…else…end):
代码如下:
df_pivot = df.groupBy('gender')\
.pivot('level', ['H','M','L'])\
.agg(F.countDistinct('userid'))\
.fillna(0)
结果如下:
+------+---+---+---+
|gender| H| M| L|
+------+---+---+---+
| F| 3| 0| 1|
| M| 0| 2| 0|
+------+---+---+---+
受这种方法启发,我们可以对原数据新增一列:
+------+------+-----+---+---+
|userid|gender|level|vip| x|
+------+------+-----+---+---+
| 0001| F| H| 1| 1|
| 0002| M| M| 0| 1|
| 0003| F| L| 1| 1|
| 0004| F| H| 0| 1|
| 0005| M| M| 1| 1|
| 0006| F| H| 1| 1|
+------+------+-----+---+---+
然后对每一个分类特征变量列,以主键(主键意味着不重复,这里是user_id)为分组,进行聚合(sum和avg都可以):
newdf_pivot = newdf.groupBy('userid')\
.pivot(col, value_sets)\
.agg(F.avg('x'))\
.fillna(0)
于是得到若干个列切片的Dataframe, 这里user_id重命名成uid是为了跟主键user_id区别:
+----+--------+--------+
| uid|gender_F|gender_M|
+----+--------+--------+
|0002| 0.0| 1.0|
|0003| 1.0| 0.0|
|0001| 1.0| 0.0|
|0006| 1.0| 0.0|
|0004| 1.0| 0.0|
|0005| 0.0| 1.0|
+----+--------+--------+
+----+-------+-------+-------+
| uid|level_L|level_M|level_H|
+----+-------+-------+-------+
|0002| 0.0| 1.0| 0.0|
|0003| 1.0| 0.0| 0.0|
|0001| 0.0| 0.0| 1.0|
|0006| 0.0| 0.0| 1.0|
|0004| 0.0| 0.0| 1.0|
|0005| 0.0| 1.0| 0.0|
+----+-------+-------+-------+
+----+-----+-----+
| uid|vip_0|vip_1|
+----+-----+-----+
|0002| 1.0| 0.0|
|0003| 0.0| 1.0|
|0001| 0.0| 1.0|
|0006| 0.0| 1.0|
|0004| 1.0| 0.0|
|0005| 0.0| 1.0|
+----+-----+-----+
然后把原Dataframe跟上面的Dataframe进行列合并,去掉uid列,最终得到:
+------+------+-----+---+---+--------+--------+-------+-------+-------+-----+-----+
|userid|gender|level|vip| x|gender_F|gender_M|level_L|level_M|level_H|vip_0|vip_1|
+------+------+-----+---+---+--------+--------+-------+-------+-------+-----+-----+
| 0002| M| M| 0| 1| 0.0| 1.0| 0.0| 1.0| 0.0| 1.0| 0.0|
| 0003| F| L| 1| 1| 1.0| 0.0| 1.0| 0.0| 0.0| 0.0| 1.0|
| 0001| F| H| 1| 1| 1.0| 0.0| 0.0| 0.0| 1.0| 0.0| 1.0|
| 0006| F| H| 1| 1| 1.0| 0.0| 0.0| 0.0| 1.0| 0.0| 1.0|
| 0004| F| H| 0| 1| 1.0| 0.0| 0.0| 0.0| 1.0| 1.0| 0.0|
| 0005| M| M| 1| 1| 0.0| 1.0| 0.0| 1.0| 0.0| 0.0| 1.0|
+------+------+-----+---+---+--------+--------+-------+-------+-------+-----+-----+
全部代码:
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
import pyspark.sql.functions as F
if __name__ == '__main__':
spark = SparkSession.builder.getOrCreate()
# 创建DataFrame
sparkdf = = spark.createDataFrame([('0001','F','H',1), ('0002','M','M',0), ('0003','F','L',1),
('0004','F','H',0), ('0005','M','M',1), ('0006','F','H',1)
], ['userid','gender','level','vip'])
# 新增一列,列取值全部为1
newdf = sparkdf.withColumn("x", lit(1))
# 获取列名称
colnames=sparkdf.columns
# 遍历列
for col in colnames:
# 非分类变量列跳过操作
if col<>'userid':
# 获取每个分类变量的取值范围,这一点就不用像SQL中的case...when...then...else...end那样需要手动穷举啦
value_sets=sparkdf.select(col).distinct().rdd.map(lambda r: r[0]).collect()
newdf_pivot = newdf.groupBy('userid')\ # 使用主键分组
.pivot(col, value_sets)\ # 按照分类变量取值进行聚合
.agg(F.avg('x'))\
.fillna(0) # 分组中没有该取值的则为0
# 为了保留列变量名名称,重命名新的列为:列名_取值
for value in value_sets:
newdf_pivot=newdf_pivot.withColumnRenamed(str(value),col+'_'+str(value))
# 为了与原Dataframe的主键userid区分,重命名userid为uid
newdf_pivot=newdf_pivot.withColumnRenamed('userid','uid') # 为了与原Dataframe的主键userid区分,重命名userid为uid
newdf_pivot.show()
# 将进行过二值化处理的新Dataframe左连接合并到原Dataframe
newdf=newdf.join(newdf_pivot, newdf.userid == newdf_pivot.uid, "inner")
# 把uid列去掉
newdf=newdf.drop('uid')
newdf.show()
想想,如果有空值要怎么操作? 我的答案是——遍历列时,先把空值赋值为’NA’(String),-1(Int),只要其他取值能不一样就好,其他代码一样。
数据分析师通常用Hive SQL进行操作,但其实可以考虑换成用Pyspark。Spark是基于内存计算的,比Hive快,PySpark SQL模块的很多函数和方法与Hive SQL中的关键字一样,Pyspark还可与Python中的模块结合使用。不过Pyspark Dataframe和Pandas DataFrame差别还是比较大,这里对Pyspark Dataframe的基本操作进行总结。
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
用Spark SQL创建DataFrame:
sparkdf = spark.sql(""" select * from table_name """)
手动创建DataFrame:
df = spark.createDataFrame([('0001','F','H',1), ('0002','M','M',0), ('0003','F','L',1),
('0004','F','H',0), ('0005','M','M',1), ('0006','F','H',1)
], ['userid','gender','level','vip'])
df.printSchema()
root
|-- userid: string (nullable = true)
|-- gender: string (nullable = true)
|-- level: string (nullable = true)
|-- vip: long (nullable = true)
df.describe().show()
+-------+------------------+------+-----+------------------+
|summary| userid|gender|level| vip|
+-------+------------------+------+-----+------------------+
| count| 6| 6| 6| 6|
| mean| 3.5| null| null|0.6666666666666666|
| stddev|1.8708286933869707| null| null|0.5163977794943222|
| min| 0001| F| H| 0|
| max| 0006| M| M| 1|
+-------+------------------+------+-----+------------------+
df.show(3)
+------+------+-----+---+
|userid|gender|level|vip|
+------+------+-----+---+
| 0001| F| H| 1|
| 0002| M| M| 0|
| 0003| F| L| 1|
+------+------+-----+---+
only showing top 3 rows
df.head(3)
或者df.take(3)
,两者输出一样,注意数字不能为负。
[Row(userid='0001', gender='F', level='H', vip=1),
Row(userid='0002', gender='M', level='M', vip=0),
Row(userid='0003', gender='F', level='L', vip=1)]
df.count()
6
from pyspark.sql.functions import isnan,isnull
df1=df.filter(isnull('gender')) #把gender列里面数据为null的筛选出来(代表python的None类型)
df2=df.filter(isnan("vip")) # 把vip列里面数据为nan的筛选出来(Not a Number,非数字数据)
+------+------+-----+---+
|userid|gender|level|vip|
+------+------+-----+---+
+------+------+-----+---+
输出是DataFrame:
df.select('level').distinct().show()
+-----+
|level|
+-----+
| M|
| L|
| H|
+-----+
输出是Row类:
df.select('level').distinct().collect()
[Row(level='M'), Row(level='L'), Row(level='H')]
输出是List:
df.select('level').distinct().rdd.map(lambda r: r[0]).collect()
['M', 'L', 'H']
sample = df.sample(False,0.5,0)
frac=0.5 随机抽取50%。
replace= True or False代表是否有放回。
axis=0,对行进行抽样。
+------+------+-----+---+
|userid|gender|level|vip|
+------+------+-----+---+
| 0002| M| M| 0|
| 0004| F| H| 0|
| 0005| M| M| 1|
+------+------+-----+---+
df.where("gender='F' and level='H'").show()
等价于df.filter("gender='F' and level='H'").show()
+------+------+-----+---+
|userid|gender|level|vip|
+------+------+-----+---+
| 0001| F| H| 1|
| 0004| F| H| 0|
| 0006| F| H| 1|
+------+------+-----+---+
df2 = df.dropna()
或者
df2=df.na.drop()
df.columns
['userid', 'gender', 'level', 'vip']
df.select('level').show()
+-----+
|level|
+-----+
| H|
| M|
| L|
| H|
| M|
| H|
+-----+
df.select(df['gender'], df['vip']+1).show()
+------+---------+
|gender|(vip + 1)|
+------+---------+
| F| 2|
| M| 1|
| F| 2|
| F| 1|
| M| 2|
| F| 2|
+------+---------+
df.select('userid','gender','level').show()
等价于df.select(df.userid, df.gender, df.level).show()
等价于df.select(df['userid'], df['gender'], df['level']).show()
+------+------+-----+
|userid|gender|level|
+------+------+-----+
| 0001| F| H|
| 0002| M| M|
| 0003| F| L|
| 0004| F| H|
| 0005| M| M|
| 0006| F| H|
+------+------+-----+
df.orderBy(df.vip.desc()).show(5)
等价于df.sort('vip', ascending=False).show()
+------+------+-----+---+
|userid|gender|level|vip|
+------+------+-----+---+
| 0001| F| H| 1|
| 0005| M| M| 1|
| 0006| F| H| 1|
| 0003| F| L| 1|
| 0002| M| M| 0|
+------+------+-----+---+
only showing top 5 rows
新增一列全部赋值为常量:
df.withColumn('label',F.lit(0))
import pyspark.sql.functions as F
df.withColumn('label',F.lit(0)).show()
+------+------+-----+---+-----+
|userid|gender|level|vip|label|
+------+------+-----+---+-----+
| 0001| F| H| 1| 0|
| 0002| M| M| 0| 0|
| 0003| F| L| 1| 0|
| 0004| F| H| 0| 0|
| 0005| M| M| 1| 0|
| 0006| F| H| 1| 0|
+------+------+-----+---+-----+
通过另一个已有变量新增一列:
df2=df.withColumn('label',df.label+1)
+------+------+-----+---+-----+
|userid|gender|level|vip|label|
+------+------+-----+---+-----+
| 0001| F| H| 1| 1|
| 0002| M| M| 0| 1|
| 0003| F| L| 1| 1|
| 0004| F| H| 0| 1|
| 0005| M| M| 1| 1|
| 0006| F| H| 1| 1|
+------+------+-----+---+-----+
df = df.withColumn("label1", df["label"].cast("String"))
df.printSchema()
root
|-- userid: string (nullable = true)
|-- gender: string (nullable = true)
|-- level: string (nullable = true)
|-- vip: long (nullable = true)
|-- label: integer (nullable = false)
|-- label1: string (nullable = false)
df3=df.withColumnRenamed( "label1" , "lbl" )
——注意df自身没有发生变化。
+------+------+-----+---+-----+---+
|userid|gender|level|vip|label|lbl|
+------+------+-----+---+-----+---+
| 0001| F| H| 1| 0| 0|
| 0002| M| M| 0| 0| 0|
| 0003| F| L| 1| 0| 0|
| 0004| F| H| 0| 0| 0|
| 0005| M| M| 1| 0| 0|
| 0006| F| H| 1| 0| 0|
+------+------+-----+---+-----+---+
df4=df.drop('length').show()
+------+------+-----+---+-----+
|userid|gender|level|vip|label|
+------+------+-----+---+-----+
| 0001| F| H| 1| 0|
| 0002| M| M| 0| 0|
| 0003| F| L| 1| 0|
| 0004| F| H| 0| 0|
| 0005| M| M| 1| 0|
| 0006| F| H| 1| 0|
+------+------+-----+---+-----+
df.select('userid','gender','vip').filter(df['vip']>0).show()
或者
df.filter(df['vip']>0).select('userid','gender','vip').show()
,
前者select括号里面一定要有‘vip’这一列。
+------+------+---+
|userid|gender|vip|
+------+------+---+
| 0001| F| 1|
| 0003| F| 1|
| 0005| M| 1|
| 0006| F| 1|
+------+------+---+
多重选择
df.filter(df['vip']>0).filter(df['gender']=='F').show()
+------+------+-----+---+-----+------+
|userid|gender|level|vip|label|label1|
+------+------+-----+---+-----+------+
| 0001| F| H| 1| 0| 0|
| 0003| F| L| 1| 0| 0|
| 0006| F| H| 1| 0| 0|
+------+------+-----+---+-----+------+
df.filter("gender='M'").show()
;
df.filter("level like 'M%'").show()
+------+------+-----+---+-----+------+
|userid|gender|level|vip|label|label1|
+------+------+-----+---+-----+------+
| 0002| M| M| 0| 0| 0|
| 0005| M| M| 1| 0| 0|
+------+------+-----+---+-----+------+
df.where("level like 'M%'").show()
# 首先dataframe注册为临时表,然后执行SQL查询
df.createOrReplaceTempView("df5")
spark.sql("select count(1) from df5").show()
+--------+
|count(1)|
+--------+
| 6|
+--------+
df.union(df).show()
或者df.unionAll(df.limit(1)).show()
,二者好像没看出啥区别。
df.join(dfa, ["userid"],'left').show()
dfa=spark.createDataFrame([('0001',2000), ('0002',4000), ('0005',3000)], ['userid','salary'])
dfb=spark.createDataFrame([('0001','2018-10-01 10:10:01'), ('0002','2018-11-01 10:10:01'), ('0003','2018-09-01 10:10:01'),
('0004','2018-10-18 10:10:01'), ('0005','2018-10-19 10:10:01'), ('0006','2019-01-01 10:10:01')
], ['userid','regdt'])
df=df.join(dfa, ["userid"],'left').join(dfb, ["userid"],'left')
+------+------+-----+---+-----+------+------+-------------------+
|userid|gender|level|vip|label|label1|salary| regdt|
+------+------+-----+---+-----+------+------+-------------------+
| 0002| M| M| 0| 0| 0| 4000|2018-11-01 10:10:01|
| 0003| F| L| 1| 0| 0| null|2018-09-01 10:10:01|
| 0001| F| H| 1| 0| 0| 2000|2018-10-01 10:10:01|
| 0006| F| H| 1| 0| 0| null|2019-01-01 10:10:01|
| 0004| F| H| 0| 0| 0| null|2018-10-18 10:10:01|
| 0005| M| M| 1| 0| 0| 3000|2018-10-19 10:10:01|
+------+------+-----+---+-----+------+------+-------------------+
import pyspark.sql.functions as F
from pyspark.sql import Window
# row_number()函数
df.withColumn("row_number", F.row_number().over(Window.partitionBy('gender').orderBy('regdt'))).show()
+------+------+-----+---+-----+------+------+-------------------+----------+
|userid|gender|level|vip|label|label1|salary| regdt|row_number|
+------+------+-----+---+-----+------+------+-------------------+----------+
| 0003| F| L| 1| 0| 0| null|2018-09-01 10:10:01| 1|
| 0001| F| H| 1| 0| 0| 2000|2018-10-01 10:10:01| 2|
| 0004| F| H| 0| 0| 0| null|2018-10-18 10:10:01| 3|
| 0006| F| H| 1| 0| 0| null|2019-01-01 10:10:01| 4|
| 0005| M| M| 1| 0| 0| 3000|2018-10-19 10:10:01| 1|
| 0002| M| M| 0| 0| 0| 4000|2018-11-01 10:10:01| 2|
+------+------+-----+---+-----+------+------+-------------------+----------+
df.select('userid').subtract(dfa.select('userid')).show()
+------+
|userid|
+------+
| 0003|
| 0006|
| 0004|
+------+
(未完待续。。)
一直以来都有记录的习惯,倒不是为了没事抒个情,而是学习中很多细节的东西容易记得不牢。简书和网易云笔记都是平时用来做经验总结和学习笔记的工具,也挺好用的,至于要在github上做个人网站的原因,一是可以定制独属于自己网站,这感觉很好haha~ ,二是Github总不至于像当年的人人网一样没了,可以放心把自己学习到的经验技巧放在这里,最后就是好奇别人的github网站怎么做的,于是自己也折腾一下。
这个网站的完工要感谢Gaohaoyang和cyzus两位前辈在github上提供的模版。