admin管理员组文章数量:1794759
pyspark基础学习——数据处理
目录
- 前言
- 一、准备工作和数据的导入选择
- 1.1 导入数据
- 1.2 选择数据子集:
- 1.3 列名重命名
- 二、数据清洗
- 2.1 检测空值数量
- 2.2 删除存在空值的行
- 2.3 forward,backward填充
- 三、 数据处理
- 3.1 数据筛选
- 3.2 数据统计
- 3.3 数据类型转换
- 3.4 采用SQL语法进行处理
- 四、数据导出
- 总结
前言
上一篇文章中讲了如何在windows下安装和检测: pyspark,同时简单介绍了运行的环境。本文想就我的一些学习经验,分享一下使用pyspark来处理csv文件上的一些常用的pyspark语法。
一、准备工作和数据的导入选择运行python代码,第一件事当然是导入对应的包,同时我们要为spark先创建好相应的环境,并且,spark中支持SQL,而且在SQL中有众多的函数,因此我们可以创建SparkSession对象,为了后续SQL函数的调用,我们要导入functions包,以及数据类型转换的时候,我们要导入types的包。
import pyspark from pyspark.sql import SparkSession from pyspark.sql.types import * from pyspark.sql.functions import * from pyspark.sql.types import TimestampType spark = SparkSession.builder.getOrCreate() sc = spark.sparkContext 1.1 导入数据将csv文件导入为Dataframe样式: header表示是否需要导入表头;inferSchema表示是否需要推导出数据的类型(false默认为string);delimiter表示指定分隔符进行读取。file对应文件的位置。
df1 = spark.read.options(header='True', inferSchema='True', delimiter=',').csv(file) 1.2 选择数据子集:drop中填入不需要的列的列名。
df2 = df1.drop('列名') 1.3 列名重命名 df3=df2.withColumnRenamed("original name", "modified name")如果有多个列的列名要进行修改,可以直接在后面再加上withColumnRenamed()进行修改
二、数据清洗因为数据本身的问题,在处理的过程中需要我们对一些空值、异常值等进行处理。但是此次作业获取到的数据中主要是对空值的处理,因此对于异常值的处理不进行讨论
2.1 检测空值数量 df3.toPandas().isnull().sum() 2.2 删除存在空值的行对于一些关键列的数据丢失、或是该行的缺失值占比较高的情况下,我们很难将人工将其弥补,因此直接对该行进行删除。
df_clear=df3.dropna(subset='列名') 2.3 forward,backward填充forward: 前面一个值填充后面 backward:后面一个值填充前面
代码示例:
df = spark.createDataFrame([ (1, 'd1',None), (1, 'd2',10), (1, 'd3',None), (1, 'd4',30), (1, 'd5',None), (1, 'd6',None), ],('id', 'day','temperature')) df.show()运行结果如下:
1 | d1 | null |
1 | d2 | 10 |
1 | d3 | null |
1 | d4 | 30 |
1 | d5 | null |
1 | d6 | null |
填充后的结果如下表所示:
1 | d1 | null | null | 10 |
1 | d2 | 10 | 10 | 10 |
1 | d3 | null | 10 | 30 |
1 | d4 | 30 | 30 | 30 |
1 | d5 | null | 30 | null |
1 | d6 | null | 30 | null |
Window.unboundedPreceding:分区的开始位置 Window.currentRow:分区计算到现在的位置 Window.unboundedFollowing:分区的最后位置。 负数:表示若前面有元素,范围向前延申几个元素 0:表示当前位置,等价于Window.currentRow 正数:表示若后面有元素,范围向后延申几个元素
三、 数据处理 3.1 数据筛选 data1= df_clear.filter(df_clear['column'] == 'attribute') # 条件过滤 data2 = df_clear.select('column') # 选择某一列的数据 3.2 数据统计 # 输出树状结构(输出列名、数据类型和是否能为空值) df_clear.printSchema() # 将该列数据进行汇总统计 df_clear.select('column').describe().show() # 求平均,按照id的方式进行统计 ave_column = df_clear.groupBy('id').agg({'column': 'mean'})agg({“列名”,“函数名”})为聚合函数,其中有:
avg | 求均值 |
count | 计数 |
max | 求最大值 |
mean | 求均值 |
min | 求最小值 |
sum | 求和 |
总结
由于此次学习仅用于完成课堂大作业,因此有不足之处还望各位大佬在评论区制指正,若是能够为你们提供一点小小的帮助,希望各位大佬们能动动手指,给小弟一个赞!感谢各位大佬们! 该作业的处理的源代码和相关数据已经传至github
版权声明:本文标题:pyspark基础学习——数据处理 内容由林淑君副主任自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.xiehuijuan.com/baike/1687054848a130636.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论