操作DataFrame

在PySpark 2.0中,DataFrame为结构化数据操作提供了一种特定于领域的语言(Domain-Specific Language,DSL)。这些操作被分为两类,Transformation和Action。开发人员链接多个操作来选择、过滤、转换、聚合和排序在DataFrame中的数据。底层的Catalyst优化器确保了这些操作的高效执行。

列的多种引用方式

在学习操作DataFrame之前,需要掌握PySpark所提供的引用DataFrame列的多种方式。首先创建一个DataFrame,代码如下:

# 将元组转为DataFrame                
kvDF = spark.createDataFrame([(1,2),(3,4)],["key","value"])

kvDF.printSchema()
kvDF.show()

执行以上代码,输出内容如下:

root
 |-- key: integer (nullable = false)
 |-- value: integer (nullable = false)

+---+-----+
|key|value|
+---+-----+
|  1|     2|
|  3|     4|
+---+-----+

要显示一个DataFrame的所有列名,可以调用其columns属性,代码如下:

kvDF.columns			# 输出:['key', 'value']

如果仅仅是为了获取列值,而不对列值做任何计算和比较,则直接引用列名字符串即可,代码如下:

kvDF.select("key").show()             	# 列为字符串类型

执行以上代码,输出内容如下:

+---+
|key|
+---+
|  1|
|  3|
+---+

如果要对列做任何计算和比较等操作,则需要获得列的对象类型,这需要使用内置的col函数来选择列(这时获得的是一个pyspark.sql.Column对象),代码如下:

from pyspark.sql.functions import col

kvDF.select(col("key")).show()            	# 列为Column类型
kvDF.where(col("key") > 1).show()         	# 列为Column类型

执行以上代码,输出内容如下:

+---+
|key|
+---+
|  1|
|  3|
+---+

+---+-----+
|key|value|
+---+-----+
|  3|     4|
+---+-----+

也可以使用expr()函数,它与col()函数相同,返回一个pyspark.sql.Column对象,不过它允许使用表达式作为参数,代码如下:

from pyspark.sql.functions import expr

kvDF.select(expr("key")).show()   
kvDF.select(expr("key > 1")).show()  
kvDF.select(expr("key") > 1).show()  

执行以上代码,输出内容如下:

+---+
|key|
+---+
|  1|
|  3|
+---+

+---------+
|(key > 1)|
+---------+
|     false|
|      true|
+---------+

+---------+
|(key > 1)|
+---------+
|     false|
|      true|
+---------+

也可以直接通过DataFrame来引用,代码如下:

kvDF.select(kvDF.key).show()
kvDF.select(kvDF["key"]).show()

执行以上代码,输出内容如下:

+---+
|key|
+---+
|  1|
|  3|
+---+

+---+
|key|
+---+
|  1|
|  3|
+---+

下面的代码选择key列,并增加一个新的列,新列的值由key列计算得来,为boolean值,表示key列的值是否大于1,代码如下:

from pyspark.sql.functions import col

kvDF.select(col("key"), col("key") > 1).show()

执行以上代码,输出内容如下:

+---+---------+
|key|(key > 1)|
+---+---------+
|  1|     false|
|  3|      true|
+---+---------+

也可以给列取一个别名,代码如下:

kvDF.select(col("key"), (col("key") > 1).alias("a")).show()
# 或
kvDF.select(kvDF.key, (kvDF.key > 1).alias("a")).show()

执行以上代码,输出内容如下:

+---+------+
|key|     a|
+---+------+
|  1| false|
|  3|  true|
+---+------+

也可以使用正则表达式来选择列,代码如下:

kvDF.select(kvDF.colRegex("`^k.*`")).show()

执行以上代码,输出内容如下:

+---+
|key|
+---+
|  1|
|  3|
+---+

对于列的引用方式,区别如下:

  • (1) 如果只是为了获取特定列的值,则直接以字符串类型引用列名即可。
  • (2) 如果引用列是为了任何形式的计算,包括排序、类型转换、别名、比较、计算列等等,则需要应用上述任一函数将列转换为Column对象。

对DataFrame执行Transformation转换操作

DataFrame API提供有许多函数用来执行关系运算,这些函数模拟了SQL关系操作:

  • 选择数据:select()。
  • 删除某列:drop()。
  • 过滤数据:where()和filter()(两者等价)。
  • 限制返回的数量:limit()。
  • 重命名列:withColumnRenamed()。
  • 增加一个新的列:withColumn()。
  • 数据分组:groupBy()。
  • 数据排序:orderBy()和sort()(两者等价)。

在进一步演示DataFrame的各种操作方法之前,先准备好要用到的数据。这里将使用一个电影数据集movies.csv,请将它上传到HDFS的/data/spark/目录下。其中部分数据格式如下所示:

actor,title,year
"McClure, Marc (I)",Freaky Friday,2003
"McClure, Marc (I)",Coach Carter,2005
"McClure, Marc (I)",Superman II,1980
"McClure, Marc (I)",Apollo 13,1995
"McClure, Marc (I)",Superman,1978
"McClure, Marc (I)",Back to the Future,1985
"McClure, Marc (I)",Back to the Future Part III,1990
"Cooper, Chris (I)","Me, Myself & Irene",2000
"Cooper, Chris (I)",October Sky,1999
"Cooper, Chris (I)",Capote,2005
"Cooper, Chris (I)",The Bourne Supremacy,2004

可以看到,数据集有三个字段:actor,title和year。数据集第一行是标题行。

首先,加载数据集到DataFrame中,代码如下:

from pyspark.sql import SparkSession

# 构建SparkSession实例
spark = SparkSession.builder \
   .master("spark://localhost:7077") \
   .appName("pyspark sql demo") \
   .getOrCreate()

# 加载电影数据集文件到DataFrame中
file = "/data/spark/movies2/movies.csv"
movies = spark.read \
      .option("header","true") \
      .option("inferSchema","true") \
      .csv(file)

movies.printSchema()
movies.show(5)

执行以上代码,输出内容如下:

root
 |-- actor: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)

+-----------------+-------------+----+
|              actor|          title|year|
+-----------------+-------------+----+
|McClure, Marc (I)|Freaky Friday|2003|
|McClure, Marc (I)| Coach Carter|2005|
|McClure, Marc (I)|  Superman II|1980|
|McClure, Marc (I)|     Apollo 13|1995|
|McClure, Marc (I)|      Superman|1978|
+-----------------+-------------+----+
only showing top 5 rows

1)select()函数

类似于SQL中的select子句,用来选择指定的列。例如,选取movies中的title和year两列,并返回一个新的DataFrame,代码如下:

from pyspark.sql.functions import *

movies.select("title","year").show(5)

执行以上代码,输出内容如下:

+-------------+----+
|         title|year|
+-------------+----+
|Freaky Friday|2003|
| Coach Carter|2005|
|  Superman II|1980|
|     Apollo 13|1995|
|      Superman|1978|
+-------------+----+
only showing top 5 rows

将电影上演的年份转换到年代表示,使用col()函数,并赋予一个别名,代码如下:

from pyspark.sql.functions import *

# 使用将年份列转换到年代列
movies2 = movies.select(col('title') ,(col('year') - col('year') % 10).alias("decade"))
movies2.show(5)

执行以上代码,输出内容如下:

+-------------+------+
|          title|decade|
+-------------+------+
|Freaky Friday|  2000|
| Coach Carter|  2000|
|  Superman II|  1980|
|     Apollo 13|  1990|
|      Superman|  1970|
+-------------+------+
only showing top 5 rows

2)selectExp()函数

用来选择一组SQL表达式,即使用SQL表达式来选择列。例如,在下面的代码中,用通配符星号(*)来表示选择所有的列,并增加一个新的列decade,新列的值是通过对year列值计算得到的电影上映的年代,代码如下:

from pyspark.sql.functions import *

movies.selectExpr("*","(year - year % 10) as decade").show(5)

执行以上代码,输出内容如下:

+-----------------+-------------+----+------+
|              actor|          title|year|decade|
+-----------------+-------------+----+------+
|McClure, Marc (I)|Freaky Friday|2003|  2000|
|McClure, Marc (I)| Coach Carter|2005|  2000|
|McClure, Marc (I)|  Superman II|1980|  1980|
|McClure, Marc (I)|     Apollo 13|1995|  1990|
|McClure, Marc (I)|      Superman|1978|  1970|
+-----------------+-------------+----+------+
only showing top 5 rows

在selectExpr()方法中,不但支持使用SQL表达式,还支持直接使用SQL内置函数。例如,使用SQL表达式和内置函数,来查询电影数量和演员数量这两个值,代码如下:

from pyspark.sql.functions import *

movies.selectExpr("count(distinct(title)) as movies", "count(distinct(actor)) as actors").show()

执行以上代码,输出内容如下:

+------+------+
|movies|actors|
+------+------+
|  1409|   6527|
+------+------+

可以看出,数据集中的电影共有1409部,演员共有6527名。

3)filter(),where()函数

使用给定的条件过滤DataFrame中的行。这两个函数是等价的,相当于SQL中的where子句。例如,要找出2000年以前上映的电影,需要在filter()函数或where()函数中指定过滤条件,代码如下:

from pyspark.sql.functions import *

movies.filter('year < 2000').show(5)
# movies.where('year < 2000').show(5)   # 等价

执行以上代码,输出内容如下:

+-----------------+--------------------+----+
|              actor|                  title|year|
+-----------------+--------------------+----+
|McClure, Marc (I)|           Superman II|1980|
|McClure, Marc (I)|             Apollo 13|1995|
|McClure, Marc (I)|              Superman|1978|
|McClure, Marc (I)|  Back to the Future|1985|
|McClure, Marc (I)|Back to the Futur...|1990|
+-----------------+--------------------+----+
only showing top 5 rows

如果想找出2000年及以后上映的电影,代码如下:

movies.filter('year >= 2000').show(5)
# movies.where('year >= 2000').show(5)  # 等价

执行以上代码,输出内容如下:

+-----------------+--------------------+----+
|              actor|                  title|year|
+-----------------+--------------------+----+
|McClure, Marc (I)|        Freaky Friday|2003|
|McClure, Marc (I)|         Coach Carter|2005|
|Cooper, Chris (I)|  Me, Myself & Irene|2000|
|Cooper, Chris (I)|                 Capote|2005|
|Cooper, Chris (I)|The Bourne Supremacy|2004|
+-----------------+--------------------+----+
only showing top 5 rows

如果要找出2000年上映的电影,代码如下:

# 相等比较
movies.filter('year = 2000').show(5)
# movies.where('year = 2000').show(5)  # 等价

执行以上代码,输出内容如下:

+-----------------+--------------------+----+
|              actor|                  title|year|
+-----------------+--------------------+----+
|Cooper, Chris (I)|  Me, Myself & Irene|2000|
|Cooper, Chris (I)|          The Patriot|2000|
|  Jolie, Angelina|Gone in Sixty Sec...|2000|
|   Yip, Françoise|       Romeo Must Die|2000|
|   Danner, Blythe|     Meet the Parents|2000|
+-----------------+--------------------+----+
only showing top 5 rows

不等比较使用的操作符是!=。找出非2000年上映的电影,代码如下:

# 不等比较使用的操作符是 !=
movies.select("title","year").filter('year != 2000').show(5)
# movies.select("title","year").where('year != 2000').show(5)  # 等价
# movies.select("title","year").filter(col('year') != 2000).show(5)  # 等价

执行以上代码,输出内容如下:

+-------------+----+
|         title|year|
+-------------+----+
|Freaky Friday|2003|
| Coach Carter|2005|
|  Superman II|1980|
|    Apollo 13|1995|
|     Superman|1978|
+-------------+----+
only showing top 5 rows

支持离散值匹配。例如,找出2001年到2002年间上映的电影,代码如下:

movies.filter(col("year").isin([2001,2002])).show(5, truncate=False) 

# 等价
# movies.where(col("year").isin([2001,2002])).show(5, truncate=False)  

执行以上代码,输出内容如下:

+-------------------+--------------------------------------+----+
|actor                 |title                                       |year|
+-------------------+--------------------------------------+----+
|Cooper, Chris (I)  |The Bourne Identity                       |2002|
|Cassavetes, Frank  |John Q                                       |2002|
|Knight, Shirley (I)|Divine Secrets of the Ya-Ya Sisterhood|2002|
|Jolie, Angelina     |Lara Croft: Tomb Raider                  |2001|
|Cueto, Esteban      |Collateral Damage                         |2002|
+-------------------+--------------------------------------+----+
only showing top 5 rows

可使用OR和AND运算符组合一个或多个比较表达式。例如,要找出2000年及以后上映、并且名称长度少于5个字符的电影,代码如下:

movies.filter('year >= 2000' and length('title') < 5).show(5)  
# movies.where('year >= 2000' and length('title') < 5).show(5)   # 等价

执行以上代码,输出内容如下:

+---------------+-----+----+
|            actor|title|year|
+---------------+-----+----+
|Jolie, Angelina| Salt|2010|
| Cueto, Esteban|  xXx|2002|
|  Butters, Mike|  Saw|2004|
| Franko, Victor|   21|2008|
|  Ogbonna, Chuk| Salt|2010|
+---------------+-----+----+
only showing top 5 rows

另一种实现相同结果的方法是调用filter函数两次,代码如下:

movies.filter('year >= 2000').filter(length('title') < 5).show(5)
# movies.where('year >= 2000').where(length('title') < 5).show(5)   # 等价

4)distinct(),dropDuplicates()函数

返回一个新数据集,其中仅包含此数据集中的唯一行。这是dropDuplicates()的别名。例如,想知道数据集中共有多少条唯一行(去重),代码如下:

from pyspark.sql.functions import *

movies.distinct().count()
movies.dropDuplicates().count()    # 与上句等价

执行以上代码,输出内容如下:

31394

但是,如果想要知道数据集中共包含有多少部电影,则需要基于title字段进行唯一值统计,代码如下:

movies.select("title").distinct().count()

# 其实也可以使用SQL的distinct函数
# movies.selectExpr("count(distinct(title)) as movies").show()

执行以上代码,输出内容如下:

1409

5)dropDuplicates()函数

仅考虑列的子集,返回删除(按列的子集)重复行的新数据集。例如,同样需要统计数据集中共包含多少部电影,代码如下:

movies.dropDuplicates(["title"]).count()

执行以上代码,输出内容如下:

1409

6)sort(),orderBy()函数

相当于SQL中的order by子句,它返回按指定的列排序后的新数据集。例如,要按电影名称长度顺序以及上映年份倒序显示,代码如下:

from pyspark.sql.functions import *

movies.dropDuplicates(["title", "year"]) \
      .selectExpr("title", "length(title) as title_length", "year") \
      .orderBy(col("title_length").asc(), col("year").desc()) \
      .show(10,False)

orderBy()函数与sort()函数是等价的,所以上面代码中的orderBy()也可以换为sort(),结果是相同的。执行以上代码,输出内容如下:

+-----+------------+----+
|title|title_length|year|
+-----+------------+----+
|Up    |2             |2009|
|21    |2             |2008|
|12    |2             |2007|
|RV    |2             |2006|
|X2    |2             |2003|
|Rio   |3             |2011|
|Hop   |3             |2011|
|300   |3             |2006|
|Saw   |3             |2004|
|Elf   |3             |2003|
+-----+------------+----+
only showing top 10 rows

7)groupBy()

相当于SQL中的group by子句,按指定的列对数据进行分组,以便执行聚合统计操作。例如,统计每年上映的电影数量并按数量倒序显示,代码如下:

from pyspark.sql.functions import *

movies.groupBy("year").count().orderBy(col("count").desc()).show(10)

执行以上代码,输出内容如下:

+----+-----+
|year|count|
+----+-----+
|2006| 2078|
|2004| 2005|
|2007| 1986|
|2005| 1960|
|2011| 1926|
|2008| 1892|
|2009| 1890|
|2010| 1843|
|2002| 1834|
|2001| 1687|
+----+-----+
only showing top 10 rows

如果要统计上映电影数量超过2000部的年份,代码如下:

movies.groupBy("year").count().where(col("count") > 2000).show()

执行以上代码,输出内容如下:

+----+-----+
|year|count|
+----+-----+
|2006| 2078|
|2004| 2005|

8)limit()

通过获取前n行返回一个新的DataFrame,相当于SQL中的limit子句,限制返回的行数。通常将其与orderBy配合来实现Top N算法。例如,统计上映电影数量最多的5个年份,代码如下:

from pyspark.sql.functions import *

movies.groupBy("year") \
       .count() \
       .orderBy(col("count").desc()) \
       .limit(5) \
       .show()

执行以上代码,输出内容如下:

+----+-----+
|year|count|
+----+-----+
|2006| 2078|
|2004| 2005|
|2007| 1986|
|2005| 1960|
|2011| 1926|
+----+-----+

查询电影名称最长的5部电影,代码如下:

movies.dropDuplicates(["title"]) \
       .selectExpr("title", "length(title) as title_length") \
       .orderBy(col("title_length").desc()) \
       .limit(5) \
       .show(truncate=False)

执行以上代码,输出内容如下:

+-----------------------------------------------------------------------------------+------------+
|title                                                                              |title_length|
+-----------------------------------------------------------------------------------+------------+
|Borat: Cultural Learnings of America for Make Benefit Glorious Nation of Kazakhstan|83          |
|The Chronicles of Narnia: The Lion, the Witch and the Wardrobe                     |62          |
|Hannah Montana & Miley Cyrus: Best of Both Worlds Concert                          |57          |
|The Chronicles of Narnia: The Voyage of the Dawn Treader                           |56          |
|Istoriya pro Richarda, milorda i prekrasnuyu Zhar-ptitsu                           |56          |
+-----------------------------------------------------------------------------------+------------+

9)withColumn()

通过添加列或替换具有相同名称的现有列,返回一个新的数据集。例如,向movies数据集增加一个新列decade,该列的值是基于year这个列的表达式,计算的结果是该电影上映的年代,代码如下:

from pyspark.sql.functions import *

movies.withColumn("decade", (col('year') - col('year') % 10)).show(5)

执行以上代码,输出内容如下:

+-----------------+-------------+----+------+
|              actor|          title|year|decade|
+-----------------+-------------+----+------+
|McClure, Marc (I)|Freaky Friday|2003|  2000|
|McClure, Marc (I)| Coach Carter|2005|  2000|
|McClure, Marc (I)|  Superman II|1980|  1980|
|McClure, Marc (I)|     Apollo 13|1995|  1990|
|McClure, Marc (I)|      Superman|1978|  1970|
+-----------------+-------------+----+------+
only showing top 5 rows

如果传给withColumn()函数的列名与现有列名相同,则意味着用新值替换旧值。例如,替换year列的值为年代(原值为年份),代码如下:

movies.withColumn("decade", (col('year') - col('year') % 10)).show(5)

执行以上代码,输出内容如下:

+-----------------+-------------+----+
|              actor|          title|year|
+-----------------+-------------+----+
|McClure, Marc (I)|Freaky Friday|2000|
|McClure, Marc (I)| Coach Carter|2000|
|McClure, Marc (I)|  Superman II|1980|
|McClure, Marc (I)|     Apollo 13|1990|
|McClure, Marc (I)|      Superman|1970|
+-----------------+-------------+----+
only showing top 5 rows

10)withColumnRenamed()

返回一个重命名列的新数据集。如果模式不包含指定的列名,则不做任何操作。例如,将movies数据集中的列名改为新的名称,代码如下:

from pyspark.sql.functions import *

movies.withColumnRenamed("actor", "actor_name") \
       .withColumnRenamed("title", "movie_title") \
       .withColumnRenamed("year", "produced_year") \
       .show(5)

执行以上代码,输出内容如下:

+-----------------+-------------+-------------+
|        actor_name|   movie_title|produced_year|
+-----------------+-------------+-------------+
|McClure, Marc (I)| Freaky Friday|         2003|
|McClure, Marc (I)|   Coach Carter|         2005|
|McClure, Marc (I)|    Superman II|         1980|
|McClure, Marc (I)|      Apollo 13|         1995|
|McClure, Marc (I)|        Superman|         1978|
+-----------------+-------------+-------------+
only showing top 5 rows

对DataFrame执行Action操作

与RDD类似,DataFrame上的Transformation转换操作都是延迟执行的。只有当在DataFrame上执行Action操作时,才会触发真正的计算。这些Action操作相对都比较简单,关于这些Action操作方法的使用,代码如下:

from pyspark.sql.functions import *

# 查看前5条数据。第2个参数指定当列内容较长时,是否截断显示,false为不截断
movies.show(5,truncate=False)

# 返回数据集中的数量
movies.count

# 返回数据集中第1条数据
movies.first()

# 等价于first方法
movies.head()

# 返回数据集中前3条数据,以Array形式
movies.head(3)

# 返回数据集中前3条数据,以Array形式
movies.take(3)

# 返回一个包含数据集中所有行的数组
movies.collect()

# 返回数据集的列名,以列表形式
movies.columns  		# ['actor', 'title', 'year']

对DataFrame执行描述性统计操作

PySpark还为DataFrame提供了一个describe函数,用来计算数字列和字符串列的基本统计信息,包括count、mean、stddev、min和max。如果没有给定列,则此函数计算所有数值列或字符串列的统计信息。

这个方法是一个Action操作,经常用于对数据集执行探索性数据分析,代码如下:

descDF = movies.describe()

descDF.printSchema()
descDF.show()

执行以上代码,输出内容如下:

root
 |-- summary: string (nullable = true)
 |-- actor: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: string (nullable = true)

+-------+------------------+--------------------+------------------+
|summary|               actor|                   title|                year|
+-------+------------------+--------------------+------------------+
|  count|                31394|                  31394|                31393|
|   mean|                 null|   312.61538461538464|2002.7964514382188|
| stddev|                 null|    485.7043414390151| 6.377135379933117|
|    min|     Aaron, Caroline|'Crocodile' Dunde...|                 1961|
|    max| von Sydow, Max (I)|                    xXx|                 2012|
+-------+------------------+--------------------+------------------+

要计算year字段的基本统计信息,代码如下:

descDF = movies.describe("year")

descDF.printSchema()
descDF.show()

执行以上代码,输出内容如下:

root
 |-- summary: string (nullable = true)
 |-- year: string (nullable = true)

+-------+------------------+
|summary|                 year|
+-------+------------------+
|  count|                31393|
|   mean|2002.7964514382188|
| stddev| 6.377135379933117|
|    min|                 1961|
|    max|                 2012|
+-------+------------------+

如果要计算year和actor这两列的基本统计信息,代码如下:

descDF = movies.describe("year","actor")

descDF.printSchema()
descDF.show()

执行以上代码,输出内容如下:

root
 |-- summary: string (nullable = true)
 |-- year: string (nullable = true)
 |-- actor: string (nullable = true)

+-------+------------------+------------------+
|summary|                 year|               actor|
+-------+------------------+------------------+
|  count|                31393|               31394|
|   mean|2002.7964514382188|                 null|
| stddev| 6.377135379933117|                null|
|    min|                  1961|   Aaron, Caroline|
|    max|                  2012|von Sydow, Max (I)|
+-------+------------------+------------------+

PySpark还提供了一个与describe()函数类似的summary()函数,用于提供数据集的摘要信息。如果没有给出统计信息,这个函数将计算count、mean、stddev、min、近似四分位数(25%、50%和75%的百分位数)和max。例如,对movies调用summary()函数,代码如下:

summaryDF = movies.summary()
    
summaryDF.printSchema()
summaryDF.show()

执行以上代码,输出内容如下:

root
 |-- summary: string (nullable = true)
 |-- actor: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: string (nullable = true)

+-------+------------------+--------------------+------------------+
|summary|               actor|                   title|                year|
+-------+------------------+--------------------+------------------+
|  count|                31394|                  31394|                31393|
|   mean|                 null|   312.61538461538464|2002.7964514382188|
| stddev|                 null|    485.7043414390151| 6.377135379933117|
|    min|     Aaron, Caroline|'Crocodile' Dunde...|                 1961|
|    25%|                  null|                   21.0|                 1999|
|    50%|                  null|                   21.0|                 2004|
|    75%|                  null|                  300.0|                 2008|
|    max| von Sydow, Max (I)|                    xXx|                 2012|
+-------+------------------+--------------------+------------------+

也可以指定想要的统计信息,代码如下:

summaryDF = movies.summary("count", "min", "25%", "75%", "max")
    
summaryDF.printSchema()
summaryDF.show()

执行以上代码,输出内容如下:

root
 |-- summary: string (nullable = true)
 |-- actor: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: string (nullable = true)

+-------+------------------+--------------------+-----+
|summary|             actor|               title| year|
+-------+------------------+--------------------+-----+
|  count|             31394|               31394|31393|
|    min|   Aaron, Caroline|'Crocodile' Dunde...| 1961|
|    25%|              null|                21.0| 1999|
|    75%|              null|               300.0| 2008|
|    max|von Sydow, Max (I)|                 xXx| 2012|
+-------+------------------+--------------------+-----+

要对指定的列做一个摘要,首先选择这些列,然后再执行summary()方法,代码如下:

summaryDF = movies.select("year").summary()
    
summaryDF.printSchema()
summaryDF.show()

执行以上代码,输出内容如下:

root
 |-- summary: string (nullable = true)
 |-- year: string (nullable = true)

+-------+------------------+
|summary|                 year|
+-------+------------------+
|  count|                31393|
|   mean|2002.7964514382188|
| stddev| 6.377135379933117|
|     min|                1961|
|     25%|                1999|
|     50%|                2004|
|     75%|                2008|
|     max|                2012|
+-------+------------------+

提取DataFrame Row中特定字段

有时候,用户需要从DataFrame中获取特定行的特定字段的值,这时可以通过索引的方式。

在下面的示例中,要求加载PySpark自带的数据文件people.json,并输出每个字段的值。首先读取people.json到一个DataFrame中,代码如下:

# 加载数据源,构造DataFrame
input = "file:///home/hduser/data/spark/resources/people.json"
df = spark.read.json(input)

df.printSchema()
df.show()

执行以上代码,输出内容如下:

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|    Andy|
|  19| Justin|
+----+-------+

DataFrame返回的每一行是一个Row对象,可以按字段的位置取每个字段的值,代码如下:

# 取每个字段的值
for row in df.collect():
    print(row[0],"\t",row[1])

执行以上代码,输出内容如下:

Michael 0
Andy    30
Justin  19

《Flink原理深入与编程实战》